2012-07-25 6 views
2

작업 대기열과 작업 대기열을 몇 초 내에 한 번씩 볼 수있는 스레드가 있습니다.알 수없는 비동기 작업 대기

다른 코드 섹션 (물론 다른 스레드에서)이 루프에서 작업을 생성하고 (루프 외부에서 미리 작업 수를 알 수 없음) 큐에 삽입합니다. 작업에는 '결과'개체가 포함되어 있으며 외부 스레드 (해당 작업을 만든)는 모든 작업이 완료 될 때까지 기다렸다가 마지막으로 결과를 가져와야합니다. 문제는 Java Semaphore \ CountDownLatch 등을 결과 객체에 전달할 수 없다는 것입니다. 왜냐하면 미리 모니터의 수를 모르기 때문입니다. 또한 invokeAll을 사용하는 Executor를 사용할 수 없으며 작업이 동기화되지 않았기 때문에 Future 객체를 기다릴 수 없습니다. 외부 스레드는 작업을 대기열에 넣고 다른 스레드는 작업을 실행할 때 대기열을 실행합니다.

내가 생각한 유일한 해결책은 결과 집합과 모니터 카운터를 포함하는 '반전 된 세마포어'클래스를 만드는 것입니다. 답이 yes이면 어떤 로크 대상에게 통지한다 대향 == 0 인 경우 상기 getResult를 기능 확인 것이며, getResult를 기능이 잠금 기다리는 각 addTask 방법 semaphore.acquire를 호출

public class InvertedSemaphore<T> { 
    Set<T> resultSet; 
    int usages; 
    final Object c; 

    public InvertedSemaphore() { 
     resultSet = Collections.synchronizedSet(new HashSet<T>()); 
     usages = 0; 
     c = new Object(); 
    } 

    public void addResult(T result) { 
     resultSet.add(result); 
    } 

    public void addResults(Set<T> result) { 
     resultSet.addAll(result); 
    } 


    public void acquire() { 
     usages++; 
    } 

    public void release() { 
     synchronized (c) { 
      if (--usages == 0) { 
       c.notify(); 
      } 
     } 
    } 

    public Set<T> getResults() { 
     synchronized (c) { 
      try { 
       while (usages > 0) { 
        c.wait(); 
       } 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     return resultSet; 
    } 

} 

및 각 (동기화되지 않은) 작업은 작업이 끝나면 semaphore.release를 호출합니다.

꽤 복잡해 보입니다. 자바 동시 라이브러리 나 다른 라이브러리에서 더 나은 해결책이 있다는 것을 확신합니다.

어떤 생각이 appriciated됩니다 :)

+0

'CountUpLatch'가 필요한 것 같습니다.:) – corsiKa

답변

3

작업이 순서대로 처리 할 필요가없는 경우, 더 일반적으로 ExecutorCompletionService

를 사용, 순서에 ExecutorServiceinvokeAll를 사용할 필요가 없습니다 결과에 Future이 표시됩니다. 이 목적으로 ExecutorService#submit을 사용할 수도 있고, 선택적으로 만들려는 작업이 Future을 구현할 수 있으므로 나중에 작성자가 결과를 요청할 수 있습니다.

일부 코드 :

class MyTask { 
    AtomicReference<?> result = new AtomicReference<?>(); 

    void run() { 
     //do stuff here 
     result.set(/* the result of the calculation */); 
    } 

    boolean resultReady() { 
     return result.get()!=null; 
    } 

    ? get() { 
     return result.get(); 
    } 
} 

... 다른 코드에서

void createTasks() { 
    Collection<MyTask> c = new ...; 

    while(indeterminable condition) { 
     MyTask task = new MyTask(); 
     c.add(task); 
     mysteryQueue.add(task); 
    } 

    while(haven't received all results) { 
     MyTask task = c.get(...); //or iterate or whatever 
     ? result = task.get(); 
     if (result!=null) { 
      //do stuff, probably remove the task from the collection c would be smart 
     } 
    } 
} 
+0

하지만 Executors의 문제는 executer가 수행 할 '작업'이 실제 작업을 일부 대기열에 넣는 것입니다. 이러한 작업은 다른 스레드에서 우연히 실행됩니다. '실제'작업 결과가 나올 때까지 기다려야 결과가 이미 있으면 잠시 후에 확인할 수 있지만 결과가 나오는지조차 알지 못합니다. 모든 비동기 작업이 완료된 시점을 알고 나서야 결과를 얻으려고합니다. – axelrod

+0

그건 그냥 바보예요. 하지만 당신이'Future'를 직접 구현하거나 "실제 작업"의 인터페이스를 통해 결과를 얻는 어떤 방법을 제공하는 것을 막을 수는 없습니다. –

1

하나 개의 아이디어는 결과에 대해 별도의 대기열을 사용하는 것입니다.
따라서 스레드 A이 스레드 B에 대한 작업을 할당하고 이로 인해 생산자 - 소비자 접근 방식을 갖는 하나의 차단 대기열을 가지며 각 작업이 완료되면 두 번째 결과 대기열에 소비자 생산자 역할을 뒤집어 놓을 수 있습니다 원래 작업을 생성 한 스레드 A은 두 번째 큐의 결과를 소비하기 때문입니다.

+0

그래,하지만 결과에 대해 몇 가지 작업을해야하고 알지 못합니다. 결과 큐가 '가득 참'으로 간주 될 때 – axelrod

+0

무슨 뜻인지 확실하지 않습니다. 스레드'A'는 스레드'B'가 소비자 인 큐에 얼마나 많은 작업을 삽입했는지 알고 있습니다. 이제 결과 대기열에 작업을 소비하는 한 'A'를 스레드합니다. 작업이없고 아직 모든 결과를 소비하지 않은 경우 (첫 번째 대기열에 얼마나 많이 놓여 있는지 알고 있으므로'A'는 소모량을 알고 있습니다) 결과를 기다리거나 다른 처리를 수행합니다 – Cratylus

+0

@axelrod, 그래, 당신은 당신이 반대 세마포어에 대한 당신의 아이디어에서 카운터를 체크하는 것과 같은 방법으로 할 수있다. 무언가는 초기 값을 거기에두고 무언가는 그것을 감소시킵니다. 이것은 일정한 수의 작업의 완료를 기반으로 다른 실행을 계속할 수 있음을 입증하는 방법을 알고 있음을 의미합니다. – Vitaliy

1

다음을 수행 할 수 있습니다. 각 제작자는 자체 큐를 보유합니다. 생산자는이 대기열에 작업 자체를보고하는 수단을 전달합니다. 작업 실행이 끝나면 결과는이 대기열에 대기합니다.

class Result{} 

interface IResultCallback{ 
    void resultReady(Result r); // this is an abstraction of the queue 
} 

class Producer implements IResultCallback{ 
    // the producer needs to pass itself to the constructor of the task, 
    // the task will only see its "resultReady" facade and will be able to report to it. 
    // the producer can aggragte the results at it will and execute its own computation as 
     // as soon it is ready 

    Queue<Result> results; // = init queue 

    @Override 
    public void resultReady(Result r) { 
     results.add(r); 

     if(results.size() == 9){ 
      operate(); 
     } 
     results.clear(); 
    } 

    public void operate(){ 
     // bla bla 
    } 
} 

public class Task { 
    IResultCallback callback; 

    public Task(IResultCallback callback){ 
     this.callback = callback; 
    } 
    public void execute(){ 
     // bla bla 

     Result r = null; // init result; 
     callback.resultReady(r); 
    } 
} 
+0

나는이 접근 방법을 시도 할 것이다. thx : – axelrod

+0

@axelrod 행운을 빌어 요! StackOverflow의 좋은 거주자이며 투표/대답을 수락하는 것을 잊지 마세요 :-) – Vitaliy

+0

@axelrod 다음에 오신 것을 환영합니다! 그것은 * 좋은 질문입니다! – Vitaliy