Dstream에서 rdd에서 샘플을 가져오고 싶습니다. d 스트림은 sample()
변환을하지 않으며 내가 그것에 단어 수를 d 스트림에서 샘플을 채취하고 적용하기 위해 이런 짓을 수 있도록 rdds의 순서이므로 :이 코드와Spark Dstream으로 간단한 임의 샘플링을하는 방법 (spark 1.6.1을 사용하는 pyspark)
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("SRS")
sc = SparkContext('local[3]', conf=conf)
from pyspark.streaming import StreamingContext
streamContext = StreamingContext(sc,3)
lines = streamContext.socketTextStream("localhost", 9000)
def sampleWord(rdd):
return rdd.sample(false,0.5,10)
lineSample = lines.foreachRDD(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
streamContext.start()
streamContext.stop()
시작 스파크 아무것도 정확히 발생하지 않습니다. 내가 왜 rdd.sample()
이 이런 식으로 작동하지 않는지 모르겠다? foreachRDD
을 사용하면 스트림의 각 rdd에 액세스 할 수 있으므로 지금은 rdd에 특정한 변환을 사용할 수 있다고 생각합니다.
다시 sampleWord => 반환 rdd.sample의 오류 (거짓, 0.5,100) 있다. 그리고 아무 일도 일어나지 않습니다. 스파크가 표본을 계산하는지 아닌지는 모르겠다. – YyAaSs