2016-09-12 2 views
5

TL; 저는 PySpark 애플리케이션에서 문자열의 DStream처럼 보입니다. Scala 라이브러리에 DStream[String]으로 보내고 싶습니다. 하지만 문자열은 Py4j에 의해 변환되지 않습니다.스칼라로 PySpark RDD 변환하기

저는 Spark Streaming을 사용하여 카프카에서 데이터를 가져 오는 PySpark 응용 프로그램을 만들고 있습니다. 내 메시지는 문자열이고 스칼라 코드로 메서드를 호출하여 DStream[String] 인스턴스를 전달하고 싶습니다. 그러나 스칼라 코드에 적절한 JVM 문자열을받을 수 없습니다. 파이썬 문자열이 자바 문자열로 변환되지 않고 대신 직렬화 된 것처럼 보입니다.

내 질문은 : DStream 개체에서 자바 문자열을 가져 오는 방법은 무엇입니까?

from pyspark.streaming import StreamingContext 
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1)) 

from pyspark.streaming.kafka import KafkaUtils 
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"}) 
values = stream.map(lambda tuple: tuple[1]) 

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream) 

ssc.start() 

내 JAR에에게 경로를 통과, PySpark이 코드를 실행 해요 :

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar 

여기에


내가 생각 해낸 간단한 파이썬 코드 스칼라 쪽, 내가 가지고있다 :

package com.seigneurin 

import org.apache.spark.streaming.api.java.JavaDStream 

object MyPythonHelper { 
    def doSomething(jdstream: JavaDStream[String]) = { 
    val dstream = jdstream.dstream 
    dstream.foreachRDD(rdd => { 
     rdd.foreach(println) 
    }) 
    } 
} 

아니요 승,의 내가 카프카로 일부 데이터를 보낼 가정 해 봅시다 :

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN 

과 같은 스칼라 코드를 인쇄 뭔가에 println 문 : 내가 대신 foo bar을받을 것으로 예상

[[email protected] 

. 나는 다음과 스칼라 코드에서 간단한 println 문을 교체 할 경우

이제 :

rdd.foreach(v => println(v.getClass.getCanonicalName)) 

를 내가 얻을 :

java.lang.ClassCastException: [B cannot be cast to java.lang.String 

이 문자열이 실제로 바이트의 배열로 전달됩니다 제안 . 간단히 말해서 나는 (내가 심지어 인코딩을 지정하고 있지 않다 알고) 문자열로 바이트의 배열을 변환하려고하면

것은 :

 def doSomething(jdstream: JavaDStream[Array[Byte]]) = { 
     val dstream = jdstream.dstream 
     dstream.foreachRDD(rdd => { 
      rdd.foreach(bytes => println(new String(bytes))) 
     }) 
     } 

내가 같이 보이는 뭔가를 얻을 수 (특수 문자는 다음과 같을 수 있습니다 제거됨) :

�]qXfoo barqa. 

이는 파이썬 문자열이 일련 번호가 지정된 (절어 내기?) 것을 의미합니다. 대신 적절한 Java 문자열을 검색 할 수 있습니까?

답변

6

길게도 짧은 것을 지원하는 방법은 없습니다. 프로덕션 환경에서이 작업을 시도하지 마십시오. 너는 경고 당했다.

일반적으로 Spark는 드라이버의 기본 RPC 호출 이외에는 Py4j를 사용하지 않으며 다른 시스템에서 Py4j 게이트웨이를 시작하지 않습니다.필요한 경우 (대부분 MLlib 및 일부 SQL) Spark은 Pyrolite을 사용하여 JVM과 Python간에 전달되는 객체를 직렬화합니다.

이 부분의 API는 비공개 (스칼라) 또는 내부 (Python)이며 일반적인 용도로 사용되지 않습니다. 당신도 배치 당 어쨌든 액세스 이론적 동안 :

package dummy 

import org.apache.spark.api.java.JavaRDD 
import org.apache.spark.streaming.api.java.JavaDStream 
import org.apache.spark.sql.DataFrame 

object PythonRDDHelper { 
    def go(rdd: JavaRDD[Any]) = { 
    rdd.rdd.collect { 
     case s: String => s 
    }.take(5).foreach(println) 
    } 
} 

전체 스트림 :

object PythonDStreamHelper { 
    def go(stream: JavaDStream[Any]) = { 
    stream.dstream.transform(_.collect { 
     case s: String => s 
    }).print 
    } 
} 

또는 DataFrames 개별 배치 (아마도 가장 사악한 옵션) 노출 :

object PythonDataFrameHelper { 
    def go(df: DataFrame) = { 
    df.show 
    } 
} 

및 사용을 이러한 래퍼는 다음과 같습니다.

from pyspark.streaming import StreamingContext 
from pyspark.mllib.common import _to_java_object_rdd 
from pyspark.rdd import RDD 

ssc = StreamingContext(spark.sparkContext, 10) 
spark.catalog.listTables() 

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output) 
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd) 
)) 

# Reserialize and convert to JavaDStream<Object> 
# This is the only option which allows further transformations 
# on DStream 
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD( # Reserialize but keep as Python RDD 
     _to_java_object_rdd(rdd), ssc.sparkContext 
    ))._jdstream 
) 

# Convert to DataFrame and pass to Scala sink. 
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
     rdd.map(lambda x: (x,)).toDF()._jdf 
    ) 
) 

ssc.start() 
ssc.awaitTerminationOrTimeout(30) 
ssc.stop() 

이것은 지원되지 않으며 테스트되지 않았으므로 Spark API를 사용한 실험 이외의 다른 용도에서는 오히려 쓸모가 없습니다.

+1

완벽하고 명확하며 매우 유용합니다. 감사! –

+0

도와 드릴 수있어서 기쁩니다. 아마도 여기에 약간 과장했을 것입니다. 당신의 목표가 언어 독립적 인 확장을 구축하는 것이라면, 개발자는 내부적으로 땜질을 피할 수는 없지만 개발자는 여기에서 의식적인 결정을 내렸고 그걸로 어지럽 혀지는 것은 희소 한 마음이 아닙니다. – zero323

+0

안녕하세요 @ zero323 내가 여기에 같은 과정을하고 있지만, 과정에서 큰 문제가 발생, 나는 kerberized 카프카와 파이썬 응용 프로그램을 통신하는 개체를 만듭니다. 그러나 객체를 만들 때 spark의 jvm이 객체에서 내 함수를 찾을 수 없습니다. 클래스를 만들면 클래스를 찾습니다. 그러나 오류로 인해 rdd 객체를 보낼 수 없습니다 :'pyKafka ([org.apache.spark.api.java.JavaRDD, class java.lang.String]) 존재하지 않습니다.' 무엇이 worng 일 수 있 었는가? –