까다로운 부분은 따라서 귀하의 ParseData
함수의 병렬 인스턴스가 서로 독립적으로 실행되고, 분산 시스템에서 코드를 실행하는 것이있다.
당신은 여전히 ParseData
에 로컬 ID 카운터를 사용하여 고유 ID를 할당 할 수 있습니다. 중복을 피하기위한 트릭은 올바른 초기화 및 카운터 증분입니다. 병렬 처리를 4로 가정하면 ParseData
개의 인스턴스가 4 개 있습니다 (PD1 ... PD4
). 다음과 같은 ID 할당을 할 것입니다 :
PD1: 0, 4, 8, 12, ...
PD2: 1, 5, 9, 13, ...
PD3, 2, 6, 10, 14, ...
PD4: 3, 7, 11, 15, ...
서로 다른 값 (아래 세부 사항)와 병렬 인스턴스를 초기화하여이 작업을 수행 할 수 및 병렬 처리에 의해 각 인스턴스의 수를 증가 (즉, ID += parallelism
).
Flink에서 병렬 함수로 인스턴스화 된 모든 것은 자동으로 고유 한 번호 (작업 인덱스)가 할당됩니다. 이 번호를 사용하여 ID 카운터를 초기화 할 수 있습니다. 작업 색인은 RuntimeContext.getIndexOfThisSubtask()
을 통해 얻을 수 있습니다. 또한 RuntimeContext
이 ParseData
를 구현하고 open()
에 getRuntimeContext()
를 호출 할 RichMapFunction
를 사용 얻으려면 RuntimeContext.getNumberOfParallelSubtasks()
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
을 통해 운영자/기능 병렬 처리를받을 수 있습니다. 이 같은
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html
뭔가 (관련 방법을 보여에만 해당) :
class ParseDate extends RichMapFunction {
private long parallelism;
private long idCounter;
public void open(Configuration parameters) {
RuntimeContext ctx = getRuntimeContext();
parallelism = ctx.getNumberOfParallelSubtasks();
idCounter = ctx.getIndexOfThisSubtask();
}
public OutputDataType map(InputDataType value) {
OutputDataType output = new OutputDataType();
output.setID(idCounter);
idCounter += parallelism;
// further processing
return output;
}
}
감사합니다, 날 위해 일했습니다. 작동을 위해'public void open (Configuration parameters)'를 추가해야했습니다. 그러나, 이런 식으로 마지막 ID 연속되지 않습니다 (모든 실행하는 동안 그들은 다르게 할당됩니다),하지만 이것은 각 인스턴스에 할당 된 요소의 수와 관련이있는 것 같아요. –
내 대답에 열린 방식을 고쳤습니다 - 지적 해 주셔서 고맙습니다. 예, 데이터가 균등하게 분배되지 않으면 공유 된 전역 상태 (성능에 어려움을 겪을 수 있음)가 필요하므로 매우 어려운 연속적인 ID를 얻지 못할 수도 있습니다. 나는 당신의 질문에서이 세부 사항을 간과했다. –