2016-06-09 4 views
1

Flink Streaming API를 처음 사용하고 다음과 같은 간단한 (IMO) 작업을 수행하고 싶습니다. 두 개의 스트림이 있으며 개수 기반의 창을 사용하여 이들과 합류하고 싶습니다. 지금까지 가지고있는 코드는 다음과 같습니다.카운트 기반 창을 사용하여 두 스트림을 결합하십시오.

public class BaselineCategoryEquiJoin { 

private static final String recordFile = "some_file.txt"; 

private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> { 
    public Tuple2<String[], MyRecord> map(String s) throws Exception { 
     MyRecord myRecord = parse(s); 
     return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord); 
    } 
} 

public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(); 
    ExecutionConfig config = environment.getConfig(); 
    config.setParallelism(8); 
    DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile) 
      .map(new ParseRecordFunction()); 
    DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile) 
      .map(new ParseRecordFunction()); 
    DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1 
      .join(dataStream) 
      .where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() { 
       public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception { 
        return recordTuple2.f0; 
       } 
      }).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() { 
       public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception { 
        return recordTuple2.f0; 
       } 
      }).window(TumblingProcessingTimeWindows.of(Time.seconds(1))) 
      .apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() { 
       public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception { 
        return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0); 
       } 
      }).print(); 
    environment.execute(); 
} 
} 

내 코드는 오류없이 작동하지만 결과가 나오지 않습니다. 실제로 apply 메서드에 대한 호출은 호출되지 않습니다 (디버그 모드에서 중단 점을 추가하여 확인). 제 생각에, 이전의 주된 이유는 제 데이터에 시간 속성이 없다는 것입니다. 따라서 윈도우 화 (window 통해 구체화) 제대로 수행되지 않습니다. 그러므로, 제 질문은 카운트 윈도우를 기반으로 제 가입을 원한다는 것을 어떻게 나타낼 수 있을까요? 예를 들어, 조인은 각 스트림에서 100 개의 튜플을 구체화합니다. Flink에서 이전 버전이 가능합니까? 그렇다면 코드를 변경하려면 무엇을 변경해야합니까?

이 시점에서 나는 countWindow() 메서드를 호출하려했으나 어떤 이유로 Flink의 JoinedStreams에 의해 제공되지 않습니다.

고맙습니다.

답변

2

개수 기반 조인은 지원되지 않습니다. "이벤트 시간"의미론을 사용하여 카운트 기반 윈도우를 에뮬레이트하고 각 레코드에 고유 한 seq-id를 타임 스탬프로 적용 할 수 있습니다. 따라서 "5"의 시간 윈도우는 실제로 5의 카운트 윈도우가됩니다.

+0

https://stackoverflow.com/questions/46282692/match-based-on- 두 개의 속성 사이의 두 데이터 스트림 및 수집 모두 기반의 m – Kumar

관련 문제