2016-08-11 2 views
0

Spark Kafka 커넥터를 사용하여 Kafka 클러스터에서 데이터를 가져옵니다. 그것으로부터, 나는 데이터를 JavaDStream<String>으로 받고 있습니다. 데이터를 JavaDStream<EventLog>으로 가져 오려면 어떻게해야합니까? EventLog은 Java bean입니까?Spark Kafka Connector에서 객체의 JavaDStream을 얻는 방법은 무엇입니까?

public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) { 
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, zkQuorum, group, topicMap); 
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> tuple2) { 
      return tuple2._2(); 
     } 
    }); 
    jssc.start(); 
    jssc.awaitTermination(); 
    return lines; 
} 

내 목표는 카산드라에이 데이터를 저장하는 곳 EventLog와 같은 사양의 테이블. Spark Cassandra 커넥터는 javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();과 같은 insert 문에 JavaRDD<EventLog>을 허용합니다. 나는 카프카 출신이 JavaRDD<EventLog>을 갖고 싶다.

+0

메시지를 문자열 쌍으로 사용하여 변환하고 싶습니까? 또는 JavaReceiverInputDStream 을 사용 하시겠습니까? 정확히 어디에서 EventLog 유형을 가져 오시겠습니까? Hvae 당신은 EventLog 형식을 받아들이고 JavaDStream을 구축하는 수신기를 정의하려고 시도 했습니까? – Sunny

+0

@Sunny 내 목표는 카산드라에 데이터를 쓰는 것입니다. Spark Cassandra 커넥터는'javaFunctions (rdd) .writerBuilder ("ks", "event", mapToRow (EventLog.class)). saveToCassandra();'와 같은 insert 문에서'JavaRDD '을 허용합니다. 카프카에서이 'JavaRDD '을 얻고 싶습니다. – khateeb

+0

kafka에 해당 EventLog를 쓰는 코드에 액세스 할 수 있습니까? 커스텀 시리얼 라이저가 구현되어 있으며, EventLog가 직렬화되어 Kafka에 EventLog로 작성되어 있습니까? – Sunny

답변

0

오버로드 된 createStream 메서드를 사용하여 키/값 유형 및 디코더 클래스를 전달할 수 있습니다.

예 :

createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class, 
     kafkaParams, topicsMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 

위는 JavaPairDStream<String, EventLog>

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() { 
    @Override 
    public EventLog call(Tuple2<String, EventLog> tuple2) { 
    return tuple2._2(); 
    } 
}); 

에게 kafka.serializer.Decoder을 구현해야하는 EventLogDecoder를 제공해야합니다. 다음은 json 디코더 예제입니다.

public class EventLogDecoder implements Decoder<EventLog> { 

public EventLogDecoder(VerifiableProperties verifiableProperties) { 
} 

@Override 
public EventLog fromBytes(byte[] bytes) { 
    ObjectMapper objectMapper = new ObjectMapper(); 
    try { 
    return objectMapper.readValue(bytes, EventLog.class); 
    } catch (IOException e) { 
    //do something 
    } 
    return null; 
} 
} 
+0

'StringDecoder'에 대한 전체 패키지를 알려주시겠습니까? 'EventLogDecoder'에 어떤 함수가 포함되어 있습니까? – khateeb

+0

'Function'의 세 번째 인수는'Function , EventLog>()'와 같은'EventLog' 여야합니다. – khateeb

+0

예 세 번째 인수가 업데이트되었습니다. 그리고 StringDecoder 패키지는 kafka.serializer.StringDecoder입니다. 예제 디코더를 포함하도록 답변을 업데이트했습니다. –

관련 문제