2

스파크 스트리밍 1.1.0을 로컬로 사용하고 있습니다 (클러스터가 아님). 데이터 (약 10.000 개의 항목)를 구문 분석하여 스트림에 저장 한 다음 일부 변환을 수행하는 간단한 응용 프로그램을 만들었습니다.스파크 스트리밍 성능이 느립니다.

def main(args : Array[String]){ 

    val master = "local[8]" 
    val conf = new SparkConf().setAppName("Tester").setMaster(master) 
    val sc = new StreamingContext(conf, Milliseconds(110000)) 

    val stream = sc.receiverStream(new MyReceiver("localhost", 9999)) 

    val parsedStream = parse(stream) 

    parsedStream.foreachRDD(rdd => 
     println(rdd.first()+"\nRULE STARTS "+System.currentTimeMillis())) 

    val result1 = parsedStream 
     .filter(entry => entry.symbol.contains("walking") 
     && entry.symbol.contains("true") && entry.symbol.contains("id0")) 
     .map(_.time) 

    val result2 = parsedStream 
     .filter(entry => 
     entry.symbol == "disappear" && entry.symbol.contains("id0")) 
     .map(_.time) 

    val result3 = result1 
     .transformWith(result2, (rdd1, rdd2: RDD[Int]) => rdd1.subtract(rdd2)) 

    result3.foreachRDD(rdd => 
    println(rdd.first()+"\nRULE ENDS "+System.currentTimeMillis())) 

    sc.start() 
    sc.awaitTermination() 
} 

def parse(stream: DStream[String]) = { 

    stream.flatMap { line => 
     val entries = line.split("assert").filter(entry => !entry.isEmpty) 
     entries.map { tuple => 

      val pattern = """\s*[(](.+)[,]\s*([0-9]+)+\s*[)]\s*[)]\s*[,|\.]\s*""".r 

      tuple match { 
       case pattern(symbol, time) => 
       new Data(symbol, time.toInt) 
      } 
     } 
    } 
} 

case class Data (symbol: String, time: Int) 

나는 하나 개의 배치에서 모든 데이터를 수신하기 위해 110.000 밀리 세컨드의 배치 기간을 가지고 : 여기에 코드입니다. 나는 로컬에서도 스파크가 매우 빠르다고 믿었다. 이 경우 규칙 실행 ("규칙 스타트"와 "규칙 종료"사이)에는 약 3.5 초가 걸립니다. 틀린 일을하고 있습니까? 아니면 예상되는 시간입니까? 모든 조언

답변

0

그래서 내 일의 할당에 일치하는 대/소문자를 사용하여 json 파서를 도입했을 때보 다 성능이 떨어졌습니다. 또한 StreamingContext에서 배치 시간을 조정 해보십시오. 그것은 나를 위해 상당한 차이를 만들었다. 또한 얼마나 많은 현지 노동자가 있습니까?

관련 문제