저는 Spark 2.1을 사용합니다.Java의 구조화 된 스트리밍을 사용하여 Kafka의 레코드를 deserialize하는 방법은 무엇입니까?
나는 Spaf Structured Streaming을 사용하여 카프카에서 레코드를 읽고,이를 역 직렬화하고 이후에 집계를 적용하려고합니다.
나는 다음과 같은 코드가 있습니다 :
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
내가 원하는 것은 내 객체 대신 String
로 주조로 value
필드를 역 직렬화하는 것입니다.
저는 이것을위한 맞춤 디시리얼라이저가 있습니다.
public StatisticsRecord deserialize(String s, byte[] bytes)
어떻게하면 Java에서이 작업을 수행 할 수 있습니까?
내가 발견 한 유일한 관련 링크는 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html이지만 이것은 Scala 용입니다.
메시지가 JSON 형식입니까? – abaghel
데이터를 JSON으로 저장하거나 사용자 정의 직렬 변환기를 통해 직렬화 할 수 있습니다. – dchar