2017-04-13 4 views
1

Spark Streaming 작업을 수행하기 위해 Spark 2.1 및 Kafka 0.08.xx를 사용하고 있습니다. 이것은 텍스트 필터링 작업이며, 대부분의 텍스트는 프로세스 중에 필터링됩니다. 나는 첫 번째 방법을 발견Spark Streaming + Kafka에서 foreachRDD가 느린 이유는 무엇입니까?

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 
val jsonMsg = messages.map(_._2) 
val filteredMsg = jsonMsg.filter(x=>x.contains(TEXT1) && x.contains(TEXT2) && x.contains(TEXT3)) 
  • 가 foreachRDD 기능

    messages.foreachRDD { rdd => 
          val record = rdd.map(_.2).filter(x => x.contains(TEXT1) && 
                   x.contains(TEXT2) && 
                   x.contains(TEXT3))} 
    
  • 를 사용

    1. 이 DirectStream의 출력에 직접 필터링을 수행합니다 나는 두 가지 방법으로 구현 두 번째 방법보다 눈에 띄게 빠르지 만 이것이 일반적인 경우인지는 잘 모르겠습니다.

      방법 1과 방법 2간에 차이가 있습니까?

    답변

    1

    filter은 변형입니다. 변환은 느리게 평가됩니다. 즉, foreachRDD과 같은 작업을 수행하거나 데이터를 쓰는 등의 작업을 수행 할 때까지 아무 것도하지 않습니다.

    따라서 1에서 실제로 일어나지 않으므로 2보다 훨씬 빠릅니다. 어떤 작업을 수행하려면 foreachRDD 액션을 사용하고 있습니다.

    관련 문제