정기적 인 작업을 수행하기 위해 hazelcast ScheduledExecutorService를 사용하려고합니다. 나는 hazelcast 3.8.1을 사용하고 있습니다.hazelcast ScheduledExecutorService가 노드 종료 후 작업을 잃었습니다.
하나의 노드를 시작한 다음 다른 노드를 시작하고 작업이 두 노드간에 분산되어 제대로 실행됩니다.
첫 번째 노드를 종료하면 두 번째 노드는 이전 노드의 이전주기 태스크를 실행하기 시작합니다.
문제는 첫 번째 노드 대신 두 번째 노드를 중지하면 해당 노드의 작업이 첫 번째 노드로 재조정되지 않는다는 것입니다. 이것은 노드가 더 많은 경우에도 발생합니다. 작업을 수신하기 위해 마지막 노드를 종료하면 해당 작업이 손실됩니다.
셧다운
는 항상 Ctrl 키를 함께 이루어집니다 + C 내가 hazelcast 예제에서 몇 가지 예제 코드와 내가 웹에서 찾은 코드의 일부 조각으로, 테스트 응용 프로그램을 만든. 나는이 응용 프로그램의 두 인스턴스를 시작합니다.
public class MasterMember {
/**
* The constant LOG.
*/
final static Logger logger = LoggerFactory.getLogger(MasterMember.class);
public static void main(String[] args) throws Exception {
Config config = new Config();
config.setProperty("hazelcast.logging.type", "slf4j");
config.getScheduledExecutorConfig("scheduler").
setPoolSize(16).setCapacity(100).setDurability(1);
final HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
Runtime.getRuntime().addShutdownHook(new Thread() {
HazelcastInstance threadInstance = instance;
@Override
public void run() {
logger.info("Application shutdown");
for (int i = 0; i < 12; i++) {
logger.info("Verifying whether it is safe to close this instance");
boolean isSafe = getResultsForAllInstances(hzi -> {
if (hzi.getLifecycleService().isRunning()) {
return hzi.getPartitionService().forceLocalMemberToBeSafe(10, TimeUnit.SECONDS);
}
return true;
});
if (isSafe) {
logger.info("Verifying whether cluster is safe.");
isSafe = getResultsForAllInstances(hzi -> {
if (hzi.getLifecycleService().isRunning()) {
return hzi.getPartitionService().isClusterSafe();
}
return true;
});
if (isSafe) {
System.out.println("is safe.");
break;
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
threadInstance.shutdown();
}
private boolean getResultsForAllInstances(
Function<HazelcastInstance, Boolean> hazelcastInstanceBooleanFunction) {
return Hazelcast.getAllHazelcastInstances().stream().map(hazelcastInstanceBooleanFunction).reduce(true,
(old, next) -> old && next);
}
});
new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
scheduler.scheduleAtFixedRate(named("1", new EchoTask("1")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("2", new EchoTask("2")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("3", new EchoTask("3")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("4", new EchoTask("4")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("5", new EchoTask("5")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("6", new EchoTask("6")), 5, 10, TimeUnit.SECONDS);
}).start();
new Thread(() -> {
try {
// delays init
Thread.sleep(20000);
while (true) {
IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
final Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures =
scheduler.getAllScheduledFutures();
// check if the subscription already exists as a task, if so, stop it
for (final List<IScheduledFuture<Object>> entry : allScheduledFutures.values()) {
for (final IScheduledFuture<Object> objectIScheduledFuture : entry) {
logger.info(
"TaskStats: name {} isDone() {} isCanceled() {} total runs {} delay (sec) {} other statistics {} ",
objectIScheduledFuture.getHandler().getTaskName(), objectIScheduledFuture.isDone(),
objectIScheduledFuture.isCancelled(),
objectIScheduledFuture.getStats().getTotalRuns(),
objectIScheduledFuture.getDelay(TimeUnit.SECONDS),
objectIScheduledFuture.getStats());
}
}
Thread.sleep(15000);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}).start();
while (true) {
Thread.sleep(1000);
}
// Hazelcast.shutdownAll();
}
}
그리고 작업 내가 뭔가를 잘못하고 있어요
public class EchoTask implements Runnable, Serializable {
/**
* serialVersionUID
*/
private static final long serialVersionUID = 5505122140975508363L;
final Logger logger = LoggerFactory.getLogger(EchoTask.class);
private final String msg;
public EchoTask(String msg) {
this.msg = msg;
}
@Override
public void run() {
logger.info("--> " + msg);
}
}
? 미리
에
감사 - 수정 -
코드 대신 System.out에의 기록 사용하도록 수정 (이상 갱신). 작업 통계의 로깅 및 Config 객체의 수정 된 사용을 추가했습니다.
로그는 :
내가 모든 작업이 두 번째를 시작하기 전에 첫 번째 노드에서 실행 될 때까지 기다리는 것이 언급하는 것을 잊었다.
고맙습니다. 3.9가 나왔을 때이 수정본을 사용할 수 있습니까? 출시 예정일이 있습니까? –
이것을 3.8.3으로 역 이식하지만 어느 릴리스에 대해서도 날짜가 없습니다. 그들은 매우 빨리 예상됩니다. –