2016-11-29 1 views
3

저는 Spark 내의 각 작업자에 대해 시스템 프로세스를 실행하고자하는 상황이 있습니다. 이 프로세스를 한 번씩 각 시스템에서 실행하고 싶습니다. 특히이 프로세스는 나머지 프로그램을 실행하기 전에 실행해야하는 데몬을 시작합니다. 이상 적으로는 데이터를 읽어야합니다.Apache Spark 내의 모든 작업자에게 명령을 실행할 수 있습니까?

저는 Spark 2.0.2에서 동적 할당을 사용하고 있습니다.

+0

중복 : http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –

답변

5

당신은 지연 발사와 스파크 방송의 조합으로 이것을 달성 할 수 있습니다. 그것은 아래처럼 될 것입니다. 당신이 어떤 변형을하기 전에

object ProcessManager { 
    lazy val start = // start your process here. 
} 

당신은 응용 프로그램의 시작이 객체를 방송 할 수있다 (코드 아래에 컴파일하지 않았, 당신은 몇 가지를 변경해야 할 수도 있습니다).

val pm = sc.broadcast(ProcessManager) 

이제 다른 브로드 캐스트 변수와 마찬가지로 레이지 val을 호출하는 것처럼 변형 내에서이 객체에 액세스 할 수 있습니다. 의

rdd.mapPartition(itr => { 
    pm.value.start 
    // Other stuff here. 
} 
+0

이 작업은 파티션 당 한 번만 수행되며 작업자 당 한 번만 실행되지 않습니까? – Jon

+0

당신이 옳습니다, 그것은 단지 예일뿐입니다. 그러나 그것은 느린 가치이며 ProcessManager는 "객체"이기 때문에 실행 프로그램에서 한 번만 실행됩니다. – Jegan

+0

해당 개체를 브로드 캐스트하는 것은 약간 이상합니다. 코드가 아닌 데이터를 브로드 캐스트해야합니다. 객체를 가지고 시작 변수에 액세스하는 것만으로 충분합니다. 그렇게하면 직렬화 할 수있는 ProcessManager 객체가 필요하지 않습니다. – Atreys

2

시스템 프로세스를 호출하는 정적 초기화가있는 object은 트릭을 수행해야합니다.

object SparkStandIn extends App { 
    object invokeSystemProcess { 
    import sys.process._ 
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".! 

    def doIt(): Unit = { 
     // this object will construct once per jvm, but objects are lazy in 
     // another way to make sure instantiation happens is to check that the errorCode does not represent an error 
    } 
    } 
    invokeSystemProcess.doIt() 
    invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once 
} 
+0

하지만 모든 변환에서 호출을 반복하지 않고 실제로 초기화되도록하려면 어떻게해야합니까? –

관련 문제