Pyspark를 사용하여 Elasticsearch에서 데이터를 읽어야합니다. 다음 I가 pyspark에서 장치 흐름을 시도하고 -RDD의 각 요소에 대해 sparkcontext 함수 사용
Ⅰ) rdd1를 생성
II)의 foreach 발생 된-rdd1
의 conf = {rdd1의 요소에 기초하여 동적 값}
rdd2 = sc.newAPIHadoopRDD ( "org.apache.hadoop.io.NullWritable"\ "org.elasticsearch.hadoop.mr.EsInputFormat", "org.elasticsearch.hadoop.mr.LinkedMapWritable"의 conf =의 conf)
"foreach"는 작업자간에 작업을 배포하고 sc.newAPIHadoopRDD를 호출하여 작업자가 sc를 사용할 수 없다는 오류가 발생한다는 것을 알고 있습니다.
위와 같은 다른 방법이 있습니까?
주 - 처리의 나머지 부분은 이에 따라 달라 지므로 "newAPIHadoopRDD"를 사용해야합니다.
예. RDD에서 레코드 당 원하는 것을 출력하기 위해'map()'을 사용하고 결과 RDD에 newAPIHadoopRDD() 메소드를 사용하십시오. –
귀하의 제안에 감사드립니다 Avihoo. 아쉽게도 newAPIHadoopRDD API는 sc에서만 작동합니다. 즉 sc.newAPIHadoopRDD()를 사용해야하며 드라이버에서 실행됩니다. 또한 클러스터에서 실행되기를 원하며 newAPIHadoopRDD()를 직렬화하여 처리자가 병렬 처리되도록하는 방법이 필요하기를 바랬습니다. – Yogesh