2016-11-07 3 views
1

구성이있는 스트리밍 환경을 만들고 RichMapFunctionopen() 메서드에서이 구성에 액세스하려고했습니다.DataStream 프로그램에서 사용자 기능을 구성하는 방법은 무엇입니까?

예 : open() 방법을 디버깅 할 때

Configuration conf = new Configuration(); 
    conf.setBoolean("a", true); 
    StreamExecutionEnvironment env = 
     StreamExecutionEnvironment.createLocalEnvironment(8, conf); 

    DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5); 
    source.map(new RichMapFunction<Integer, Integer>() { 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      boolean a = parameters.getBoolean("a", false); 
      super.open(parameters); 
     } 

     @Override 
     public Integer map(Integer value) throws Exception { 
      return value; 
     } 
    }).print(); 

    env.execute(); 

그러나, 나는 구성이 비어있는 것을 찾을 수 있습니다.

내가 뭘 잘못하고 있니? 스트리밍 환경에서 RichFunction에 올바르게 구성을 전달하려면 어떻게해야합니까?

답변

1

Flink의 DataStream 및 DataSet API는 예제에서 RichMapFucntion과 같은 사용자 함수 인터페이스를 공유합니다.

open 메서드의 Flink 's RichFunctionConfiguration 매개 변수는 DataSet API의 첫 번째 버전의 레거시이며 DataStream API에서는 사용되지 않습니다. Flink는 사용자가 map() 호출로 제공 한 객체를 직렬화하여 병렬 작업자에게 전달합니다. 따라서 개체의 매개 변수를 일반 필드로 직접 설정할 수 있습니다.

+0

이 정보로 설명서를 업데이트하는 것이 좋습니다. 최신의 모든 문서는'withParameters' 메서드가 사용 가능하다는 것을 보여 주지만 찾을 수있는 곳은 없습니다. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html –

+0

예, 좋은 지적입니다. 그것은 다음 버전 (1.4) https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html을 위해 이미 업데이트되었습니다. 수정본이 1.3 분기로 백 포트되지 않은 이유를 잘 모릅니다. –

관련 문제