1

나는 spark-streaming에서 몇 개의 탄성 검색 색인으로 저장하려고합니다. <key(index), value> 쌍을 생성합니다. groupByKey를 실행하면 결과는 <key(index), Iterable<value>>이지만, elasticsearch-spark 플러그인을 사용하여 elasticsearch에 저장하려면 JavaRDD<value>으로 값이 필요합니다.반복 실행을 RDD로 변환

목록에서 JavaRDD를 만드는 sparkContext.parallelize (list) 옵션이 있지만 드라이버에서만 실행할 수 있다는 것을 알고 있습니다.

실행 프로그램에서 실행할 수있는 JavaRDD를 만드는 또 다른 옵션이 있습니까? 또는 실행자에서 작동하는 Tuple2<key(index), JavaRDD<value>>을 얻을 수있는 또 다른 방법은 무엇입니까? 드라이버에서 JavaRDD에 대한 Iterator의 스위치 만 만들고 실행 프로그램에서 elasticsearch에 쓰는 플러그인을 어떻게 만들 수 있습니까?

감사합니다,

다니엘라

+0

에 흠이 될 것입니다 다음

JavaPairRDD<Key, Iterable<Value>> pair = ...; JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2()); JavaRDD<Value> onlyValues = values.flatMap(it -> it); 

대체 접근 방식처럼 떨어지게을 가질 수 있어야한다 말 것 , AFAIK,'groupByKey'는 여전히'rdd' 인'JavaPairRDD >'을 낳습니다. . 따라서'rdd '의 추가 처리는 드라이버가 아닌 실행 프로그램에서 실행됩니다. –

답변

1

나는 그것이

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1); 
JavaRDD<Value> values = keyValues.map(t2 -> t2._2()); 
+0

감사 evgenii, 내가 내 JavaRDD 값의 결과를 foreachRDD JavaRDD 에 <키, Iterable을 > JavaPairRDD에서 얻을 필요가 있기 때문에 = rdd.flatMap ((FlatMapFunction >, 문자열>) tuple2 - > { final list l = Lists.newArrayList(); tuple2._2(). forEach (l :: add); return l; }); 동일한 키와 관련이 있습니까? – Daniela

+0

질문에 대한 오해가있을 것입니다. 나는 나의 대답을 편집 할 것이고, 이번에는 더 나아질 것이기를 희망한다. – evgenii

관련 문제