2016-10-17 1 views
2

Spark Streaming에서 새 메시지가 수신 될 때마다 모델이이 새 메시지를 기반으로 sth를 예측하는 데 사용됩니다. 그러나 시간이 지남에 따라, 모델은 어떤 이유로 변경 될 수 있으므로 새로운 메시지가 온다 때마다 나는 모델을 다시로드 할. 나는이 코드를 실행하면 내 코드가,이[Spark Streaming] 새 메시지가 올 때마다 모델을로드하는 방법은 무엇입니까?

def loadingModel(@transient sc:SparkContext)={ 
    val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel") 
    model 
} 

var error=0.0 
var size=0.0 
implicit def bool2int(b:Boolean) = if (b) 1 else 0 
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double,Double)] = { 
    val model=loadingModel(sc) 
    val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble } 
    val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail)) 
    val prediction = model.predict(pairs.features) 
    val wrong= prediction != pairs.label 
    error = state.getOption().getOrElse(Array(0.0,0.0))(0) + 1.0*(wrong:Int) 
    size=state.getOption().getOrElse(Array(0.0,0.0))(1) + 1.0 
    val output = (key, error,size) 
    state.update(Array(error,size)) 
    Some(output) 
} 
val stateSpec = StateSpec.function(updateState _) 
    .numPartitions(1) 
setupLogging() 
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 
val topics = List("test").toSet 
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics).mapWithState(stateSpec) 

처럼 보인다 이 같은 예외가 될 것입니다.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 

자세한 정보가 필요하면 알려주십시오. 감사합니다.

+0

전체 스택 추적을 게시 할 수 있습니까? 또한지도, 필터 같은 스파크 변환 내에서 직렬화되지 않는 클래스를 사용하려고합니까? – Shankar

+0

@Shankar 안녕하세요,'loadMode'l을 정의하지 않고'updateState' 외부에 모델 ('val model = LogisticRegressionModel.load (sc, "/ home/zefu/BIA800/LRModel")')을로드하면 잘 동작합니다. 나는 문제가'sc'와 함께 오는 것 같아 –

+0

@Shankar와 내가 거기에 더 많은 코드를 추가했다 : P –

답변

0

DStream 함수 내에서 모델을 사용할 때 spark가 컨텍스트 개체를 직렬화하는 것처럼 보이며 (모델의로드 함수가 sc를 사용하기 때문에) 컨텍스트 개체가 직렬화되지 않기 때문에 실패합니다. 한 가지 해결 방법은 DStream을 RDD로 변환하고 결과를 수집 한 다음 드라이버에서 모델 예측/채점을 실행하는 것입니다.

스트리밍을 시뮬레이트하기 위해 netcat 유틸리티를 사용했습니다. 다음 코드를 사용하여 DStream을 RDD로 변환했는데 작동합니다. 도움이되는지 확인하십시오.

val ssc = new StreamingContext(sc,Seconds(10)) 
val lines = ssc.socketTextStream("xxx", 9998) 
val linedstream = lines.map(lineRDD => Vectors.dense(lineRDD.split(" ").map(_.toDouble))) 
val logisModel = LogisticRegressionModel.load(sc, /path/LR_Model") 
linedstream.foreachRDD(rdd => { 
    for(item <- rdd.collect().toArray) { 
    val predictedVal = logisModel.predict(item) 
     println(predictedVal + "|" + item); 
    } 
}) 

수집 이해 여기에 확장 성이 아니라, 당신이 당신의 스트리밍 메시지가 어떤 간격의 수가 적은 것을 생각하면, 이것은 아마도 옵션입니다. 이것이 내가 Spark 1.4.0에서 볼 수있는 것인데, 높은 버전은 아마도이 문제를 해결할 것입니다. 유용 할 경우이 참조하십시오.

Save ML model for future usage

관련 문제