2017-03-10 4 views
1

BlockingQueueRunnable입니다. TaskExecutor 구현 중 하나를 사용하여 모든 작업을 간단하게 실행할 수 있으며 모두 병렬로 실행됩니다. 그러나 일부 Runnable은 다른 사람에 따라 다르므로 Runnable이 완료 될 때까지 기다려야한다는 것을 의미하므로 실행될 수 있습니다.Spring TaskExecutor 프레임 워크에서 스레드를 관리하는 방법

규칙은 매우 간단합니다 : 모든 Runnable 코드가 있습니다. 동일한 코드를 가진 두 개의 Runnable은 동시에 실행할 수 없지만 코드가 다른 경우 병렬로 실행해야합니다. 다른 말로하면, 실행중인 Runnable은 다른 코드를 가질 필요가 있습니다. 모든 "duplicates"는 기다려야합니다.

문제는 스레드가 종료 될 때 무엇이든지/아무 이벤트/방법이 없습니다 것입니다. 내가 할 수 모든 Runnable에 같은 통지를 내장,하지만 그냥 스레드 종료하기 전에 수행 할 수 있기 때문에이

java.util.concurrent.ThreadPoolExecutor 종료 방법 afterExecute을 가지고있어하지 후에 나는이 방법을 좋아하지 않는다, 그러나 그것은 구현 될 필요가 - 봄 기본 구현 만 사용하고이 메서드는 무시됩니다.

Runnable이 이미 실행 중이므로 (구현시이 정보에 대한 액세스 권한이 부여되지 않음) 2 개의 추가 모음을 추적해야하기 때문에 복잡해집니다. 코드가 중복되어 연기가 지연됩니다. 더 폴링이 없기 때문에

나는 새로운 무언가가 큐에있을 때 스레드가 단순히 활성화의 BlockingQueue 접근 방식을 좋아한다. 하지만 Runnable 사이의 종속성을 관리하는 더 좋은 방법이있을 수 있으므로 BlockingQueue을 포기하고 다른 전략을 사용해야합니까? 떠오르는

답변

0

한 대체 전략은 가능한 각 코드에 대한 별도의 단일 스레드의 집행을하는 것입니다. 그런 다음 새로운 Runnable을 제출하려면 올바른 실행 프로그램을 찾아 코드에 사용하고 작업을 제출하십시오.

이 할 수있다, 또는 당신이 얼마나 많은 서로 다른 코드에 따라 좋은 해결책이 될 수 없습니다. 고려해야 할 주요 사항은 실행중인 동시 스레드 수가 사용자가 보유한 다른 코드 수만큼 높을 수 있다는 것입니다. 여러 다른 코드가있는 경우 문제가 될 수 있습니다.

은 물론, 당신은 동시에 실행중인 작업의 수를 제한 할 Semaphore을 사용할 수 있습니다; 코드 당 하나의 쓰레드를 만들지 만 제한된 수만 실제로 동시에 실행할 수 있습니다. 예를 들어,이 동시에 실행하는 세 가지 코드까지 허용 코드에 의해 작업에는 직렬화 것 :

public class MultiPoolExecutor { 
    private final Semaphore semaphore = new Semaphore(3); 

    private final ConcurrentMap<String, ExecutorService> serviceMap 
      = new ConcurrentHashMap<>(); 

    public void submit(String code, Runnable job) { 
     ExecutorService executorService = serviceMap.computeIfAbsent(
       code, (k) -> Executors.newSingleThreadExecutor()); 
     executorService.submit(() -> { 
      semaphore.acquireUninterruptibly(); 
      try { 
       job.run(); 
      } finally { 
       semaphore.release(); 
      } 
     }); 
    } 
} 

또 다른 방법은 잠금을 해제하고 완료시 실행할 수있는 작업을 확인하기 위해 Runnable을 수정하는 것입니다 (따라서 폴링을 피하십시오.) -이 예제와 같이, 모든 작업을 제출할 수있을 때까지 목록에 보관합니다. 부울 래치는 각 코드에 대해 한 번에 하나의 작업 만 스레드 풀에 제출하도록합니다. 새로운 작업이 도착하거나 실행중인 코드가 완료 될 때마다 코드는 제출할 수있는 새로운 작업을 다시 확인합니다 (CodedRunnable은 단순히 코드 등록 정보를 갖는 Runnable의 확장입니다).

public class SubmissionService { 
    private final ExecutorService executorService = Executors.newFixedThreadPool(5); 
    private final ConcurrentMap<String, AtomicBoolean> locks = new ConcurrentHashMap<>(); 
    private final List<CodedRunnable> jobs = new ArrayList<>(); 

    public void submit(CodedRunnable codedRunnable) { 
     synchronized (jobs) { 
      jobs.add(codedRunnable); 
     } 
     submitWaitingJobs(); 
    } 

    private void submitWaitingJobs() { 
     synchronized (jobs) { 
      for(Iterator<CodedRunnable> iter = jobs.iterator(); iter.hasNext();) { 
       CodedRunnable nextJob = iter.next(); 
       AtomicBoolean latch = locks.computeIfAbsent(
         nextJob.getCode(), (k) -> new AtomicBoolean(false)); 
       if(latch.compareAndSet(false, true)) { 
        iter.remove(); 
        executorService.submit(() -> { 
         try { 
          nextJob.run(); 
         } finally { 
          latch.set(false); 
          submitWaitingJobs(); 
         } 
        }); 
       } 
      } 
     } 
    } 
} 

이 접근 방식의 단점은 각 작업이 완료된 후 코드가 대기중인 작업 목록 전체를 스캔해야한다는 것입니다.물론이 작업을보다 효율적으로 수행 할 수 있습니다. 완료 작업은 실제로 동일한 코드로 다른 작업을 확인하기 만하면되기 때문에 빠른 처리를 위해 작업을 Map<String, List<Runnable>> 구조에 저장할 수 있습니다.

+0

나는이 첫 번째 접근법을 좋아하지만, 필요하지 않을 때 ExecutorService가 파괴되면 더 좋을 것입니다. – Marx

1

다른 코드 수가 많지 않으면 BarrySW19에서 제공되는 가능한 코드마다 별도의 단일 스레드 실행 프로그램을 사용하는 것이 좋습니다. 스레드의 전체 수는 용납 될 경우 대신 단일 스레드 집행자의, 다음, 우리는 (Akka에서 또는 다른 유사한 라이브러리) 배우 사용할 수 있습니다 WorkerActor 초 동안, 원래의 솔루션으로 ActorRef

public class WorkerActor extends UntypedActor { 
    public void onReceive(Object message) { 
    if (message instanceof Runnable) { 
     Runnable work = (Runnable) message; 
     work.run(); 
    } else { 
     // report an error 
    } 
    } 
} 

을 HashMap에 수집됩니다. 주어진 코드에 해당하는 ActorRef workerActorRef이 획득 (검색 또는 생성)되면 Runnable jobworkerActorRef.tell(job)과 함께 실행에 제출됩니다. 당신이 배우 라이브러리에 대한 종속성이하지 않으려는 경우

, 당신은 처음부터 WorkerActor을 프로그램 할 수 있습니다 : 주어진 코드에 해당하는 WorkerActor worker 취득 (취득 또는 생성)되어

public class WorkerActor implements Runnable, Executor { 
    Executor executor=ForkJoinPool.commonPool(); // or can by assigned in constructor 
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueu<>(); 
    boolean running = false; 

    public synchronized void execute(Runnable job) { 
    queue.put(job); 
    if (!running) { 
     executor.execute(this); // execute this worker, not job! 
     running=true; 
    } 

    public void run() { 
    for (;;) { 
     Runnable work=null; 
     synchronized (this) { 
     work = queue.poll(); 
     if (work==null) { 
      running = false; 
      return; 
     } 
     } 
     work.run(); 
    } 
    } 
} 

Runnable jobworker.execute(job)과 함께 실행됩니다.

+0

동일한 코드를 연속으로 여러 작업을 제출 한 경우 가장 일반적인 풀 스레드가 첫 번째 스레드가 빠져 나오기를 기다리는 루프 일 뿐이며 나중에 제출 된 작업은 다른 코드가 차단됩니까? – BarrySW19

+0

@ BarrySW19 각 사용자의 작업은 해당 코드로 WorkerActor에 대기하므로 풀 스레드를 차지하지 않습니다. 액터 자체는 최대 하나의 스레드에서만 사용할 수 있습니다. 따라서 처음에 동일한 코드를 가진 많은 작업이 제출되면 풀 스레드에서만 WorkerActor로 점차 점유되어 작업을 순차적으로 처리합니다. –

+0

아, 알겠습니다. 작업이 제출 될 때마다 큐가 비워 질 때까지 Actor가 반복됩니다. 작업하는 동안 제출 된 더 많은 작업은 분명히 해당 프로세스의 일부로 선택됩니다. 이렇게하면 두 번째 제안에서 다음 작업을 선택하는 데 사용되는 덜 효율적인 검색을 피할 수 있습니다. – BarrySW19

관련 문제