2011-11-02 5 views
7

I 폭 (10)의 고정 스레드 풀 ExecutorService, 100 Callable 's의 목록, 각각 20 초 동안 대기하고 자신의 인터럽트 기록이있다.자바 ExecutorService를 invokeAll을() 중단

나는이 목록을 별도의 스레드에서 호출하고 거의 즉시이 스레드를 인터럽트하려고합니다. ExecutorService 실행이 예상대로 중단되지만 Callable으로 기록 된 실제 인터럽트 수는 10-20-40 정도입니다. ExecutorService이 10 개 이상의 스레드를 동시에 실행할 수없는 경우 그 이유는 무엇입니까?

전체 소스 :

@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

사용 WINXP 32 비트 오라클 JRE 1.6.0_27 및 JUnit4

나는 가설에 동의하지
+0

흠 ... 주된 방법으로 프로그램으로 변환하면, 나는 항상 10을 얻고있다. (Windows 7의 Java 7) –

+0

동일한 결과는 37 (1.6.0_27, Windows XP)이다. 테스트 할 Java 7이 없으므로 다른 사람이 확인할 수 있습니까? –

+0

직장에서 해보겠습니다. 어쩌면 자바 6 버그 일 수도 ... –

답변

4

을 (당신으로 ​​인해 동시성에 한 번 더 그 그것을 실행해야 할 수도 있습니다) 10 회의 인터럽트 만 받아 들여야합니다.

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

기본적으로 내 인수는 감시자 스레드가 타임 슬라이스를 산출하고, ExecutorService를의 작업자 스레드가 더 시작할 수 있도록하는 전에 100 "웨이터"RunnableFuture 인스턴스를 취소하도록 허용 될 것이라는 보장이 없다는 것입니다 웨이터 작업.

업데이트 :보기 코드 AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

에서 마지막으로 인터럽트가 현재 실행중인 작업에 전파 될 때 어떤 f.cancel(true)을 포함 차단합니다. 보시다시피, 이것은 단단한 루프입니다. 그러나 루프를 실행하는 스레드가 한 번에 모든 Future 인스턴스를 반복 할 수 있다고 보장 할 수는 없습니다. 이 블록을 추가 동일한 동작을

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

를 달성하려면

+0

그렇다면'invokeAll()'의 중단은 대기중인 작업이 중단되기 전에 대기중인 모든 작업이 즉시 취소됨을 의미하지 않는다고 말하니? 나를 위해, 그것은 최소한의 놀람 원리의 직접적인 중단처럼 보인다. –

+1

수정. 작업을 처리하는 작업자 스레드는'invokeAll()'을 실행하는 스레드와 구별됩니다. 한 스레드에서 인터럽트를 호출해도 다른 스레드가 인터럽트되지는 않는다는 것을 의미하지 않으므로 때로는 10 스레드 이상의 인터럽트를받을 수 있다는 사실에 전혀 놀랐습니다.내가 게시 한 주석 코드에서 언급했듯이 인터럽트는'Future.cancel' 메소드에 전달 된 부울 인수의 미덕으로 작업을 처리하는 작업자 스레드로만 전송됩니다. –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

전에 스레드를 중단.

대기 대기열을 모두 새 목록으로 배출합니다.

따라서 실행중인 스레드 만 인터럽트합니다.

관련 문제