2016-10-06 2 views
0

내 테스트 프레임 워크에서 Spark-MongoDB 커넥터를 설정하려고합니다. 내 StreamingContext은 다음과 같이 설정 :Spark Mongodb Connector 단위 테스트

val records = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq))

나는이 오류

맞아 본 :

val conf = new SparkConf() 
      .setMaster("local[*]") 
      .setAppName("test") 
      .set("spark.mongodb.input.uri", "mongodb://localhost:27017/testdb.testread") 
      .set("spark.mongodb.output.uri", "mongodb://localhost:27017/testdb.testwrite") 

lazy val ssc = new StreamingContext(conf, Seconds(1))

내가 이런 d 스트림을 설정하려고 할 때마다

java.lang.IllegalStateException : 중지 된 SparkC의 메소드를 호출 할 수 없습니다. 문맥.

컨텍스트가 시작된 후 즉시 중지되는 것처럼 보입니다. 그러나 이유를 알 수 없습니다. 로그는 오류를주지 않습니다. 시작 @ 4858ms osjsServletContextHandler @ 33b85bc {- 51,625 org.spark_project.jetty.util.component.AbstractLifeCycle setStarted :

DEBUG] 2016년 10월 6일 18 : 29이 시동 완료 후 즉시 정지 곳이다/metrics/json, null, AVAILABLE} [WARN] 2016-10-06 18 : 29 : 51,660 org.apache.spark.streaming.StreamingContext logWarning - StreamingContext가 아직 시작되지 않았습니다. [DEBUG] 2016-10-06 18 : 29 : 51,662 org.spark_project.jetty.util.component.AbstractLifeCycle setStopping - 중지하기 [email protected] [DEBUG] 2016-10-06 18 : 29 : 51,664 org.spark_project.jetty.server .Server doStop - 정상 종료 [email protected] by

나는 종료되지 않는 설정 MongoDB의 연결을 제거하고

편집 (나는 :(몽고/쓰기를 읽을 수 없습니다 제외) 다 잘 때 : 이 내가하려고 테스트입니다 몽고에 글쓰기. 그러나 필자가이 시점에 도달하기 전에는 테스트 스위트가 실패합니다.

"read from kafka queue" in new SparkScope{ 

    val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](List("topic"), 
     Map[String, Object](
     "bootstrap.servers"->s"localhost:${kServer.kafkaPort}", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> "testing", 
     "auto.offset.reset" -> "latest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    ) 
) 
    val writeConfig = WriteConfig(Map(
    "collection"->"testcollection", 
    "writeConcern.w"->"majority", 
    "db"->"testdb" 
), Some(WriteConfig(ssc.sparkContext))) 

    stream.map(r => (r.key.toLong, r.value.toLong)) 
    .reduceByKey(_+_) 
    .map{case (k,v) => { 
     val d = new Document() 
     d.put("key", k) 
     d.put("value", v) 
     d 
    }} 
    .foreachRDD(rdd => rdd.saveToMongoDB(writeConfig)) 

    ssc.start 
    (1 until 10).foreach(x => producer.send(KafkaProducerRecord("topic", "1", "1"))) 
    ssc.awaitTerminationOrTimeout(1500) 
    ok 
} 

나는 스칼라 컬렉션에서 스트림을 만들려고 할 때 실패가 여기서 발생은 :

"return a single record with the correct sum" in new SparkScope{ 
    val stream = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq)) 
    val m = HashMap.empty[Long,Long] 
    FlattenTimeSeries.flatten(stream).foreachRDD(rdd => m ++= rdd.collect()) 
    ssc.start() 
    ssc.awaitTerminationOrTimeout(1500) 
    m.size === 1 and m(1) === 20 
    } 

SparkScope 클래스는 제가 위에서 보여 StreamingContext를 생성하고 테스트 후 ssc.stop()를 호출

+0

아주 이상하게 - 예를 들어 몽고에서 아무 것도하지 않는 경우가 있습니다. 확장 할 수 있습니까? – Ross

답변

1

알았어. 문제는 SparkConf 변수가 lazy으로 선언되지 않았지만 StreamingContext이 선언 된 것입니다. 왜 그렇게 중요한지는 모르겠지만 그렇습니다. 결정된.

+0

안녕하세요 사용자 1748268, 나는 MongoDB에 데이터를 저장하려고했지만 여전히 성공하지 못했습니다. 내가보기 엔 계속 진행할 수 있도록 실행중인 프로젝트의 단순화 된 형식을 공유해 주실 수 있습니까? 미리 감사드립니다, 건배 :) –

+0

안녕하세요 @ Dynamiclemo. 위에서 게시 한 코드는 기본적으로 완전하며 실제로 완벽하게 작동합니다. 내가 겪었던 문제는 specs2와 관련되어있었습니다 (내 범위는 특성 대신 추상 클래스였습니다). 어떤 특별한 문제가 있습니까? 아마 내가 도울 수있어. – Tim

관련 문제