저는 파이썬에서 csv 데이터를로드하고 SPark Streaming을 통해 각 행 스파크를 스트리밍하려고합니다.파이썬은 CSV 데이터를 보내어 스트리밍을 시작합니다.
메신저 네트워크에 새로운 것들. Im Im가 정확하게 (스파크 스트리밍을 사용하여) 연결을 설정하면 각 행을 보내기 시작할 서버 파이썬 스크립트를 생성해야합니다. Spark Streaming Documentation에서 그들은 nc -l 9999를 수행하는데, 이것은 정확하게 맞으면 포트 9999에서 수신하는 netcat 서버입니다. 그래서 60000
import socket # Import socket module
import csv
port = 60000 # Reserve a port for your service.
s = socket.socket() # Create a socket object
host = socket.gethostname() # Get local machine name
s.bind((host, port)) # Bind to the port
s.listen(5) # Now wait for client connection.
print('Server listening....')
while True:
conn, addr = s.accept() # Establish connection with client.
print('Got connection from', addr)
csvfile = open('Titantic.csv', 'rb')
reader = csv.reader(csvfile, delimiter = ',')
for row in reader:
line = ','.join(row)
conn.send(line)
print(line)
csvfile.close()
print('Done sending')
conn.send('Thank you for connecting')
conn.close()
스파크 스트리밍 스크립트를 CSV를 구문 분석하고 포트에 전송 유사한 파이썬 스크립트를 작성하려고 - 스파크 스크립트를 실행
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
# Create a DStream that will connect to hostname:port, like localhost:9999
linesRDD = ssc.socketTextStream("localhost", 60000)
# Split each line into words
dataRDD = lines.flatMap(lambda line: line.split(","))
dataRDD.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
를 내가 얻을 (이 BTW Jupyter 노트북에) 이 오류 - IllegalArgumentException : '요구 사항이 실패했습니다 : 등록 된 출력 작업이 없으므로 아무 것도 실행되지 않습니다' '
소켓 스크립트를 올바로 수행하고 있다고 생각하지만 실제로 무엇을해야할지 모르겠습니다. 기본적으로 w를 복제하려고합니다. hat nc -lk 9999는 포트를 통해 텍스트 데이터를 전송 한 다음 스트리밍을 듣고 데이터를 수신하고 처리합니다.
은 어떤 도움이 크게