TL; 저는 PySpark 애플리케이션에서 문자열의 DStream처럼 보입니다. Scala 라이브러리에 DStream[String]
으로 보내고 싶습니다. 하지만 문자열은 Py4j에 의해 변환되지 않습니다.스칼라로 PySpark RDD 변환하기
저는 Spark Streaming을 사용하여 카프카에서 데이터를 가져 오는 PySpark 응용 프로그램을 만들고 있습니다. 내 메시지는 문자열이고 스칼라 코드로 메서드를 호출하여 DStream[String]
인스턴스를 전달하고 싶습니다. 그러나 스칼라 코드에 적절한 JVM 문자열을받을 수 없습니다. 파이썬 문자열이 자바 문자열로 변환되지 않고 대신 직렬화 된 것처럼 보입니다.
내 질문은 : DStream
개체에서 자바 문자열을 가져 오는 방법은 무엇입니까?
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
내 JAR에에게 경로를 통과, PySpark이 코드를 실행 해요 :
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
여기에
내가 생각 해낸 간단한 파이썬 코드 스칼라 쪽, 내가 가지고있다 :
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
아니요 승,의 내가 카프카로 일부 데이터를 보낼 가정 해 봅시다 :
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
과 같은 스칼라 코드를 인쇄 뭔가에 println
문 : 내가 대신 foo bar
을받을 것으로 예상
[[email protected]
. 나는 다음과 스칼라 코드에서 간단한 println
문을 교체 할 경우
이제 :
rdd.foreach(v => println(v.getClass.getCanonicalName))
를 내가 얻을 :
java.lang.ClassCastException: [B cannot be cast to java.lang.String
이 문자열이 실제로 바이트의 배열로 전달됩니다 제안 . 간단히 말해서 나는 (내가 심지어 인코딩을 지정하고 있지 않다 알고) 문자열로 바이트의 배열을 변환하려고하면
것은 :
이 def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
내가 같이 보이는 뭔가를 얻을 수 (특수 문자는 다음과 같을 수 있습니다 제거됨) :
�]qXfoo barqa.
이는 파이썬 문자열이 일련 번호가 지정된 (절어 내기?) 것을 의미합니다. 대신 적절한 Java 문자열을 검색 할 수 있습니까?
완벽하고 명확하며 매우 유용합니다. 감사! –
도와 드릴 수있어서 기쁩니다. 아마도 여기에 약간 과장했을 것입니다. 당신의 목표가 언어 독립적 인 확장을 구축하는 것이라면, 개발자는 내부적으로 땜질을 피할 수는 없지만 개발자는 여기에서 의식적인 결정을 내렸고 그걸로 어지럽 혀지는 것은 희소 한 마음이 아닙니다. – zero323
안녕하세요 @ zero323 내가 여기에 같은 과정을하고 있지만, 과정에서 큰 문제가 발생, 나는 kerberized 카프카와 파이썬 응용 프로그램을 통신하는 개체를 만듭니다. 그러나 객체를 만들 때 spark의 jvm이 객체에서 내 함수를 찾을 수 없습니다. 클래스를 만들면 클래스를 찾습니다. 그러나 오류로 인해 rdd 객체를 보낼 수 없습니다 :'pyKafka ([org.apache.spark.api.java.JavaRDD, class java.lang.String]) 존재하지 않습니다.' 무엇이 worng 일 수 있 었는가? –