2017-01-06 1 views
1

다른 데이터가있는 두 개의 파일이 있습니다. 나는 2 diff RDDs &에서 그 (것)들을 읽는 것을 시도하고 그 (것)들을 Dataframe & 하이브로 삽입으로 그 (것)들 변환하십시오. 나는이 정상적인 코드를 할 수있었습니다.하지만 스파크는 한 번에 RDD 계산을 처리했습니다. 그래서 두 번째 사람은 클러스터에 충분한 리소스가 있지만 1 번을 넘기를 기다리고있었습니다. RDD 계산이 비동기 메서드를 사용하여 병렬화 될 수 있다는 것을 배웠습니다. 그래서 foreachPartitionAsync를 시도하고 있습니다. 하지만 디버깅 할 수없는 오류가 발생합니다. 샘플 코드 :foreachPartitionAsync throw가 중지 된 SparkContext에서 메서드를 호출 할 수 없습니다.

object asynccode { 
    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("Parser") 
    val sc = new SparkContext(conf) 
    val hiveContext = new HiveContext(sc) 
    import hiveContext.implicits._ 

    val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync { f => 
     f.toSeq.toDF() 
      .write.insertInto("dbname.tablename1") 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync(f => f.toSeq.toDF() 
     .write.insertInto("dbname.tablename2") 

    ) 
    sc.stop() 
    } 

    def method1 = ??? 
    def method2 = ??? 
} 

하지만 아래 오류 메시지가 표시됩니다. foreachPartitionAsync를 코드에서 제거하면 올바르게 작동합니다. foreachPartitionAsync와 관련하여 내가 뭘 잘못했는지는 확실치 않습니다.

작업 직렬화에 실패했습니다 : java.lang.IllegalStateException : 중지 된 SparkContext에서 메소드를 호출 할 수 없습니다.

업데이트 : 제안 해 주셔서 감사합니다. 나는 그것을 아래와 같이 업데이트했다. 하지만 지금은 아무 것도하지 않습니다. 스파크 웹 UI를 통해 무대가 트리거되고 있음을 알 수 있습니다 (비어 있음). 내 테이블도 업데이트되지 않습니다. 그러나 작업이 오류없이 완료되었습니다.

val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
    toDF() 
    val f1 = Future(test.write.insertInto("dbname.tablename1")) 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     toSeq.toDF() 

val f2 = Future(test2.write.insertInto("dbname.tablename2")) 

    ) 
     Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop) 

내가 놓친 것은 무엇입니까?

답변

1

FutureActions이 완료 될 때까지 기다리지 않고 SparkContext을 중지하십시오. 당신은 응답 문맥을 완료하고 정지 작업을 기다려야합니다

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 
import scala.util.{Success, Failure} 

val f1: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 
val f2: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 

Future.sequence(Seq(f1, f2)).onComplete { 
    case Success(_) => sc.stop 
    case Failure(e) => 
    e.printStackTrace // or some other appropriate actions 
    sc.stop 
} 

그것은 당신의 코드는 우리가 비동기 작업을 무시하는 경우에도 유효 말했다되고. 비동기 쓰기 작업을 직접 Futures를 사용

.foreachPartitionAsync(
    f => f.toSeq.toDF().write.insertInto("dbname.tablename2") 
) 

당신이 원하는 경우 : 당신은 는 액션이나 변환 내부 데이터 구조를 분산 사용할 수 없습니다

val df1: Dataframe = ??? 
val df2: Dataframe = ??? 

val f1: Future[Unit] = Future(df1.write.insertInto("dbname.tablename1")) 
val f2: Future[Unit] = Future(df2.write.insertInto("dbname.tablename2")) 

및 행동이 s로 완료 될 때까지 기다린다. 위에 hown.

관련 문제