1
저는 스파크 초보자입니다.스파크 rdd 필드 값을 다른 값으로 대체하십시오.
내가 사용하여 내 elasticsearch 데이터베이스의 첫 번째 RDD의 내용을 볼 수 있습니다:
print(es_rdd.first())
>>>(u'1', {u'name': u'john'})
가 나는 또한 사용하여 내 d 스트림에 필요한 값을 얻을 수는 :
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers})
name=kvs.map(lambda x: x[1])
name.pprint()
>>>>robert
나는 대체하려는 rdd "name": "john"을 "robert"로 입력 한 다음 saveAsNewAPIHadoopFile()을 사용하여 elasticsearch에 새 rdd를 삽입하십시오.
어떻게하면됩니까? "robert"를 새로운 rdd에 매핑하는 방법이 있습니까? 뭔가 같은 ..
new_rdd=es_rdd.map(lambda item: {item[0]:name})
감사