Spark Kafka 커넥터를 사용하여 Kafka 클러스터에서 데이터를 가져옵니다. 그것으로부터, 나는 데이터를 JavaDStream<String>
으로 받고 있습니다. 데이터를 JavaDStream<EventLog>
으로 가져 오려면 어떻게해야합니까? EventLog
은 Java bean입니까?Spark Kafka Connector에서 객체의 JavaDStream을 얻는 방법은 무엇입니까?
public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) {
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
jssc.start();
jssc.awaitTermination();
return lines;
}
내 목표는 카산드라에이 데이터를 저장하는 곳 EventLog
와 같은 사양의 테이블. Spark Cassandra 커넥터는 javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();
과 같은 insert 문에 JavaRDD<EventLog>
을 허용합니다. 나는 카프카 출신이 JavaRDD<EventLog>
을 갖고 싶다.
메시지를 문자열 쌍으로 사용하여 변환하고 싶습니까? 또는 JavaReceiverInputDStream을 사용 하시겠습니까? 정확히 어디에서 EventLog 유형을 가져 오시겠습니까? Hvae 당신은 EventLog 형식을 받아들이고 JavaDStream을 구축하는 수신기를 정의하려고 시도 했습니까? –
Sunny
@Sunny 내 목표는 카산드라에 데이터를 쓰는 것입니다. Spark Cassandra 커넥터는'javaFunctions (rdd) .writerBuilder ("ks", "event", mapToRow (EventLog.class)). saveToCassandra();'와 같은 insert 문에서'JavaRDD'을 허용합니다. 카프카에서이 'JavaRDD '을 얻고 싶습니다. –
khateeb
kafka에 해당 EventLog를 쓰는 코드에 액세스 할 수 있습니까? 커스텀 시리얼 라이저가 구현되어 있으며, EventLog가 직렬화되어 Kafka에 EventLog로 작성되어 있습니까? – Sunny