저는 Spark 내의 각 작업자에 대해 시스템 프로세스를 실행하고자하는 상황이 있습니다. 이 프로세스를 한 번씩 각 시스템에서 실행하고 싶습니다. 특히이 프로세스는 나머지 프로그램을 실행하기 전에 실행해야하는 데몬을 시작합니다. 이상 적으로는 데이터를 읽어야합니다.Apache Spark 내의 모든 작업자에게 명령을 실행할 수 있습니까?
저는 Spark 2.0.2에서 동적 할당을 사용하고 있습니다.
저는 Spark 내의 각 작업자에 대해 시스템 프로세스를 실행하고자하는 상황이 있습니다. 이 프로세스를 한 번씩 각 시스템에서 실행하고 싶습니다. 특히이 프로세스는 나머지 프로그램을 실행하기 전에 실행해야하는 데몬을 시작합니다. 이상 적으로는 데이터를 읽어야합니다.Apache Spark 내의 모든 작업자에게 명령을 실행할 수 있습니까?
저는 Spark 2.0.2에서 동적 할당을 사용하고 있습니다.
당신은 지연 발사와 스파크 방송의 조합으로 이것을 달성 할 수 있습니다. 그것은 아래처럼 될 것입니다. 당신이 어떤 변형을하기 전에
object ProcessManager {
lazy val start = // start your process here.
}
당신은 응용 프로그램의 시작이 객체를 방송 할 수있다 (코드 아래에 컴파일하지 않았, 당신은 몇 가지를 변경해야 할 수도 있습니다).
val pm = sc.broadcast(ProcessManager)
이제 다른 브로드 캐스트 변수와 마찬가지로 레이지 val을 호출하는 것처럼 변형 내에서이 객체에 액세스 할 수 있습니다. 의
rdd.mapPartition(itr => {
pm.value.start
// Other stuff here.
}
이 작업은 파티션 당 한 번만 수행되며 작업자 당 한 번만 실행되지 않습니까? – Jon
당신이 옳습니다, 그것은 단지 예일뿐입니다. 그러나 그것은 느린 가치이며 ProcessManager는 "객체"이기 때문에 실행 프로그램에서 한 번만 실행됩니다. – Jegan
해당 개체를 브로드 캐스트하는 것은 약간 이상합니다. 코드가 아닌 데이터를 브로드 캐스트해야합니다. 객체를 가지고 시작 변수에 액세스하는 것만으로 충분합니다. 그렇게하면 직렬화 할 수있는 ProcessManager 객체가 필요하지 않습니다. – Atreys
시스템 프로세스를 호출하는 정적 초기화가있는 object
은 트릭을 수행해야합니다.
object SparkStandIn extends App {
object invokeSystemProcess {
import sys.process._
val errorCode = "echo Whatever you put in this object should be executed once per jvm".!
def doIt(): Unit = {
// this object will construct once per jvm, but objects are lazy in
// another way to make sure instantiation happens is to check that the errorCode does not represent an error
}
}
invokeSystemProcess.doIt()
invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}
하지만 모든 변환에서 호출을 반복하지 않고 실제로 초기화되도록하려면 어떻게해야합니까? –
중복 : http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –