2016-06-06 3 views
1

저는 파이썬에서 csv 데이터를로드하고 SPark Streaming을 통해 각 행 스파크를 스트리밍하려고합니다.파이썬은 CSV 데이터를 보내어 스트리밍을 시작합니다.

메신저 네트워크에 새로운 것들. Im Im가 정확하게 (스파크 스트리밍을 사용하여) 연결을 설정하면 각 행을 보내기 시작할 서버 파이썬 스크립트를 생성해야합니다. Spark Streaming Documentation에서 그들은 nc -l 9999를 수행하는데, 이것은 정확하게 맞으면 포트 9999에서 수신하는 netcat 서버입니다. 그래서 60000

import socket     # Import socket module 
import csv 

port = 60000     # Reserve a port for your service. 
s = socket.socket()    # Create a socket object 
host = socket.gethostname()  # Get local machine name 
s.bind((host, port))   # Bind to the port 
s.listen(5)      # Now wait for client connection. 

print('Server listening....') 

while True: 
    conn, addr = s.accept()  # Establish connection with client. 
    print('Got connection from', addr) 



    csvfile = open('Titantic.csv', 'rb') 

    reader = csv.reader(csvfile, delimiter = ',') 
    for row in reader: 
     line = ','.join(row) 

     conn.send(line) 
     print(line) 

    csvfile.close() 

    print('Done sending') 
    conn.send('Thank you for connecting') 
    conn.close() 

스파크 스트리밍 스크립트를 CSV를 구문 분석하고 포트에 전송 유사한 파이썬 스크립트를 작성하려고 - 스파크 스크립트를 실행

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
ssc = StreamingContext(sc, 1) 

# Create a DStream that will connect to hostname:port, like localhost:9999 
linesRDD = ssc.socketTextStream("localhost", 60000) 

# Split each line into words 
dataRDD = lines.flatMap(lambda line: line.split(",")) 

dataRDD.pprint() 

ssc.start()    # Start the computation 
ssc.awaitTermination() # Wait for the computation to terminate 

를 내가 얻을 (이 BTW Jupyter 노트북에) 이 오류 - IllegalArgumentException : '요구 사항이 실패했습니다 : 등록 된 출력 작업이 없으므로 아무 것도 실행되지 않습니다' '

소켓 스크립트를 올바로 수행하고 있다고 생각하지만 실제로 무엇을해야할지 모르겠습니다. 기본적으로 w를 복제하려고합니다. hat nc -lk 9999는 포트를 통해 텍스트 데이터를 전송 한 다음 스트리밍을 듣고 데이터를 수신하고 처리합니다.

은 어떤 도움이 크게

답변

0

나는 비슷한 할 노력하고있어 감사합니다,하지만 난 행마다 10 초 스트리밍 할 것입니다. 이 스크립트로 해결했습니다 :

import socket 
from time import sleep 

host = 'localhost' 
port = 12345 

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
s.bind((host, port)) 
s.listen(1) 
while True: 
    print('\nListening for a client at',host , port) 
    conn, addr = s.accept() 
    print('\nConnected by', addr) 
    try: 
     print('\nReading file...\n') 
     with open('iris_test.csv') as f: 
      for line in f: 
       out = line.encode('utf-8') 
       print('Sending line',line) 
       conn.send(out) 
       sleep(10) 
      print('End Of Stream.') 
    except socket.error: 
     print ('Error Occured.\n\nClient disconnected.\n') 
conn.close() 

희망이 도움이됩니다.