2016-07-27 4 views
0

CSV 파일을로드하고 사용자 정의 맵 기능을 사용하여 모든 행을 POJO로 변환합니다. 내 프로그램 논리에 대한 모든 POJO 0에서 n 고유 번호 (여기서 n 총 줄 번호) 필요합니다. 제 질문은, 변환 기능을 사용하여 모든 POJO에 고유 한 ID (예 : 초기 행 번호)를 할당 할 수 있습니까? 이상적인 방법은 UDF에서 Iterable을 가져 와서 입력 튜플을 반복하면서 변수를 증가시키고 마지막으로 해당 POJO를 출력하는 것입니다. 내 코드는 현재 다음과 같습니다.Apache Flink - 입력에 고유 ID 할당

DataSet<MyType> input = env.readCsvFile("/path/file.csv") 
    .includeFields("1111") 
    .types(String.class, Double.class, Double.class,Double.class) 
    .map(new ParseData()); 

여기서 ParseData는 Tuples를 MyType POJO로 변환합니다. 이 작업을 달성하기위한 어떤 모범 사례

이 있습니까?

답변

2

까다로운 부분은 따라서 귀하의 ParseData 함수의 병렬 인스턴스가 서로 독립적으로 실행되고, 분산 시스템에서 코드를 실행하는 것이있다.

당신은 여전히 ​​ParseData에 로컬 ID 카운터를 사용하여 고유 ID를 할당 할 수 있습니다. 중복을 피하기위한 트릭은 올바른 초기화 및 카운터 증분입니다. 병렬 처리를 4로 가정하면 ParseData 개의 인스턴스가 4 개 있습니다 (PD1 ... PD4). 다음과 같은 ID 할당을 할 것입니다 :

PD1: 0, 4, 8, 12, ... 
PD2: 1, 5, 9, 13, ... 
PD3, 2, 6, 10, 14, ... 
PD4: 3, 7, 11, 15, ... 

서로 다른 값 (아래 세부 사항)와 병렬 인스턴스를 초기화하여이 작업을 수행 할 수 및 병렬 처리에 의해 각 인스턴스의 수를 증가 (즉, ID += parallelism).

Flink에서 병렬 함수로 인스턴스화 된 모든 것은 자동으로 고유 한 번호 (작업 인덱스)가 할당됩니다. 이 번호를 사용하여 ID 카운터를 초기화 할 수 있습니다. 작업 색인은 RuntimeContext.getIndexOfThisSubtask()을 통해 얻을 수 있습니다. 또한 RuntimeContextParseData를 구현하고 open()getRuntimeContext()를 호출 할 RichMapFunction를 사용 얻으려면 RuntimeContext.getNumberOfParallelSubtasks()

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html

을 통해 운영자/기능 병렬 처리를받을 수 있습니다. 이 같은

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html

뭔가 (관련 방법을 보여에만 해당) :

class ParseDate extends RichMapFunction { 
    private long parallelism; 
    private long idCounter; 

    public void open(Configuration parameters) { 
     RuntimeContext ctx = getRuntimeContext(); 
     parallelism = ctx.getNumberOfParallelSubtasks(); 
     idCounter = ctx.getIndexOfThisSubtask(); 
    } 

    public OutputDataType map(InputDataType value) { 
     OutputDataType output = new OutputDataType(); 
     output.setID(idCounter); 
     idCounter += parallelism; 
     // further processing 
     return output; 
    } 
} 
+0

감사합니다, 날 위해 일했습니다. 작동을 위해'public void open (Configuration parameters)'를 추가해야했습니다. 그러나, 이런 식으로 마지막 ID 연속되지 않습니다 (모든 실행하는 동안 그들은 다르게 할당됩니다),하지만 이것은 각 인스턴스에 할당 된 요소의 수와 관련이있는 것 같아요. –

+0

내 대답에 열린 방식을 고쳤습니다 - 지적 해 주셔서 고맙습니다. 예, 데이터가 균등하게 분배되지 않으면 공유 된 전역 상태 (성능에 어려움을 겪을 수 있음)가 필요하므로 매우 어려운 연속적인 ID를 얻지 못할 수도 있습니다. 나는 당신의 질문에서이 세부 사항을 간과했다. –