2016-08-18 3 views
0

Dstream에서 rdd에서 샘플을 가져오고 싶습니다. d 스트림은 sample() 변환을하지 않으며 내가 그것에 단어 수를 d 스트림에서 샘플을 채취하고 적용하기 위해 이런 짓을 수 있도록 rdds의 순서이므로 :이 코드와Spark Dstream으로 간단한 임의 샘플링을하는 방법 (spark 1.6.1을 사용하는 pyspark)

from pyspark import SparkContext 
from pyspark import SparkConf 

# Optionally configure Spark Settings 
conf=SparkConf() 
conf.set("spark.executor.memory", "1g") 
conf.set("spark.cores.max", "2") 

conf.setAppName("SRS") 
sc = SparkContext('local[3]', conf=conf) 

from pyspark.streaming import StreamingContext 
streamContext = StreamingContext(sc,3) 
lines = streamContext.socketTextStream("localhost", 9000) 

def sampleWord(rdd): 
    return rdd.sample(false,0.5,10) 


lineSample = lines.foreachRDD(sampleWord) 
words = lineSample.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word , 1)) 
wordCount = pairs.reduceByKey(lambda x, y: x + y) 
wordCount.pprint(60) 


streamContext.start() 
streamContext.stop() 

시작 스파크 아무것도 정확히 발생하지 않습니다. 내가 왜 rdd.sample()이 이런 식으로 작동하지 않는지 모르겠다? foreachRDD을 사용하면 스트림의 각 rdd에 액세스 할 수 있으므로 지금은 rdd에 특정한 변환을 사용할 수 있다고 생각합니다.

답변

0

대신 foreachRDDtransform 사용. 또한 코드에는 오타가 있습니다. 정의되지 않은 전역 이름 '사실'나가서 설명하자면 NameError :

def sampleWord(rdd): 
return rdd.sample(False,0.5,10) //False, not false 

lineSample = lines.transform(sampleWord) 
words = lineSample.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word , 1)) 
wordCount = pairs.reduceByKey(lambda x, y: x + y) 
wordCount.pprint(60) 
0

사용 transform :

lineSample = lines.transform(sampleWord) 
+0

다시 sampleWord => 반환 rdd.sample의 오류 (거짓, 0.5,100) 있다. 그리고 아무 일도 일어나지 않습니다. 스파크가 표본을 계산하는지 아닌지는 모르겠다. – YyAaSs

관련 문제