2017-12-08 3 views
2

invoke 싱크대 방법은 비동기식 io를 만드는 방법이 아닙니다. 예 : Future을 반환합니까?플립크 싱크가 바이오 만 지원합니까?

는 예를 들어, 레디 스 커넥터 동기 레디 스 명령을 실행하는 LIB jedis 사용

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

은 그 다음 명령 당 레디 스 서버에서 네트워크 응답을 기다리는 FLINK의 작업 스레드를 차단하는 것이다! 싱크가있는 동일한 스레드에서 실행중인 다른 연산자가 가능합니까? 그렇다면, 그것도 그들을 막을까요?

필자는 flink에 asyncio API가 있음을 알고 있지만 싱크 impl에 사용 된 것처럼 보이지 않습니까? @Dexter가 언급 한 바와 같이

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

+0

Jedis 당신은 "RichAsyncFunction"를 사용하여 자신의 커넥터 IMPL을 쓸 수있는 비동기 인터페이스 – Dexter

+1

이 – Dexter

+0

가 @Dexter 당신이 간단한 예제를주지 수있다? – kingluo

답변

1

, 당신은 RichAsyncFunction를 사용할 수 있습니다. 다음은 샘플 코드입니다 (그것이 작동되도록하는 추가 업데이트 할 필요가있다)

AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() { 
     transient private RedisClient client; 
     transient private RedisAsyncCommands<String, String> commands; 
     transient private ExecutorService executor; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 

      client = RedisClient.create("redis://localhost"); 
      commands = client.connect().async(); 
      executor = Executors.newFixedThreadPool(10); 
     } 

     @Override 
     public void close() throws Exception { 
      // shut down the connection and thread pool. 
      client.shutdown(); 
      executor.shutdown(); 

      super.close(); 
     } 

     public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception { 
      // eg.g get something from redis in async 
      final RedisFuture<String> future = commands.get("key"); 
      future.thenAccept(new Consumer<String>() { 
       @Override 
       public void accept(String value) { 
        collector.collect(Collections.singletonList(future.get())); 
       } 
      }); 
     } 
    }, 1000, TimeUnit.MILLISECONDS); 
+0

'thenAccept'와'addListener'의 차이점은 무엇입니까? – kingluo

+0

여기서'addListener'는 GUAVA의'ListenableFuture' 출신이고,'thenAccept'은 어디서 왔는지 모르겠습니다.'ListenableFuture'에서'addListener'와 비슷한 기능을 가진 Java 8의'CompletableFuture'를 참조하십니까? – David

+0

redis 클라이언트로 lettuce를 사용합니까? 반환 된 미래에는 완료 콜백을 바인드하는 'thenAccept' 메소드가 있습니다. https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API#consuming-futures – kingluo

관련 문제