컨텍스트 : Apache Spark를 사용하여 로그에서 다른 이벤트 유형의 실행 횟수를 집계합니다. 로그는 과거 분석 목적으로는 Cassandra에, 실시간 분석에는 Kafka에 저장됩니다. 각 로그에는 날짜 및 이벤트 유형이 있습니다. 단순화를 위해 매일 한 가지 유형의 로그 수를 추적하고 싶다고 가정합니다.Apache Spark에서 스트리밍 RDD를 사용하는 배치 RDD의 결과 결합
우리는 두 개의 RDD, 카산드라의 배치 데이터 RDD 및 카프카의 다른 스트리밍 RDD를 가지고 있습니다. 의사 코드 :
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
질문 : 가 어떻게이 batchRDD와 streamRDD에서 결과를 결합 하는가? 의이 batchRDD
는 다음과 같은 데이터가 있다고하자이 작업은 2014년 10월 16일에서 실행되었습니다
("2014-10-15", 1000000)
("2014-10-16", 2000000)
을 카산드라 쿼리 만, 우리가해야 일괄 쿼리의 시작 시간까지의 모든 데이터를 포함하기 때문에 쿼리가 끝나면 Kafka에서 읽은 다음 작업 시작 시간 이후의 로그 만 고려합니다. 쿼리가 오랜 시간이 걸린다 고 가정합니다. 즉, 과거 결과와 스트리밍 결과를 결합해야합니다.
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
다음 첫 번째 스트림 일괄 우리는이 데이터를 가지고 있다고 가정 : 그림에 대한
나는이 스트림 RDD으로 배치 RDD을 결합하려는 그런
("2014-10-19", 1000)
를 스트림 있도록 RDD는 이제 값이 있습니다
("2014-10-19", 2001000)
가 그런 생각을하는 두 번째 스트림 배치 승 그런 다음 스트림 RDD이 값이 업데이트되어야한다
("2014-10-19", 4000)
: 등등
("2014-10-19", 2005000)
그리고를 ...
그것은 streamRDD을 결합 streamRDD.transformToPair(...)
를 사용하는 것이 가능 전자는이 데이터를 가지고 데이터를 join
을 사용하여 batchRDD 데이터와 비교할 수 있지만, 각 스트림 청크에 대해이 작업을 수행하면 모든 스트림 청크에 대해 batchRDD의 카운트를 추가하여 "double counted"상태 값을 추가합니다. 제 1 스트림 덩어리.
감사합니다. 나는'rdd.unft (defaultRdd)'대신에'rdd.leftOuterJoin (defaultRdd)'를 사용하여 끝내기 때문에'runningTotal'에는 변경되지 않은 쌍을 포함하지 않기 때문에 그냥 추가하고 싶습니다. 그런 다음, 값이 변경된 쌍만 저장하면됩니다. – Bobby