2012-07-03 2 views
0

이벤트를 처리 할 동시 코드를 작성하려고합니다. 이 처리에는 오랜 시간이 걸릴 수 있습니다.Executor/Queue 프로세스가 마지막으로 알려진 작업 만

이벤트가 처리되는 동안 들어오는 이벤트를 기록한 다음 다시 실행하도록 요청할 때 마지막 들어오는 이벤트를 처리해야합니다. (다른 이벤트는 버려 질 수 있습니다.) 이것은 FILO 큐와 조금 비슷하지만 큐에 하나의 요소 만 저장하면됩니다.

필자는 아래에 표시된 내 이벤트 처리 아키텍처에 새로운 Executor를 연결하고 싶습니다.

public class AsyncNode<I, O> extends AbstractNode<I, O> { 
    private static final Logger log = LoggerFactory.getLogger(AsyncNode.class); 
    private Executor executor; 

    public AsyncNode(EventHandler<I, O> handler, Executor executor) { 
     super(handler); 
     this.executor = executor; 
    } 

    @Override 
    public void emit(O output) { 
     if (output != null) { 
      for (EventListener<O> node : children) { 
       node.handle(output); 
      } 
     } 
    } 

    @Override 
    public void handle(final I input) { 

     executor.execute(new Runnable() { 

      @Override 
      public void run() { 
       try{ 
       emit(handler.process(input)); 
       }catch (Exception e){ 
        log.error("Exception occured whilst processing input." ,e); 
        throw e; 
       } 

      } 
     }); 

    } 

}

+0

이 실행자가 정상적으로 제출 된 다른 작업을 처리하도록 하시겠습니까? 아니면이 집행자는 행사를 처리하는 데에만 사용됩니까? – shams

+0

항상 알려진 최신 작업을 실행하고 대기 중이지만 실행되지 않은 중간 작업을 무시해야합니다. 나는 내 솔루션을 제시했지만 아직 테스트하지 않았다. –

+0

그렇다면 실행 프로그램을 실행 한 스레드의 수에 관계없이 실행 프로그램이 한 번에 하나의 작업 만 실행 중임을 의미합니까? – shams

답변

0
public class LatestTaskExecutor implements Executor { 
    private final AtomicReference<Runnable> lastTask =new AtomicReference<>(); 
    private final Executor executor; 

    public LatestTaskExecutor(Executor executor) { 
     super(); 
     this.executor = executor; 
    } 

    @Override 
    public void execute(Runnable command) { 
     lastTask.set(command); 
     executor.execute(new Runnable() { 
      @Override 
      public void run() { 
       Runnable task=lastTask.getAndSet(null); 
       if(task!=null){ 
        task.run(); 
       } 
      } 
     }); 

    } 
} 

@RunWith(MockitoJUnitRunner.class) 
public class LatestTaskExecutorTest { 

    @Mock private Executor executor; 
    private LatestTaskExecutor latestExecutor; 
    @Before 
    public void setup(){ 
     latestExecutor=new LatestTaskExecutor(executor); 
    } 
    @Test 
    public void testRunSingleTask() { 
     Runnable run=mock(Runnable.class); 
     latestExecutor.execute(run); 
     ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class); 
     verify(executor).execute(captor.capture()); 
     captor.getValue().run(); 
     verify(run).run(); 
    } 

    @Test 
    public void discardsIntermediateUpdates(){ 
     Runnable run=mock(Runnable.class); 
     Runnable run2=mock(Runnable.class); 
     latestExecutor.execute(run); 
     latestExecutor.execute(run2); 
     ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class); 
     verify(executor,times(2)).execute(captor.capture()); 
     for (Runnable runnable:captor.getAllValues()){ 
      runnable.run(); 
     } 
     verify(run2).run(); 
     verifyNoMoreInteractions(run); 
    } 
} 
2

나도하지 않을 것입니다. 내가 처리하고자하는 이벤트에 대한 AtomicReference가 있고 그것을 파괴적인 방식으로 처리하기위한 작업을 추가 할 것입니다. 집행자 키 입력 이벤트를 데 (다른 것들에 사용할 수 있습니다) 집행자 또는 큐

이것은 또한 확장을 정의하지 않고, 무료 때

final AtomicReference<Event> eventRef = 

public void processEvent(Event event) { 
    eventRef.set(event); 
    executor.submit(new Runnable() { 
     public vodi run() { 
      Event e = eventRef.getAndSet(null); 
      if (e == null) return; 
      // process event 
     } 
    } 
} 

이것은 오직 다음 이벤트를 처리 할 원하는 즉, 키의 마지막 이벤트를 처리합니다.

+0

나는이 문제를 해결하기 위해 Executor 구현을 래핑하는 것을 고려 중이므로 어떤 코드도 작성하지 않고도 여러 시나리오에 대해 Executor를 재사용 할 수 있습니다. 다른 이벤트 유형과 다른 처리에 대해이 작업을 상당히 자주 수행해야합니다. –

+0

다른 유형의 경우 AtomicReference 값에 대한 EventType의 맵을 가질 수 있습니다. –

+0

나는이 로직을 프로세싱 로직과 완전히 분리하고자한다고 말하고자했다. –

0

이 답변은 불필요한 작업 제출을 최소화하는 DD에서 수정 된 버전입니다.

atomic reference은 최신 이벤트를 추적하는 데 사용됩니다. 이벤트를 잠재적으로 처리하기 위해 사용자 정의 작업이 대기열에 제출되고, 최신 이벤트를 읽는 작업 만 실제로 진행되고 null에 대한 원자 참조를 지우기 전에 유용한 작업을 수행합니다. 다른 작업을 실행할 기회가 생겨도 처리 할 수있는 이벤트가 없다면 아무 것도하지 않고 조용히 버립니다. 불필요한 작업 제출은 대기열에서 사용 가능한 작업 수를 추적하여 피할 수 있습니다. 대기열에 대기중인 작업이 하나 이상 있으면 이미 대기중인 작업이 대기열에서 제외 될 때 이벤트가 처리되므로 작업 제출을 피할 수 있습니다.

import java.util.concurrent.Executor; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicReference; 

public class EventExecutorService implements Executor { 

    private final Executor executor; 
    // the field which keeps track of the latest available event to process 
    private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>(); 
    private final AtomicInteger activeTaskCount = new AtomicInteger(0); 

    public EventExecutorService(final Executor executor) { 
     this.executor = executor; 
    } 

    @Override 
    public void execute(final Runnable eventTask) { 
     // update the latest event 
     latestEventReference.set(eventTask); 
     // read count _after_ updating event 
     final int activeTasks = activeTaskCount.get(); 

     if (activeTasks == 0) { 
      // there is definitely no other task to process this event, create a new task 
      final Runnable customTask = new Runnable() { 
       @Override 
       public void run() { 
        // decrement the count for available tasks _before_ reading event 
        activeTaskCount.decrementAndGet(); 
        // find the latest available event to process 
        final Runnable currentTask = latestEventReference.getAndSet(null); 
        if (currentTask != null) { 
         // if such an event exists, process it 
         currentTask.run(); 
        } else { 
         // somebody stole away the latest event. Do nothing. 
        } 
       } 
      }; 
      // increment tasks count _before_ submitting task 
      activeTaskCount.incrementAndGet(); 
      // submit the new task to the queue for processing 
      executor.execute(customTask); 
     } 
    } 
} 
+0

사전 최적화가 유죄입니까? –

+0

@DD. 작업이 장기간 실행되고 잠재적으로 꽤 많은 이벤트가 생략된다는 정보를 기반으로합니다. – shams

관련 문제