2017-09-15

JUnit test with Flinkspector on a DataStream of type Either

I want to test whether the timeout of my Flink CEP pattern works correctly. I am using Flinkspector for this and have the following test case:

@Test 
public void SameDoor_TwoStatuses_OneSecondTimeoutPattern() { 
    // Arrange 
    long now = new Date().getTime(); 
    DoorEvent event1 = new DoorEvent(); 
    event1.setId(123); 
    event1.getDoor().setId(1); 
    event1.getDoor().setStatus("statusaaaaaa"); 
    event1.setTimestamp(now); 

    EventTimeInputBuilder<DoorEvent> builder = EventTimeInputBuilder.startWith(event1, event1.getTimestamp()); 
    DataStream<DoorEvent> stream = createTestStream(builder).assignTimestampsAndWatermarks(new TestTimestampExtractor<DoorEvent>()); 

    // Act 
    Pattern<DoorEvent, ?> pattern = StatusNotFollowedByAnotherStatusPattern.getPatternForSameDoor(1, "firstevent", "statusaaaaaa","secondevent", "status2"); 
    PatternStream<DoorEvent> pStream = CEP.pattern(stream, pattern); 

    DataStream<Either<Integer,Tuple2<Integer,Integer>>> patterns = pStream.select(getEventIdOfTimeoutEvent(),selectEventIdsOfPatterns()).forward(); 
    patterns.print(); // prints Left(123) 

    ExpectedRecords<Either<Integer,Tuple2<Integer,Integer>>> expectedRecords = 
     new ExpectedRecords<Either<Integer,Tuple2<Integer,Integer>>>() 
      .expect(new Left<Integer, Tuple2<Integer,Integer>>(123)); 

    expectedRecords.refine().sameFrequency(); 

    // Assert 
    assertStream(patterns, expectedRecords); 
} 

private PatternSelectFunction<DoorEvent, Tuple2<Integer, Integer>> selectEventIdsOfPatterns(){ 
    return new PatternSelectFunction<DoorEvent, Tuple2<Integer,Integer>>() { 
     private static final long serialVersionUID = 3830508947015151715L; 
     @Override 
     public Tuple2<Integer,Integer> select(Map<String, List<DoorEvent>> pattern) throws Exception { 
      Tuple2<Integer,Integer> t = new Tuple2<Integer,Integer>(); 
      t.f0 = pattern.get("firstevent").get(0).getId(); 
      t.f1 = pattern.get("secondevent").get(0).getId(); 
      return t; 
     } 
    }; 
} 

private PatternTimeoutFunction<DoorEvent, Integer> getEventIdOfTimeoutEvent(){ 
    return new PatternTimeoutFunction<DoorEvent, Integer>() { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Integer timeout(Map<String, List<DoorEvent>> arg0, long arg1) throws Exception { 
      int id = arg0.get("firstevent").get(0).getId(); 
      System.out.println("Timeout triggered on eventstatus " + arg0.get("firstevent").get(0).getDoor().getStatus()); 
      return id; 
     } 

    }; 
} 

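My code does print the status statusaaaaaa from the PatternTimeoutFunction, which is the status of the first event in my pattern. The second status is not detected within the defined time, so the timeout function is called and an Integer is added to the pattern stream. How do I tell ExpectedRecords that I expect a Left with the value 123?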

Edit
The error I currently get is:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) 
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) 
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) 
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) 
    at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.emitTimedOutSequences(TimeoutKeyedCEPPatternOperator.java:77) 
    at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.advanceTime(TimeoutKeyedCEPPatternOperator.java:68) 
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:242) 
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) 
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) 
    ... 7 more 
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) 
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) 
    ... 18 more 
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type extraction is not possible on Either type as it does not contain information about the 'left' type. 
    at org.apache.flink.api.java.typeutils.EitherTypeInfoFactory.createTypeInfo(EitherTypeInfoFactory.java:37) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromFactory(TypeExtractor.java:1233) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:2054) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:2044) 
    at io.flinkspector.datastream.functions.TestSink.invoke(TestSink.java:82) 
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) 
    ... 26 more 

Answer

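The problem is a bug in Flinkspector's TestSink function. TestSink tries to extract the generic type parameters of Left at runtime (the TypeExtractor.getForObject call in the stack trace), but that is not possible because an object instance does not carry that information. Instead, the type information has to be passed to TestSink when it is instantiated, so that it can create a serializer of the correct type. Please open an issue on the GitHub repository so the developers are aware of it.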
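To illustrate why the extraction fails, here is a minimal plain-Java sketch. It is not Flinkspector or Flink code: the `Left` class below is a hypothetical stand-in for an Either-style value, and `TypeToken` and `typeParametersOf` are names made up for this example. It shows that reflection on an instance only yields the declared type variables, while an anonymous subclass created at the call site keeps the concrete type arguments. Flink's `TypeHint` relies on the same anonymous-subclass idea.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class ErasureDemo {

    // Hypothetical stand-in for an Either-style "left" value (not Flink's class).
    public static class Left<L, R> {
        public final L value;

        public Left(L value) {
            this.value = value;
        }
    }

    // Type token: an anonymous subclass stores its generic superclass in class metadata,
    // so the concrete type arguments survive erasure.
    public abstract static class TypeToken<T> {
        public Type[] typeArguments() {
            ParameterizedType self = (ParameterizedType) getClass().getGenericSuperclass();
            Type captured = self.getActualTypeArguments()[0];
            // Assumes T itself is a parameterized type such as Left<Integer, String>.
            return ((ParameterizedType) captured).getActualTypeArguments();
        }
    }

    // All that reflection can recover from an instance: the declared type variables.
    public static String typeParametersOf(Object instance) {
        return Arrays.toString(instance.getClass().getTypeParameters());
    }

    public static void main(String[] args) {
        Left<Integer, String> left = new Left<>(123);

        // Only the variable names are available, not Integer and String.
        System.out.println(typeParametersOf(left)); // prints [L, R]

        // Passing the type explicitly keeps the concrete arguments.
        Type[] explicit = new TypeToken<Left<Integer, String>>() {}.typeArguments();
        System.out.println(Arrays.toString(explicit));
        // prints [class java.lang.Integer, class java.lang.String]
    }
}
```

This is the same situation the sink is in: it receives a `Left(123)` object and has nothing to derive the 'left' and 'right' types from unless the caller supplies them up front.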

I have opened an issue on GitHub. Thx for the reply :)