다른 데이터가있는 두 개의 파일이 있습니다. 나는 2 diff RDDs &에서 그 (것)들을 읽는 것을 시도하고 그 (것)들을 Dataframe & 하이브로 삽입으로 그 (것)들 변환하십시오. 나는이 정상적인 코드를 할 수있었습니다.하지만 스파크는 한 번에 RDD 계산을 처리했습니다. 그래서 두 번째 사람은 클러스터에 충분한 리소스가 있지만 1 번을 넘기를 기다리고있었습니다. RDD 계산이 비동기 메서드를 사용하여 병렬화 될 수 있다는 것을 배웠습니다. 그래서 foreachPartitionAsync를 시도하고 있습니다. 하지만 디버깅 할 수없는 오류가 발생합니다. 샘플 코드 :foreachPartitionAsync throw가 중지 된 SparkContext에서 메서드를 호출 할 수 없습니다.
object asynccode {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("Parser")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
.foreachPartitionAsync { f =>
f.toSeq.toDF()
.write.insertInto("dbname.tablename1")
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
.foreachPartitionAsync(f => f.toSeq.toDF()
.write.insertInto("dbname.tablename2")
)
sc.stop()
}
def method1 = ???
def method2 = ???
}
하지만 아래 오류 메시지가 표시됩니다. foreachPartitionAsync를 코드에서 제거하면 올바르게 작동합니다. foreachPartitionAsync와 관련하여 내가 뭘 잘못했는지는 확실치 않습니다.
작업 직렬화에 실패했습니다 : java.lang.IllegalStateException : 중지 된 SparkContext에서 메소드를 호출 할 수 없습니다.
업데이트 : 제안 해 주셔서 감사합니다. 나는 그것을 아래와 같이 업데이트했다. 하지만 지금은 아무 것도하지 않습니다. 스파크 웹 UI를 통해 무대가 트리거되고 있음을 알 수 있습니다 (비어 있음). 내 테이블도 업데이트되지 않습니다. 그러나 작업이 오류없이 완료되었습니다.
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
toDF()
val f1 = Future(test.write.insertInto("dbname.tablename1"))
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
toSeq.toDF()
val f2 = Future(test2.write.insertInto("dbname.tablename2"))
)
Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop)
내가 놓친 것은 무엇입니까?