2017-12-14 8 views
0

나는 두 가지 방법이 문제가 발생하고있어, 처음 예상대로 작동하지 :폐쇄는 scala.concurrent.Future에 대한 루프에서 두 번째로 다음과 같은 <code>Future</code>을 만드는 두 번째를 호출

public class WorkersCoordinator { 
    private static Logger LOGGER = 
     LoggerFactory.getLogger(WorkersCoordinator.class); 

    private final Timeout timeout; 

    private final ActorSystem system; 

    private final List<Class<? extends BaseWorker>> workers; 

    private final Map<Class, ActorRef> refMap; 

    private final WorkResultPackageQueue pkgQueue; 

    private final ActorFactory actorFactory; 

    @Autowired 
    public WorkersCoordinator(final ApplicationConfiguration config, 
          final ActorSystem system, 
          final ActorFactory actorFactory, 
          final WorkerFactory workerFactory, 
          final WorkResultPackageQueue pkgQueue) { 
     timeout = new Timeout(config.getTimeoutInMilliseconds(), 
           TimeUnit.MILLISECONDS); 

     this.system = system; 
     this.actorFactory = actorFactory; 
     this.pkgQueue = pkgQueue; 

     refMap = Map.newHashMap(); 
     workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers()); 
    } 

    public void delegateWorkers() { 
     for (Class<? extends BaseWorker> worker : workers) { 
      if (refMap.containsKey(worker) continue; 
      sendWork(worker); 
     } 
    } 

    private void sendWork(Class<? extends BaseWorker> worker) { 
     // GetDataActor extends AbstractActor 
     ActorRef actorRef = actorFactory.create(GetDataActor.class); 
     Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout); 

     responseRef.onFailure(new OnFailure() { 
      @Override 
      public void onFailure(Throwable failure) throws Throwable { 
       LOGGER.error("Worker {} encountered a problem - cancelling.", 
          worker.getSimpleName()); 
       if (refMap.containsKey(worker)) { 
        refMap.remove(worker); 
       } 
      } 
     }, system.dispatcher()); 

     responseRef.onSuccess(new OnSuccess<Object>() { 
      @Override 
      public void onSuccess(Object msg) throws Throwable { 
       if (msg instanceof WorkResultPackage) { 
        final WorkResultPackage reportPackage = (WorkResultPackage) msg; 
        LOGGER.info(
         "Received AssetDataPackage from {}, containing {} items", 
         reportPackage.getWorkReport().getFromWorker().getSimpleName(), 
         reportPackage.getPkg().getData().size()); 

        pkgQueue.enqueue(reportPackage); 
       } else { 
        LOGGER.eror(
         "Expected to receive WorkResultPackage Object but received: {}", 
         msg.getClass()); 
         throw new UnknownObjectReceived(msg); 
       } 
      } 
     }, system.dispatcher()); 

     refMap.put(worker, actorRef); 
    } 
} 

내가 생각하기에 클로저가 responseRef.onFailure이 내가 기대했던 것처럼 행동하지 않는다는 것이 문제입니다. 내가 3 명의 노동자와 함께 이것을 부르면 그 중 하나는 실패로 처리되지만, 실패한 것으로 표시된 작업자는 일관되게 실패한 것으로보고되는 작업자에 관해서는 로깅이 불확실합니다. 저는이 기술 스택 (Java, Scala Futures 및 AKKA)에 익숙하지 않으며이 설정된 패턴을 찾을 수있는 확립 된 코드 기반을 사용하므로 Java/Scala Futures에서 클로저를 간과하거나 오해하고 있는지 여부를 알 수 없습니다. 여기서 주목해야 할 점은 어떤 작업자가 실패했는지보고하고 더 이상 프로세스에서 고려하지 않도록 refMap에서 제거해야한다는 것입니다. 심지어 낯선 사람은 잘못된 작업자가 실패한 것으로보고 된 동안에도 모든 근로자가 완료되어 refMap에서 제거 된 것처럼 보입니다.

업데이트 : 폐쇄가 작동하지 않는 이유에 대한 답을 얻기에 운이없는, 나는 몇 가지 조사를했고, 자바 (8)도 폐쇄 지원하는지 여부를 응답 또 다른 포스트 발견 후 :

Does Java 8 Support Closures?

짧은 대답을, 나는 그것을 믿는다. 그러나 final 또는 효과적으로 final 변수라고했습니다. 따라서 다음과 같이 코드를 업데이트했습니다. 다행히도 클로저를 이해하는 사람들이 익숙해 져서 왜 작동하지 않는지 이해하는 데 도움이되기를 바랍니다 (C# 및 JavaScript). 나는 아무런 노력을하지 않으려 고 노력한 것을 강조하기 위해서만 sendWork(...)에 대한 업데이트를 게시하고 있습니다.

private void sendWork(Class<? extends BaseWorker> worker) { 
    // GetDataActor extends AbstractActor 
    ActorRef actorRef = actorFactory.create(GetDataActor.class); 
    Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout); 

    Consumer<Throwable> onFailureClosure = (ex) -> { 
      LOGGER.error("Worker {} encountered a problem - cancelling.", 
         worker.getSimpleName()); 
      if (refMap.containsKey(worker)) { 
       refMap.remove(worker); 
      } 
    } 

    responseRef.onFailure(new OnFailure() { 
     @Override 
     public void onFailure(Throwable failure) throws Throwable { 
      onFailureClosure.accept(failure); 
     } 
    }, system.dispatcher()); 

    responseRef.onSuccess(new OnSuccess<Object>() { 
     @Override 
     public void onSuccess(Object msg) throws Throwable { 
      if (msg instanceof WorkResultPackage) { 
       final WorkResultPackage reportPackage = (WorkResultPackage) msg; 
       LOGGER.info(
        "Received AssetDataPackage from {}, containing {} items", 
        reportPackage.getWorkReport().getFromWorker().getSimpleName(), 
        reportPackage.getPkg().getData().size()); 

       pkgQueue.enqueue(reportPackage); 
      } else { 
       LOGGER.eror(
        "Expected to receive WorkResultPackage Object but received: {}", 
        msg.getClass()); 
        throw new UnknownObjectReceived(msg); 
      } 
     } 
    }, system.dispatcher()); 

    refMap.put(worker, actorRef); 
} 
+0

이것은 무엇 [: completable - 미래 태그] 함께 할 수 있는가? – shmosel

+0

아마도 scala.concurrent.Future에 대한 태그가 없었기 때문에 그 주제에서 확실히 호출해야했습니다. 당신이 그것에 대해 강하게 생각한다면 그것을 편집하여 삭제하십시오. –

+0

실험적으로 생각해 보면, 앞서 언급 한 Java 문서를 가리키는 훌륭한 사람들이 어떻게 이해 했는가에도 불구하고 클로저는 Java로 구현되지 않았으며, 적어도 완전히는 구현되지 않았습니다. 오히려 가변적 인 리프팅이 있지만 '최종'요구 사항에도 불구하고 비동기를 지원하지 않습니다. –

답변

0

당신이보고있는 행동에 공헌 할 수있는 코드에 근본적인 문제가 있습니다 : 코드는 데이터에 대한 보호없이 동시 환경에서 데이터를 돌연변이됩니다. 미래 콜백은 언제든지 실행될 수 있으며 잠재적으로 병렬로 실행될 수 있습니다. 같은 데이터를 조사하고 변형시키는 미래의 콜백이 여러 개 있으면 이상한 행동이 발생할 수 있습니다.

변경 가능한 데이터에 대한 동시 액세스를 처리하는 Java의 일반적인 접근 방식은 동기화 및 잠금을 사용하는 것입니다. 다행히 Akka를 사용하고 있기 때문에 더 나은 접근 방식이 있습니다. 기본적으로 리팩토링 자 WorkersCoordinator이 액터가되며 순차 메시지 처리의 액터 동작을 safely handle the mutable state에 사용합니다.

더 간단한 문제를 해결하려면이 경우 ask을 사용하지 말고 tell을 사용하여 액터간에 통신하십시오. 오류를 잡으려고 미래에 여기에 미래가 사용 된 것 같지만 오류 처리에 더 나은 접근 방법은 Akka의 supervisor strategy입니다. 즉, WorkersCoordinator이 액터이고 각 GetDataActor 인스턴스가 WorkersCoordinator의 자식 인 경우 후자는 전자에서 발생하는 오류를 처리하는 방법을 결정할 수 있습니다. 예를 들어 GetDataActor에 예외가 발생하면 코디네이터는 오류를 기록한 다음 하위를 중지 할 수 있습니다., Props 일부 사용자 정의 클래스 것으로 보인다

  • 대신 ActorFactory를 사용 : 위의 코드에 대해

    public class WorkersCoordinator extends AbstractActor { 
        private static Logger LOGGER = ... 
        private final List<Class<? extends BaseWorker>> workers; 
        private final Map<ActorRef, Class> refMap; 
        private final WorkResultPackageQueue pkgQueue; 
    
        public WorkersCoordinator(final WorkerFactory workerFactory, 
              final WorkResultPackageQueue pkgQueue) { 
        this.pkgQueue = pkgQueue; 
        this.refMap = Map.newHashMap(); 
        this.workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers()); 
        } 
    
        static Props props(WorkerFactory factory, WorkResultPackageQueue queue) { 
        return Props.create(WorkersCoordinator.class, factory, queue); 
        } 
    
        static public class Delegate {} 
    
        private static SupervisorStrategy strategy = 
        new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder. 
         matchAny(t -> { 
         ActorRef failedChild = getSender(); 
         LOGGER.error("This child failed: {}", failedChild); 
         refMap.remove(failedChild); 
         stop(); 
         }) 
         .build()); 
    
        @Override 
        public SupervisorStrategy supervisorStrategy() { 
        return strategy; 
        } 
    
        @Override 
        public void preStart() { 
        for (worker : workers) { 
         ActorRef child = getContext().actorOf(Props.create(GetDataActor.class)); 
         refMap.put(child, worker); 
        } 
        } 
    
        @Override 
        public Receive createReceive() { 
        return receiveBuilder() 
         .match(Delegate.class, d -> { 
         refMap.forEach((actor, msg) -> actor.tell(msg, getSelf())); 
         }) 
         .match(WorkResultPackage.class, p -> { 
         LOGGER.info("Received AssetDataPackage from {}, containing {} items", 
            reportPackage.getWorkReport().getFromWorker().getSimpleName(), 
            reportPackage.getPkg().getData().size()); 
    
         pkgQueue.enqueue(p); 
         ActorRef dataActor = getSender(); 
         refMap.remove(dataActor); 
         }) 
         .matchAny(
         msg -> LOGGER.eror("Expected to receive WorkResultPackage Object but received: {}", msg) 
        ) 
         .build(); 
        } 
    } 
    

    일부 노트 :

    다음은 위의 아이디어를 통합하는 대안 WorkersCoordinator입니다 대신 사용됩니다.

  • refMap이 반전되어 이제 ActorRef이 키가되며 작업의 클래스 유형이 값이됩니다. 이를 통해 ActorRef을 기반으로 refMap에서 항목을 제거 할 수 있습니다. 자식 액터의 응답이 성공한 경우와 예외가 발생하는 경우 모두 가능합니다.
  • 간결함을 위해 @Autowired 주석을 삭제했습니다. 액터에 대한 의존성 삽입에 대한 자세한 내용은 here입니다.
  • WorkersCoordinator을 만들고 시작하려면 props 메서드를 호출하십시오. 작업을 시작하기 위해 배우는 사용자 정의 Delegate 메시지를 기대합니다. 액터가이 메시지를 수신하면 액터는 refMap을 반복하고 각 아이에게 해당 아이와 연관된 작업 단위를 보냅니다.
 
WorkerFactory factory = ... 
WorkResultPackageQueue queue = ... 
ActorRef coordinator = actorSystem.actorOf(WorkersCoordinator.props(factory, queue)); 

Delegate doWork = new Delegate(); 
coordinator.tell(doWork, ActorRef.noSender()); 
+0

이것이 Akka 대답 인 것으로 의심되어 감독자 전략을 사용하기 위해 리팩터링해야했습니다. 그러나 현재 아키텍쳐는 ActorRef를 감싸고 있으며 우리가 리팩토링 할 때까지는 지원하지 않을 것입니다. 나는 너의 답을 그 답으로 표시 할 것이다. 그러나, 나는 한 가지를 궁금해. 'refMap'의 변이가 동시성 문제를 갖지 않도록하기 위해서'newHashMap()'대신'Map.newConcurrentMap()'을 사용할 필요가 없으며, 왜 왜'worker'가 연속 내 클로저로 끝나지 않는가? ? 그런 식으로,'onFailure'가 호출되면 정의 된 값을 가지게됩니다. –

관련 문제