실행중인 스트리밍 응용 프로그램의 구성을 업데이트하는 것이 공통 요구 사항입니다. Flink의 DataStream API에서이 작업은 두 개의 입력 스트림을 처리하는 이른바 CoFlatMapFunction
을 사용하여 수행 할 수 있습니다. 스트림 중 하나는 데이터 스트림이고 다른 하나는 제어 스트림이 될 수 있습니다.
다음 예제에서는 특정 길이를 초과하는 문자열을 필터링하는 사용자 함수를 동적으로 적용하는 방법을 보여줍니다.
val data: DataStream[String] = ???
val control: DataStream[Int] = ???
val filtered: DataStream[String] = data
// broadcast all control messages to the following CoFlatMap subtasks
.connect(control.broadcast)
// process data and control messages
.flatMap(new DynLengthFilter)
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {
var length = 0
// filter strings by length
override def flatMap1(value: String, out: Collector[String]): Unit = {
if (value.length < length) {
out.collect(value)
}
}
// receive new filter length
override def flatMap2(value: Int, out: Collector[String]): Unit = {
length = value
}
override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length
override def restoreState(state: Int): Unit = {
length = state
}
}
DynLengthFilter
사용자 기능은 필터 길이에 대한 Checkpointed
인터페이스를 구현한다. 장애가 발생하면이 정보가 자동으로 복원됩니다.