2014-10-23 3 views
8

컨텍스트 : Apache Spark를 사용하여 로그에서 다른 이벤트 유형의 실행 횟수를 집계합니다. 로그는 과거 분석 목적으로는 Cassandra에, 실시간 분석에는 Kafka에 저장됩니다. 각 로그에는 날짜 및 이벤트 유형이 있습니다. 단순화를 위해 매일 한 가지 유형의 로그 수를 추적하고 싶다고 가정합니다.Apache Spark에서 스트리밍 RDD를 사용하는 배치 RDD의 결과 결합

우리는 두 개의 RDD, 카산드라의 배치 데이터 RDD 및 카프카의 다른 스트리밍 RDD를 가지고 있습니다. 의사 코드 :

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type"); 

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() { 
    @Override 
    public Tuple2<String, Integer> call(CassandraRow row) { 
     return new Tuple2<String, Integer>(row.getString("date"), 1); 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}); 

save(batchRDD) // Assume this saves the batch RDD somewhere 

... 

// Assume we read a chunk of logs from the Kafka stream every x seconds. 
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...); 
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() { 
    @Override 
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) { 
     String jsonString = data._2; 
     JSON jsonObj = JSON.parse(jsonString); 
     Date eventDate = ... // get date from json object 
     // Assume startTime is broadcast variable that is set to the time when the job started. 
     if (eventDate.after(startTime.value())) { 
      ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>(); 
      pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1)); 
      return pairs; 
     } else { 
      return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs 
     } 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() { 
    @Override 
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) { 
     Integer previousValue = state.or(0l); 
     Integer currentValue = ... // Sum of counts 
     return Optional.of(previousValue + currentValue); 
    } 
}); 
save(streamRDD); // Assume this saves the stream RDD somewhere 

sc.start(); 
sc.awaitTermination(); 

질문 : 가 어떻게이 batchRDD와 streamRDD에서 결과를 결합 하는가? 의이 batchRDD는 다음과 같은 데이터가 있다고하자이 작업은 2014년 10월 16일에서 실행되었습니다

("2014-10-15", 1000000) 
("2014-10-16", 2000000) 

을 카산드라 쿼리 만, 우리가해야 일괄 쿼리의 시작 시간까지의 모든 데이터를 포함하기 때문에 쿼리가 끝나면 Kafka에서 읽은 다음 작업 시작 시간 이후의 로그 만 고려합니다. 쿼리가 오랜 시간이 걸린다 고 가정합니다. 즉, 과거 결과와 스트리밍 결과를 결합해야합니다.

|------------------------|-------------|--------------|---------> 
tBatchStart    tStreamStart streamBatch1 streamBatch2 

다음 첫 번째 스트림 일괄 우리는이 데이터를 가지고 있다고 가정 : 그림에 대한

나는이 스트림 RDD으로 배치 RDD을 결합하려는 그런

("2014-10-19", 1000) 

를 스트림 있도록 RDD는 이제 값이 있습니다

("2014-10-19", 2001000) 

가 그런 생각을하는 두 번째 스트림 배치 승 그런 다음 스트림 RDD이 값이 업데이트되어야한다

("2014-10-19", 4000) 

: 등등

("2014-10-19", 2005000) 

그리고를 ...

그것은 streamRDD을 결합 streamRDD.transformToPair(...)를 사용하는 것이 가능 전자는이 데이터를 가지고 데이터를 join을 사용하여 batchRDD 데이터와 비교할 수 있지만, 각 스트림 청크에 대해이 작업을 수행하면 모든 스트림 청크에 대해 batchRDD의 카운트를 추가하여 "double counted"상태 값을 추가합니다. 제 1 스트림 덩어리.

답변

4

이 사건을 해결하기를, 나는 노동 조합에게 스트리밍 데이터의 합계를 유지하는 집계 StateDStream의 결과 RDD 기반을 것입니다. 이는 상기 스트리밍 간격에 대해보고 된 데이터에 대한 기준선을 상기 기준 x 번을 카운트하지 않고 효과적으로 제공한다.

샘플 WordCount를 사용하여이 아이디어를 시도했지만 작동합니다.라이브 예를 들어, REPL에 드롭 :

(사용 nc -lk 9876을 별도의 쉘에 socketTextStream에 입력을 제공하기 위해) 나는 이미 그것을 사용하고

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7) 
val defaultRdd = sc.parallelize(defaults) 

@transient val ssc = new StreamingContext(sc, Seconds(10)) 
ssc.checkpoint("/tmp/spark") 

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _) 
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0)) 
} 
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey(_+_) 

wordCount.print() 
historicCount.print() 
runningTotal.print() 
ssc.start() 
+1

감사합니다. 나는'rdd.unft (defaultRdd)'대신에'rdd.leftOuterJoin (defaultRdd)'를 사용하여 끝내기 때문에'runningTotal'에는 변경되지 않은 쌍을 포함하지 않기 때문에 그냥 추가하고 싶습니다. 그런 다음, 값이 변경된 쌍만 저장하면됩니다. – Bobby

0

당신은 updateStateByKey 시도 줄 수 :

def main(args: Array[String]) { 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
     val currentCount = values.foldLeft(0)(_ + _) 
     val previousCount = state.getOrElse(0) 
     Some(currentCount + previousCount) 
    } 

    // stream 
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1)) 
    ssc.checkpoint(".") 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc) 
    stateWordCounts.print() 
    ssc.start() 
    ssc.awaitTermination() 
} 
+0

. 문제는 선택적 상태 값이 null 인 경우 기본값으로 설정해야한다는 것입니다. 이상적으로 이것은 배치 RDD에서 계산 된 값입니다. 문제는'updateStateByKey()'가 키를 전달하지 않기 때문에 일괄 RDD에서 계산 된 값을 조회 할 수 없다는 것입니다. – Bobby

관련 문제