2017-05-15 1 views
0

정기적 인 작업을 수행하기 위해 hazelcast ScheduledExecutorService를 사용하려고합니다. 나는 hazelcast 3.8.1을 사용하고 있습니다.hazelcast ScheduledExecutorService가 노드 종료 후 작업을 잃었습니다.

하나의 노드를 시작한 다음 다른 노드를 시작하고 작업이 두 노드간에 분산되어 제대로 실행됩니다.

첫 번째 노드를 종료하면 두 번째 노드는 이전 노드의 이전주기 태스크를 실행하기 시작합니다.

문제는 첫 번째 노드 대신 두 번째 노드를 중지하면 해당 노드의 작업이 첫 번째 노드로 재조정되지 않는다는 것입니다. 이것은 노드가 더 많은 경우에도 발생합니다. 작업을 수신하기 위해 마지막 노드를 종료하면 해당 작업이 손실됩니다.

셧다운

는 항상 Ctrl 키를 함께 이루어집니다 + C

내가 hazelcast 예제에서 몇 가지 예제 코드와 내가 웹에서 찾은 코드의 일부 조각으로, 테스트 응용 프로그램을 만든

. 나는이 응용 프로그램의 두 인스턴스를 시작합니다.

public class MasterMember { 

/** 
* The constant LOG. 
*/ 
final static Logger logger = LoggerFactory.getLogger(MasterMember.class); 

public static void main(String[] args) throws Exception { 

    Config config = new Config(); 
    config.setProperty("hazelcast.logging.type", "slf4j"); 
    config.getScheduledExecutorConfig("scheduler"). 
    setPoolSize(16).setCapacity(100).setDurability(1); 

    final HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); 

    Runtime.getRuntime().addShutdownHook(new Thread() { 

     HazelcastInstance threadInstance = instance; 

     @Override 
     public void run() { 
      logger.info("Application shutdown"); 

      for (int i = 0; i < 12; i++) { 
       logger.info("Verifying whether it is safe to close this instance"); 
       boolean isSafe = getResultsForAllInstances(hzi -> { 
        if (hzi.getLifecycleService().isRunning()) { 
         return hzi.getPartitionService().forceLocalMemberToBeSafe(10, TimeUnit.SECONDS); 
        } 
        return true; 
       }); 

       if (isSafe) { 
        logger.info("Verifying whether cluster is safe."); 
        isSafe = getResultsForAllInstances(hzi -> { 
         if (hzi.getLifecycleService().isRunning()) { 
          return hzi.getPartitionService().isClusterSafe(); 
         } 
         return true; 
        }); 

        if (isSafe) { 
         System.out.println("is safe."); 
         break; 
        } 
       } 

       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 

      threadInstance.shutdown(); 

     } 

     private boolean getResultsForAllInstances(
       Function<HazelcastInstance, Boolean> hazelcastInstanceBooleanFunction) { 

      return Hazelcast.getAllHazelcastInstances().stream().map(hazelcastInstanceBooleanFunction).reduce(true, 
        (old, next) -> old && next); 
     } 
    }); 

    new Thread(() -> { 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler"); 
     scheduler.scheduleAtFixedRate(named("1", new EchoTask("1")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("2", new EchoTask("2")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("3", new EchoTask("3")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("4", new EchoTask("4")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("5", new EchoTask("5")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("6", new EchoTask("6")), 5, 10, TimeUnit.SECONDS); 
    }).start(); 

    new Thread(() -> { 

     try { 
      // delays init 
      Thread.sleep(20000); 

      while (true) { 

       IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler"); 
       final Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures = 
         scheduler.getAllScheduledFutures(); 

       // check if the subscription already exists as a task, if so, stop it 
       for (final List<IScheduledFuture<Object>> entry : allScheduledFutures.values()) { 
        for (final IScheduledFuture<Object> objectIScheduledFuture : entry) { 
         logger.info(
           "TaskStats: name {} isDone() {} isCanceled() {} total runs {} delay (sec) {} other statistics {} ", 
           objectIScheduledFuture.getHandler().getTaskName(), objectIScheduledFuture.isDone(), 
           objectIScheduledFuture.isCancelled(), 
           objectIScheduledFuture.getStats().getTotalRuns(), 
           objectIScheduledFuture.getDelay(TimeUnit.SECONDS), 
           objectIScheduledFuture.getStats()); 
        } 
       } 

       Thread.sleep(15000); 

      } 

     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    }).start(); 

    while (true) { 
     Thread.sleep(1000); 
    } 
    // Hazelcast.shutdownAll(); 
} 
} 

그리고 작업 내가 뭔가를 잘못하고 있어요

public class EchoTask implements Runnable, Serializable { 

/** 
* serialVersionUID 
*/ 
private static final long serialVersionUID = 5505122140975508363L; 

final Logger logger = LoggerFactory.getLogger(EchoTask.class); 

private final String msg; 

public EchoTask(String msg) { 
    this.msg = msg; 
} 

@Override 
public void run() { 
    logger.info("--> " + msg); 
} 
} 

? 미리


감사 - 수정 -

코드 대신 System.out에의 기록 사용하도록 수정 (이상 갱신). 작업 통계의 로깅 및 Config 객체의 수정 된 사용을 추가했습니다.

로그는 :

Node1_log

Node2_log

내가 모든 작업이 두 번째를 시작하기 전에 첫 번째 노드에서 실행 될 때까지 기다리는 것이 언급하는 것을 잊었다.

답변

0

브루노, 이것을보고 해 주셔서 감사합니다. 정말 버그입니다. 불행하게도 여러 개의 노드를 사용하는 것은 그다지 명백하지 않았습니다. 당신이 대답으로 생각한대로, 그 일을 잃지 않고, 오히려 이주 후에 취소 된 상태로 유지하십시오. 그러나 태스크가 취소되고 동시에 미래가 null 일 수 있으므로 수정이 안전하지 않습니다. 예 : 마스터 복제본을 취소하면 미래가 없었던 백업이 결과를 얻습니다. 수정 사항은 사용자가 수행 한 작업과 매우 유사하므로 prepareForReplication()에있는 경우 migrationMode에서 결과를 설정하지 마십시오. 나는 곧 그 문제를 해결할 것이며, 단지 몇 가지 테스트를 더 실시 할 것이다. 이 기능은 마스터 및 이후 버전에서 사용할 수 있습니다.

발견 한 내용에 문제가 기록되었지만 괜찮 으면 https://github.com/hazelcast/hazelcast/issues/10603 상태를 추적 할 수 있습니다.

+0

고맙습니다. 3.9가 나왔을 때이 수정본을 사용할 수 있습니까? 출시 예정일이 있습니까? –

+0

이것을 3.8.3으로 역 이식하지만 어느 릴리스에 대해서도 날짜가 없습니다. 그들은 매우 빨리 예상됩니다. –

0

hazelcast 프로젝트 (3.8.1 소스 코드 사용)의 ScheduledExecutorContainer 클래스 인 promoteStash() 메소드를 변경하여이 문제를 신속하게 해결할 수있었습니다. 기본적으로 데이터 이전 마이그레이션에서 작업이 취소 된 경우에 대한 조건을 추가했습니다. 나는이 변화의 가능한 부작용이 아니며, 이것이 최선의 방법이라면!

void promoteStash() { 
    for (ScheduledTaskDescriptor descriptor : tasks.values()) { 
     try { 
      if (logger.isFinestEnabled()) { 
       logger.finest("[Partition: " + partitionId + "] " + "Attempt to promote stashed " + descriptor); 
      } 

      if (descriptor.shouldSchedule()) { 
       doSchedule(descriptor); 
      } else if (descriptor.getTaskResult() != null && descriptor.getTaskResult().isCancelled() 
        && descriptor.getScheduledFuture() == null) { 
       // tasks that were already present in this node, once they get sent back to this node, since they 
       // have been cancelled when migrating the task to other node, are not rescheduled... 
       logger.fine("[Partition: " + partitionId + "] " + "Attempt to promote stashed canceled task " 
         + descriptor); 

       descriptor.setTaskResult(null); 
       doSchedule(descriptor); 
      } 

      descriptor.setTaskOwner(true); 
     } catch (Exception e) { 
      throw rethrow(e); 
     } 
    } 
} 
관련 문제