Spark Streaming에서 새 메시지가 수신 될 때마다 모델이이 새 메시지를 기반으로 sth를 예측하는 데 사용됩니다. 그러나 시간이 지남에 따라, 모델은 어떤 이유로 변경 될 수 있으므로 새로운 메시지가 온다 때마다 나는 모델을 다시로드 할. 나는이 코드를 실행하면 내 코드가,이[Spark Streaming] 새 메시지가 올 때마다 모델을로드하는 방법은 무엇입니까?
def loadingModel(@transient sc:SparkContext)={
val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel")
model
}
var error=0.0
var size=0.0
implicit def bool2int(b:Boolean) = if (b) 1 else 0
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double,Double)] = {
val model=loadingModel(sc)
val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble }
val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail))
val prediction = model.predict(pairs.features)
val wrong= prediction != pairs.label
error = state.getOption().getOrElse(Array(0.0,0.0))(0) + 1.0*(wrong:Int)
size=state.getOption().getOrElse(Array(0.0,0.0))(1) + 1.0
val output = (key, error,size)
state.update(Array(error,size))
Some(output)
}
val stateSpec = StateSpec.function(updateState _)
.numPartitions(1)
setupLogging()
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("test").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics).mapWithState(stateSpec)
처럼 보인다 이 같은 예외가 될 것입니다.
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
자세한 정보가 필요하면 알려주십시오. 감사합니다.
전체 스택 추적을 게시 할 수 있습니까? 또한지도, 필터 같은 스파크 변환 내에서 직렬화되지 않는 클래스를 사용하려고합니까? – Shankar
@Shankar 안녕하세요,'loadMode'l을 정의하지 않고'updateState' 외부에 모델 ('val model = LogisticRegressionModel.load (sc, "/ home/zefu/BIA800/LRModel")')을로드하면 잘 동작합니다. 나는 문제가'sc'와 함께 오는 것 같아 –
@Shankar와 내가 거기에 더 많은 코드를 추가했다 : P –