2016-12-30 2 views
1

저는 스파크 초보자입니다.스파크 rdd 필드 값을 다른 값으로 대체하십시오.

내가 사용하여 내 elasticsearch 데이터베이스의 첫 번째 RDD의 내용을 볼 수 있습니다

:

print(es_rdd.first()) 
>>>(u'1', {u'name': u'john'}) 

가 나는 또한 사용하여 내 d 스트림에 필요한 값을 얻을 수는 :

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers}) 
name=kvs.map(lambda x: x[1]) 
name.pprint() 
>>>>robert 

나는 대체하려는 rdd "name": "john"을 "robert"로 입력 한 다음 saveAsNewAPIHadoopFile()을 사용하여 elasticsearch에 새 rdd를 삽입하십시오.

어떻게하면됩니까? "robert"를 새로운 rdd에 매핑하는 방법이 있습니까? 뭔가 같은 ..

new_rdd=es_rdd.map(lambda item: {item[0]:name}) 

감사

답변

2

우리는 지수의 목록에 따라 다른 RDD와 RDD의 일부를 대체 할 수 있습니다. 예를 들어, 1,2,3,4에서 2,3,4,4로 (RDD)의 요소를 대체하십시오.

a = sc.parallelize([1,2,3,4]) 
repVals = sc.parallelize([2,3,4]) 
idx = sc.parallelize([0,1,2]) . # idx has the same number of values with repVals 

a = a.zipWithIndex() 
ref = idx.zip(repVals).collectAsMap() # create a dictionary of format {idex:repValue} 

anew = a.map(lambda x:ref[x[1]] if x[1] in ref else x[0]) 
anew.collect() 

결과에 [2,3,4,4]

관련 문제