2016-09-26 2 views
4

필자의 요구 사항은 하루에 수백만 개의 레코드를 스트리밍하는 것이며 외부 구성 매개 변수에 큰 의존성이 있습니다. 예를 들어, 사용자는 웹 응용 프로그램에서 언제든지 필요한 설정을 변경할 수 있으며 변경 후에는 새로운 응용 프로그램 구성 매개 변수로 스트리밍해야합니다. 이는 앱 수준의 구성이며 각 데이터를 전달하고 필터링해야하는 일부 동적 제외 매개 변수도 있습니다.Flink : 플 링크에서 외부 앱 구성 변경을 처리하는 방법

나는 모든 작업 관리자와 하위 작업에서 공유되는 전역 상태가 없다는 것을 알았습니다. 중앙 집중식 캐시를 갖는 것은 옵션이지만 각 매개 변수에 대해 캐시에서 읽어야하므로 대기 시간이 길어집니다. 이러한 종류의 시나리오와 다른 응용 프로그램이이를 어떻게 처리하는지보다 나은 접근 방법에 대해 조언하십시오. 감사.

답변

3

실행중인 스트리밍 응용 프로그램의 구성을 업데이트하는 것이 공통 요구 사항입니다. Flink의 DataStream API에서이 작업은 두 개의 입력 스트림을 처리하는 이른바 CoFlatMapFunction을 사용하여 수행 할 수 있습니다. 스트림 중 하나는 데이터 스트림이고 다른 하나는 제어 스트림이 될 수 있습니다.

다음 예제에서는 특정 길이를 초과하는 문자열을 필터링하는 사용자 함수를 동적으로 적용하는 방법을 보여줍니다.

val data: DataStream[String] = ??? 
val control: DataStream[Int] = ??? 

val filtered: DataStream[String] = data 
    // broadcast all control messages to the following CoFlatMap subtasks 
    .connect(control.broadcast) 
    // process data and control messages 
    .flatMap(new DynLengthFilter) 


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] { 

    var length = 0 

    // filter strings by length 
    override def flatMap1(value: String, out: Collector[String]): Unit = { 
    if (value.length < length) { 
     out.collect(value) 
    } 
    } 

    // receive new filter length 
    override def flatMap2(value: Int, out: Collector[String]): Unit = { 
    length = value 
    } 

    override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length 

    override def restoreState(state: Int): Unit = { 
    length = state 
    } 
} 

DynLengthFilter 사용자 기능은 필터 길이에 대한 Checkpointed 인터페이스를 구현한다. 장애가 발생하면이 정보가 자동으로 복원됩니다.

관련 문제