I가 클러스터없이 로컬로 실행 다음 코드를FLINK CEP는 결정되지
1,1
1,2
1,3
2,1
2,2
2,3
...
I :
val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
println("Found: " + (patternName, eventMap))
count.incrementAndGet()
})
env.execute()
println(count)
내 데이터는 다음과 같은 형식의 CSV 파일 (사용자, 발) 인 패턴의 이벤트를 감지하려고합니다. event(val=1) -> event(val=2) -> event(val=3)
. 내가 알고있는 일련의 이벤트가 스트림에 존재하는 대형 입력 스트림에서이 작업을 실행할 때 시스템의 이벤트 수보다 거의 항상 일치하는 이벤트 수가 감지됩니다. env.setParallelism(1)
(코드 3 행 에서처럼) 모든 이벤트가 감지됩니다.
병렬 처리가> 1 인 경우 여러 스레드가 스트림에서 이벤트를 처리하고 있다는 것을 가정합니다. 즉, 하나의 스레드가 event(val=1) -> event(val=2)
인 동안 event(val=3)
이 다른 스레드로 전송 될 수 있으며 전체 패턴이 얻어지지 않을 수 있음을 의미합니다 감지 됨.
여기에 누락 된 것이 있습니까? 스트림에서 패턴을 잃을 수는 없지만 병렬 처리를 1로 설정하면 Flink와 같은 시스템에서 이벤트를 감지하는 목적을 무력화시키는 것 같습니다.
업데이트 :이 서로 간섭 다른 사용자의 이벤트를 방지하지만
val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))
:
내가 사용하여 스트림 키잉 시도
1,1
2,2
1,3
이에서 FLINK을 방지하지 않습니다 노드에 순서대로 이벤트를 전송하는 것은 비 결정론이 여전히 존재 함을 의미합니다.
예, 사용해 보았습니다. 불행히도 그것은 도움이되지 않습니다. – Sriram
스트림을 키잉 한 후 문제가 발생 했습니까? 결과로 얻는 것은 무엇입니까? 어떤 결과를 기대 했습니까? – Claudi
여전히 결정적이지 않습니다. 1,1 2,2 1,3 이 감지되지 않더라도 키잉은 여러 스레드에서 순서대로 처리 노드로 이벤트가 이동하는 것을 계속 차단하지 않습니다. – Sriram