2016-08-10 4 views
1

I가 클러스터없이 로컬로 실행 다음 코드를FLINK CEP는 결정되지

1,1 
1,2 
1,3 
2,1 
2,2 
2,3 
... 

I :

val count = new AtomicInteger() 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setParallelism(1) 
val text: DataStream[String] = env.readTextFile("file:///flink/data2") 
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1))) 
val pattern: ... 
CEP.pattern(mapped, pattern).select(eventMap => { 
    println("Found: " + (patternName, eventMap)) 
    count.incrementAndGet() 
}) 

env.execute() 
println(count) 

내 데이터는 다음과 같은 형식의 CSV 파일 (사용자, 발) 인 패턴의 이벤트를 감지하려고합니다. event(val=1) -> event(val=2) -> event(val=3). 내가 알고있는 일련의 이벤트가 스트림에 존재하는 대형 입력 스트림에서이 작업을 실행할 때 시스템의 이벤트 수보다 거의 항상 일치하는 이벤트 수가 감지됩니다. env.setParallelism(1) (코드 3 행 에서처럼) 모든 이벤트가 감지됩니다.

병렬 처리가> 1 인 경우 여러 스레드가 스트림에서 이벤트를 처리하고 있다는 것을 가정합니다. 즉, 하나의 스레드가 event(val=1) -> event(val=2) 인 동안 event(val=3)이 다른 스레드로 전송 될 수 있으며 전체 패턴이 얻어지지 않을 수 있음을 의미합니다 감지 됨.

여기에 누락 된 것이 있습니까? 스트림에서 패턴을 잃을 수는 없지만 병렬 처리를 1로 설정하면 Flink와 같은 시스템에서 이벤트를 감지하는 목적을 무력화시키는 것 같습니다.

업데이트 :이 서로 간섭 다른 사용자의 이벤트를 방지하지만

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user")) 

:

내가 사용하여 스트림 키잉 시도

1,1 
2,2 
1,3 

이에서 FLINK을 방지하지 않습니다 노드에 순서대로 이벤트를 전송하는 것은 비 결정론이 여전히 존재 함을 의미합니다.

답변

0

스트림을 사용자 ID (첫 번째 값)로 입력하는 것에 대해 생각해 보셨습니까? Flink는 하나의 키에 대한 모든 이벤트가 동일한 처리 노드에 전달되도록합니다. 사용자 당 val = 1-> val = 2-> val = 3의 패턴을 감지하려는 경우에만 도움이됩니다.

+0

예, 사용해 보았습니다. 불행히도 그것은 도움이되지 않습니다. – Sriram

+0

스트림을 키잉 한 후 문제가 발생 했습니까? 결과로 얻는 것은 무엇입니까? 어떤 결과를 기대 했습니까? – Claudi

+0

여전히 결정적이지 않습니다. 1,1 2,2 1,3 이 감지되지 않더라도 키잉은 여러 스레드에서 순서대로 처리 노드로 이벤트가 이동하는 것을 계속 차단하지 않습니다. – Sriram

0

아마도지도 연산자 뒤에 keyBy 연산자를 적용하는 것이 문제 일 수 있습니다.

그래서, 대신 :

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user")) 

이 있어야한다 :

val mapped: KeyedStream[Map[String, Any]] = text.keyBy((m) => m.get("user")).map(...) 

나는이 오래된 질문 알아요,하지만 어쩌면 누군가를하는 데 도움이됩니다.