2014-09-08 2 views
6

RDD를 통과 할 때 외부 (차단) 서비스를 호출하여 데이터 집합의 값을 계산해야합니까? 그게 어떻게 성취 될 수 있다고 생각하니?아파치 스파크 작업에서 IO 차단은 어떻게 수행합니까?

발 값 : Future[RDD[Double]] = Future sequence tasks

나는 선물의 목록을 만들려고했지만, RDD 아이디에 이동하지, Future.sequence는 적합하지 않습니다.

누구나 그런 문제가 있다면 궁금합니다. 어떻게 해결하셨습니까? 내가 달성하고자하는 것은 하나의 작업자 노드에서 병렬 처리를 얻는 것입니다. 따라서 외부 서비스를 번 초로 번 호출 할 수 있습니다.

아마도 단일 호스트에 여러 작업 노드가있는 것처럼 스파크에 더 적합한 다른 솔루션이있을 것입니다.

흥미로운 점은 어떻게 그러한 도전에 대처할 수 있습니까? 감사.

+0

어떤 종류의 가치를 계산해야합니까? 오프라인으로 계산하여 데이터 집합에 추가 할 수 있습니까? 또는 원격 코드를 항아리로 가져 와서 프로세스 내에서 계산할 수 있습니까? – DPM

+0

제공된 값과 RDD의 각 항목을 비교하여 값이 계산됩니다. 그래서 나는 RDD를 횡단하여 각 요소를 비교한다. Comarisson은 차단 호출이며, 기본 구성 요소에 숨겨져 있기 때문입니다. 그래서 내가 어떻게하는지, 그리고이 도전이 전혀 없다면 궁금합니다. 도움을 주셔서 감사합니다. –

답변

3

내 자신의 질문에 대한 답변입니다 :

val buckets = sc.textFile(logFile, 100) 
val tasks: RDD[Future[Object]] = buckets map { item => 
    future { 
    // call native code 
    } 
} 

val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] => 
    val searchFuture: Future[Iterator[Object]] = Future sequence f 
    Await result (searchFuture, JOB_TIMEOUT) 
} 

여기 아이디어는 우리가 각 파티션은 특정 근로자에 ​​전송 작업의 작은 조각이다 파티션의 컬렉션을 얻을 것이다. 각 작업에는 원시 코드를 호출하고 해당 데이터를 전송하여 처리 할 수있는 데이터가 들어 있습니다.

'values'컬렉션에는 원시 코드에서 반환 된 데이터가 포함되어 있으며 해당 작업은 클러스터 전체에서 수행됩니다.

+0

우리는 지금 같은 문제에 직면 해 있습니다. 작업이 어떻게 사용되는지 예제가 있습니까? 감사, –

+0

spark는 자동으로 미래의 ExecutionContext를 제공합니까? – advait

1

블로킹 호출은 제공된 입력을 RDD의 개별 항목과 비교하는 것이므로 spark 프로세스의 일부로 실행할 수 있도록 java/scala에서 비교를 다시 작성하는 것이 좋습니다. 비교가 "순수한"기능이면 (부작용이없고 입력에만 의존 함) 리모컨을 사용하지 않아도되므로 스파크 프로세스의 복잡성이 감소하고 안정성이 향상되므로 다시 구현하는 것이 간단해야합니다. 전화는 아마도 그만한 가치가있을 것입니다.

원격 서비스가 초당 3000 통화를 처리 할 수 ​​없기 때문에 로컬 in-process 버전이 좋을 것입니다.

그 어떤 이유로 절대적으로 불가능한 경우에, 당신이 의사 코드에서, 미래의 RDD로 데이터를 켤 수있는 RDD 변환을 만들 수 있습니다

val callRemote(data:Data):Future[Double] = ... 

val inputData:RDD[Data] = ... 

val transformed:RDD[Future[Double]] = inputData.map(callRemote) 

그리고 그때부터 수행 당신의 미래 [Double] 객체들을 계산합니다.

원격 프로세스에서 처리 할 수있는 병렬 처리 정도를 알고 있다면 미래 모드를 포기하고 그것이 병목 자원임을 받아 들여야합니다.

val remoteParallelism:Int = 100 // some constant 

val callRemoteBlocking(data:Data):Double = ... 

val inputData:RDD[Data] = ... 

val transformed:RDD[Double] = inputData. 
    coalesce(remoteParallelism). 
    map(callRemoteBlocking) 

당신의 작업은 꽤 시간이 걸릴 것이지만 원격 서비스가 넘쳐서 끔찍한 일이 없어야합니다.

마지막 옵션은 입력 값이 합리적으로 예측 가능하고 결과 범위가 일정하고 합리적인 출력 수 (수백만 개)로 제한되는 경우 원격 서비스를 사용하여 모든 데이터를 미리 계산할 수 있고 조인을 사용하여 스파크 작업 시간에 그들을 찾으십시오. 여기

관련 문제