2017-05-12 4 views
4

저는 Spark 2.1을 사용합니다.Java의 구조화 된 스트리밍을 사용하여 Kafka의 레코드를 deserialize하는 방법은 무엇입니까?

나는 Spaf Structured Streaming을 사용하여 카프카에서 레코드를 읽고,이를 역 직렬화하고 이후에 집계를 적용하려고합니다.

나는 다음과 같은 코드가 있습니다 :

SparkSession spark = SparkSession 
      .builder() 
      .appName("Statistics") 
      .getOrCreate(); 

    Dataset<Row> df = spark 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", kafkaUri) 
      .option("subscribe", "Statistics") 
      .option("startingOffsets", "earliest") 
      .load(); 

    df.selectExpr("CAST(value AS STRING)") 

내가 원하는 것은 내 객체 대신 String로 주조로 value 필드를 역 직렬화하는 것입니다.

저는 이것을위한 맞춤 디시리얼라이저가 있습니다.

public StatisticsRecord deserialize(String s, byte[] bytes) 

어떻게하면 Java에서이 작업을 수행 할 수 있습니까?


내가 발견 한 유일한 관련 링크는 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html이지만 이것은 Scala 용입니다.

+0

메시지가 JSON 형식입니까? – abaghel

+0

데이터를 JSON으로 저장하거나 사용자 정의 직렬 변환기를 통해 직렬화 할 수 있습니다. – dchar

답변

2

JSON 메시지의 스키마를 정의하십시오.

StructType schema = DataTypes.createStructType(new StructField[] { 
       DataTypes.createStructField("Id", DataTypes.IntegerType, false), 
       DataTypes.createStructField("Name", DataTypes.StringType, false), 
       DataTypes.createStructField("DOB", DataTypes.DateType, false) }); 

이제 다음과 같은 메시지가 표시됩니다. MessageData는 JSON 메시지의 JavaBean입니다. 당신이 당신의 데이터에 대한 자바에서 사용자 지정 디시리얼라이저가있는 경우

Dataset<MessageData> df = spark 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", kafkaUri) 
      .option("subscribe", "Statistics") 
      .option("startingOffsets", "earliest") 
      .load() 
      .selectExpr("CAST(value AS STRING) as message") 
      .select(functions.from_json(functions.col("message"),schema).as("json")) 
      .select("json.*") 
      .as(Encoders.bean(MessageData.class)); 
+1

스키마가 올바르게 적용되었지만 모든 열에 대해 null 값이 나타납니다. df.createOrReplaceTempView ("data")로 열을 읽으려고합니다. StreamingQuery query = spark.sql ("SELECT * FROM data"). writeStream(). format ("console"). start(); 내가 뭔가 잘못하고 있는거야? – dchar

+0

아래와 같이 Dataset df를 직접 읽을 수 있습니다. df.writeStream(). 형식 ("콘솔"). start(); – abaghel

+1

이것은 똑같은 결과를 만들어 냈습니다. 모든 열에 "null"이있는 상위 20 개의 행이 표시됩니다. – dchar

2

, 당신은 load 후 카프카에서 얻을 바이트에 그것을 사용할 수 있습니다.

df.select("value") 

그 선은 단지 하나의 열 value 당신에게 Dataset<Row>을 제공합니다. 나는 스칼라에 대한 스파크 API와 독점적 해요


은 그래서 스칼라에서 다음을 처리 할 수있는 "직렬화"경우 할 거라고 : 당신이 원하는 무엇을 제공해야

import org.apache.spark.sql.Encoders 
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord] 
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) } 
df.select(myDeserializerUDF($"value") as "value_des") 

을 ... 스칼라에서. 그것을 Java로 변환하는 것은 가정에서의 훈련입니다.

사용자 정의 개체에 사용 가능한 인코더가 있어야하거나 Spark SQL이 해당 개체를 데이터 집합 안에 넣지 못하도록주의하십시오.

관련 문제