2016-08-23 3 views
1

Flink를 처음 사용하고 DataSet API를 사용합니다. 마지막 단계로서 처리를 한 번 한 후에 값 중 하나를 최대 값으로 나눠서 정규화해야합니다. 그래서, 나는 .max() 연산자를 사용하여 최대 값을 얻었고 나중에 결과를 생성자의 인수로 MapFunction에 전달합니다.Flink에서 데이터 흐름을 두 번 실행합니다.

모든 작업이 두 번 수행되지만 작동합니다. 최대 값을 찾기 위해 하나의 작업이 실행되고 나중에 다른 작업이 실행되어 최종 결과가 생성됩니다 (처음부터 실행 시작). 전체 데이터 흐름을 한 번만 실행하는 해결 방법이 있습니까?

final List<Tuple6<...>> maxValues = result.max(2).collect(); 
    assert maxValues.size() == 1; 
    result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...) 

@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5") 
@FunctionAnnotation.ReadFields("f2") 
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> { 

    private final Tuple6<...> maxValues; 

    public NormalizeAttributes(Tuple6<...> maxValues) { 
     this.maxValues = maxValues; 
    } 

    @Override 
    public Tuple6<...> map(Tuple6<...> value) throws Exception { 
     value.f2 /= maxValues.f2; 
     return value; 
    } 
} 

답변

0

collect() 즉시 collect() 요청한 데이터 세트까지 프로그램의 실행을 트리거한다. 나중에 env.execute() 또는 collect()으로 다시 전화하면 프로그램이 다시 실행됩니다.

실행의 부작용 이외에도 collect()을 사용하여 값을 후속 변환에 배포하면 데이터가 클라이언트로 전송 된 다음 나중에 클러스터로 다시 전송된다는 단점이 있습니다. 플 링크 (Flink)는 브로드 캐스트 (Broadcast) 변수를 제공하여 DataSet을 측면 입력으로 다른 변형에 제공합니다.

DataSet maxValues = result.max(2); 
result 
    .map(new NormAttrs()).withBroadcastSet(maxValues, "maxValues") 
    .writeAsCsv(...); 

NormAttrs 기능은 다음과 같을 것이다 : 다음과 같이 프로그램

사용하여 방송 변수가 보일 것

private static class NormAttr extends RichMapFunction<Tuple6<...>, Tuple6<...>> { 

    private Tuple6<...> maxValues; 

    @Override 
    public void open(Configuration config) { 
    maxValues = (Tuple6<...>)getRuntimeContext().getBroadcastVariable("maxValues").get(1); 
    } 

    @Override 
    public PredictedLink map(Tuple6<...> value) throws Exception { 
    value.f2 /= maxValues.f2; 
    return value; 
    } 
} 

당신은 documentation에서 방송 변수에 대한 자세한 정보를 찾을 수 있습니다.

+0

고맙습니다. ;) – kaser

관련 문제