, 당신은 RichAsyncFunction
를 사용할 수 있습니다. 다음은 샘플 코드입니다 (그것이 작동되도록하는 추가 업데이트 할 필요가있다)
AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() {
transient private RedisClient client;
transient private RedisAsyncCommands<String, String> commands;
transient private ExecutorService executor;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
client = RedisClient.create("redis://localhost");
commands = client.connect().async();
executor = Executors.newFixedThreadPool(10);
}
@Override
public void close() throws Exception {
// shut down the connection and thread pool.
client.shutdown();
executor.shutdown();
super.close();
}
public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception {
// eg.g get something from redis in async
final RedisFuture<String> future = commands.get("key");
future.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
collector.collect(Collections.singletonList(future.get()));
}
});
}
}, 1000, TimeUnit.MILLISECONDS);
Jedis 당신은 "RichAsyncFunction"를 사용하여 자신의 커넥터 IMPL을 쓸 수있는 비동기 인터페이스 – Dexter
이 – Dexter
가 @Dexter 당신이 간단한 예제를주지 수있다? – kingluo