Flink를 처음 사용하고 DataSet API를 사용합니다. 마지막 단계로서 처리를 한 번 한 후에 값 중 하나를 최대 값으로 나눠서 정규화해야합니다. 그래서, 나는 .max()
연산자를 사용하여 최대 값을 얻었고 나중에 결과를 생성자의 인수로 MapFunction에 전달합니다.Flink에서 데이터 흐름을 두 번 실행합니다.
모든 작업이 두 번 수행되지만 작동합니다. 최대 값을 찾기 위해 하나의 작업이 실행되고 나중에 다른 작업이 실행되어 최종 결과가 생성됩니다 (처음부터 실행 시작). 전체 데이터 흐름을 한 번만 실행하는 해결 방법이 있습니까?
final List<Tuple6<...>> maxValues = result.max(2).collect();
assert maxValues.size() == 1;
result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...)
@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5")
@FunctionAnnotation.ReadFields("f2")
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> {
private final Tuple6<...> maxValues;
public NormalizeAttributes(Tuple6<...> maxValues) {
this.maxValues = maxValues;
}
@Override
public Tuple6<...> map(Tuple6<...> value) throws Exception {
value.f2 /= maxValues.f2;
return value;
}
}
고맙습니다. ;) – kaser