2012-01-25 1 views
1

어려운 문제가 있습니다.appengine blob을 비동기 적으로 작성하고 모든 작업이 완료 될 때 완료하십시오.

날짜로 매개 변수화 된 URL 집합을 반복하여 가져오고 있습니다.

somewebservice.com?start=01-01-2012 & 끝 = 때때로

2012년 1월 10일는 내용이 URL이 잘립니다에서 반환 (임의 누락 예를 들어, 여기에 하나의 예입니다 너무 큰 범위를 정의했기 때문에 '잘린 오류'메시지가 표시된 결과) 두 URL로 쿼리를 분할해야합니다.

somewebservice.com?start=01-01-2012 & end = 01- 05-2012

somewebservice.com?start=01-06-2012 & end = 01-10-2012

결과가 더 이상 잘리지 않을 때까지이 작업을 반복적으로 수행 한 다음 BLOB에 쓰기 작업을 수행하여 동시 쓰기 작업을 허용합니다.

이러한 URL 가져 오기 호출/blob 쓰기는 각각 별도의 작업 대기열 작업에서 처리됩니다.

문제는 모든 작업이 완료된 시점을 알기위한 계획을 고안해서는 안된다는 것입니다. 샤드 드 카운터를 사용해 보았지만 재귀가 어렵습니다. 누군가 제가 Pipeline API를 사용할 것을 제안 했으므로 Slatkin 대화를 3 번 ​​보았습니다. 그것은 재귀와 함께 작동하는 것처럼 보이지 않습니다 (하지만 나는 여전히 lib를 완전히 이해하지 못함을 인정합니다).

작업 대기열 작업 (및 재귀 적으로 생성되는 자식) 집합이 완료되어서 내 BLOB를 완료하고 그 작업을 수행 할 수있는 시점을 알 수 있습니까?

덕분에, 존

답변

0

모든 권리, 그래서 여기에 내가 한 일이 있습니다. 나는 Mitch의 솔루션을 조금 수정해야했지만, 그는 즉각적인 가치 대신 미래의 가치를 되돌려 줄 충고와 함께 올바른 방향으로 나를 확실히 데려갔습니다.

은 내가 당신을 감사 WAITFOR

List<FutureValue<Void>> dummies = new ArrayList<FutureValue<Void>>(); 
for (Interval in : ins) { 
    dummies.add(futureCall(new DataFetcher(), immediate(file), immediate(in.getStart()), 
     immediate(in.getEnd()))); 
} 

FutureValue<Void> fv = futureCall(new DummyJob(), futureList(dummies)); 

return futureCall(new DataWriter(), immediate(file), waitFor(fv)); 

에서의 Blob 종료 자에 DummyJob의 출력을 제출,

public static class DummyJob extends Job1<Void, List<Void>> { 
     @Override 
     public Value<Void> run(List<Void> dummies) { 
     return null; 
     } 
    } 

그런 다음 재귀의 출력을 취하는 intermidate의 DummyJob을 만들어야했습니다 미치와 닉 !!

2

당신이 Pipelines Getting Started 문서를 읽을 수 있나요? 파이프 라인은 다른 파이프 라인을 만들고 그들을 기다리고, 그래서 당신이 원하는 일을하는 것은 매우 간단 할 수 있습니다 RecursiveCombiningPipeline 단순히 두 개의 하위 파이프 라인의 값에 대한 수신기 역할을

class RecursivePipeline(pipeline.Pipeline): 
    def run(self, param): 
    if some_condition: # Too big to process in one 
     p1 = yield RecursivePipeline(param1) 
     p2 = yield RecursivePipeline(param2) 
     yield RecursiveCombiningPipeline(p1, p2) 

.

+0

나는 확실히 그것에 대해 당신의 말을들을 것입니다. 감사.Java API를 사용하고 있습니다. 그 점을 알았습니까? –

+0

@JohnWheeler 저는 Java Pipeline API에 대해 잘 알고 있지는 않지만 같은 방식으로 작동한다고 생각합니다. –

0

다음은 Java Pipeline을 사용한 예입니다.

package com.example;

import com.google.appengine.tools.pipeline.FutureValue; 
import com.google.appengine.tools.pipeline.Job1; 
import com.google.appengine.tools.pipeline.Job2; 
import com.google.appengine.tools.pipeline.Value; 

public class PipelineRecursionDemo { 

    /** 
    * A Job to count the number of letters in a word 
    * using recursion 
    */ 
    public static class LetterCountJob extends Job1<Integer, String> { 

    public Value<Integer> run(String word) { 
     int length = word.length(); 
     if (length < 2) { 
     return immediate(word.length()); 
     } else { 
     int mid = length/2; 
     FutureValue<Integer> first = futureCall(new LetterCountJob(), 
      immediate(word.substring(0, mid))); 
     FutureValue<Integer> second = futureCall(new LetterCountJob(), 
      immediate(word.substring(mid, length))); 
     return futureCall(new SumJob(), first, second); 
     } 
    } 
    } 

    /** 
    * An immediate Job to add two integers 
    */ 
    public static class SumJob extends Job2<Integer, Integer, Integer> { 

    public Value<Integer> run(Integer x, Integer y) { 
     return immediate(x + y); 
    } 
    } 
} 
+0

고마워. 그것은 내가 생각했던 것보다 조금 나아졌지만, 나의 경우 FinalizationBlobJob이 SumJob을 대체하기 때문에 재귀가 끝날 때 얼룩을 마무리하려고하기 때문에 작동하지 않을 것이다. 그것은 멱등수가되도록 설계 할 수 없습니다 (일단 모든 쓰기가 완료되면 완료 만 할 수 있습니다). 다시 말하면 재귀 문자 계산이 끝날 때 합계를 수행하는 대신 얼룩을 마무리하고 해당 호출 스택을 상상하는 것처럼 가장합니다. 재귀 *는 작동하지만, 제 경우에는 그렇지 않을 것입니다. –

+0

실제로, 멱등자가되는 것은 그것과 아무 상관이 없습니다. BLOB가 이미 완성되지 않았는지 확인하여 짝수 화를 완성 할 수있었습니다. 문제는, SumJob이 각 재귀의 끝에 한 번 실행되는 반면, 맨 끝에 한 번 실행하려면 FinalizeBlob가 필요하다는 것입니다. –

+0

거룩한 암소 나는 그것을 얻었다, 나는 그것을 얻었다 !!! 나는 아래의 해결책을 추가 할 것이다! –

관련 문제