2016-07-10 3 views
0

Pyspark를 사용하여 Elasticsearch에서 데이터를 읽어야합니다. 다음 I가 pyspark에서 장치 흐름을 시도하고 -RDD의 각 요소에 대해 sparkcontext 함수 사용

Ⅰ) rdd1를 생성
II)의 foreach 발생 된-rdd1
의 conf = {rdd1의 요소에 기초하여 동적 값}
rdd2 = sc.newAPIHadoopRDD ( "org.apache.hadoop.io.NullWritable"\ "org.elasticsearch.hadoop.mr.EsInputFormat", "org.elasticsearch.hadoop.mr.LinkedMapWritable"의 conf =의 conf)

"foreach"는 작업자간에 작업을 배포하고 sc.newAPIHadoopRDD를 호출하여 작업자가 sc를 사용할 수 없다는 오류가 발생한다는 것을 알고 있습니다.

위와 같은 다른 방법이 있습니까?
주 - 처리의 나머지 부분은 이에 따라 달라 지므로 "newAPIHadoopRDD"를 사용해야합니다.

+0

예. RDD에서 레코드 당 원하는 것을 출력하기 위해'map()'을 사용하고 결과 RDD에 newAPIHadoopRDD() 메소드를 사용하십시오. –

+0

귀하의 제안에 감사드립니다 Avihoo. 아쉽게도 newAPIHadoopRDD API는 sc에서만 작동합니다. 즉 sc.newAPIHadoopRDD()를 사용해야하며 드라이버에서 실행됩니다. 또한 클러스터에서 실행되기를 원하며 newAPIHadoopRDD()를 직렬화하여 처리자가 병렬 처리되도록하는 방법이 필요하기를 바랬습니다. – Yogesh

답변

0

RDD를 중첩 할 수 없습니다. rdd1의 결과를 반복하려면 먼저 드라이버에 collect을 입력해야합니다.

val rdd1Result = rdd1.collect() 
rdd1Result.foreach { v => 
    val conf = ... 
    sc.newAPIHadoopRDD... 
} 
+0

감사합니다. 귀하의 의견을 감사하십시오. i) "rdd1.collect"는 목록을 반환하지만 ".foreach"는 RDD에서 작동합니다. ii) 확장 성을 위해 작업자 노드에서 실행하고 싶습니다. 그러나 우리는 전체 "rdd1Result"가 드라이버 노드에서만 처리 될 것으로 생각합니다. 작업자 노드에서 병렬로 실행되도록 제안 하시겠습니까? – Yogesh

+0

스칼라리스트도'foreach' 메소드를 가지고 있습니다. 작업자 노드에서 SparkContext에 액세스하는 것은 불가능합니다. – Dikei

+0

예 Dikei. 우리는 작업자 노드에서 SparkContext에 액세스 할 수 없다는 것을 깨달았습니다. 귀하의 의견을 보내 주셔서 감사합니다. – Yogesh

0

foreach 내에서 RDD를 보낼 수 없으므로 시도하지 마십시오. 조인을하지 않는 한 :이 경우 spark는 두 개의 rdd를 처리 할 수 ​​있으며, 생각하면이 모든 것이 필요한 것입니다.

당신이 theta-join을하고있는 것처럼 보입니다. 데이터에 따라 대략적인 값으로 정확한 결합을 사용하여 루프를 우회 할 수 있습니다.

+0

감사 Marmouset. 우리는 우리의 접근 방식을 바꿀 필요가 있음을 깨달았습니다. 결과적으로 우리는 newAPIHadoopRDD()가하는 것과 유사한 것을 수행하는 함수를 만들었습니다. 기본 요구 사항은 우리가 대체 방식으로 수행 한 elasticsearch를 쿼리하는 것이 었습니다. 이를 위해 Hadoop API를 사용했습니다. 이 새로운 함수를 .map을 사용하여 RDD에 전달하여 작업자 노드에서 실행되도록했습니다. 이것은 우리가 원하는 것을 성취하는데 도움이되었습니다. 즉, i) elastic searcharch 질의 ii) 확장 성있는 방식으로 그것을 수행하십시오. – Yogesh

관련 문제