2017-12-22 4 views
0

Flink 1.4 릴리스에서 FlinkKafkaConsumer011에는 주제에 대한 정규식을 rad-feature btw로 전달하는 기능이 있습니다. 이제는 단일 Flink 응용 프로그램이 BucketingSink를 사용하여 각 주제 (Avro 메시지)를 싱크하여 s3의 위치를 ​​구분할 수있는 방법이 있는지 궁금합니다. 예 :다른 경로에서 여러 카프카 주제를 s3으로 싱크하십시오.

s3://bucket/topic_1 
s3://bucket/topic_2 
s3://bucket/topic_3 
. 
. 
. 
s3://bucket/topic_n 

이 작업을 수행하는 방법에 대한 모든 정보는 매우 감사하겠습니다.

+0

이 도움말을 확인하십시오. https://stackoverflow.com/questions/41473343/unable-to-write-to-s3-using-s3-sink-using-streamexecutionenvironment-apache-fl – soheil

답변

0

루트 버킷은 다른 당신은 "그냥"당신이 할 수있는 각 주제에 대한 다른 경로하려면 다음

  1. 는 메타 데이터 항목을 얻을 카프카
  2. 에서 메시지를 역 직렬화 할 때 요소에 저장을
  3. 전체 처리를 통해 요소를 전달하십시오.
  4. 전달 된 요소에서 주제를 쿼리하여 getBucketPath 메소드를 재정 의하여 topic과 경로를 리턴하는 BasePathBucketer의 서브 클래스를 제공하십시오.
관련 문제