2016-12-08 1 views
0

추가 요청이 업스트림으로 이동하는 사용자 정의 Flow에서 버그를 추적하려고합니다. 나는 내가 보았던 행동과 벌목을 감안할 때 실패 했어야하는 테스트 케이스를 썼다.왜 Akka Streams TestSource는 모든 요청을 요청으로 받아들이지 않습니까?

나는 명백한 여분의 잡아 당김과 더불어 사소한 흐름을 만들었다. 그러나 테스트는 아직도 실패하지 않았다. 여기

class EagerFlow extends GraphStage[FlowShape[String, String]] { 

    val in: Inlet[String] = Inlet("EagerFlow.in") 
    val out: Outlet[String] = Outlet("EagerFlow.out") 

    override def shape: FlowShape[String, String] = FlowShape(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
    setHandler(in, new InHandler { 
     override def onPush() = { 
     println("EagerFlow.in.onPush") 
     println("EagerFlow.out.push") 
     push(out, grab(in)) 
     println("EagerFlow.in.pull") 
     pull(in) 
     } 
    }) 
    setHandler(out, new OutHandler { 
     override def onPull() = { 
     println("EagerFlow.out.onPull") 
     println("EagerFlow.in.pull") 
     pull(in) 
     } 
    }) 
    } 
} 

통과해야 시험, 즉 별도의 요구가 있는지 확인이다 : 여기

는 일례이다. 표준 출력에서 ​​

"Eager Flow" should "pull on every push" in { 
    val sourceProbe = TestSource.probe[String] 
    val sinkProbe = TestSink.probe[String] 

    val eagerFlow = Flow.fromGraph(new EagerFlow) 

    val (source, sink) = sourceProbe. 
    via(eagerFlow). 
    toMat(sinkProbe)(Keep.both). 
    run 

    sink.request(1) 
    source.expectRequest() 
    source.expectNoMsg() 

    source.sendNext("hello") 
    sink.expectNext() 
    source.expectRequest() 
} 

당신이 볼 수

0C3E08A8 PULL DownstreamBoundary -> [email protected] ([email protected]) [[email protected]] 
EagerFlow.out.onPull 
EagerFlow.in.pull 
0C3E08A8 PULL [email protected] -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary] 
EagerFlow.in.onPush 
EagerFlow.out.push 
EagerFlow.in.pull 
0C3E08A8 PULL [email protected] -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary] 

assertion failed: timeout (3 seconds) during expectMsg: 
    java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: 
    at scala.Predef$.assert(Predef.scala:170) 
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:368) 
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:737) 
at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:665) 
at akka.stream.testkit.TestPublisher$Probe.expectRequest(StreamTestKit.scala:172) 

왜이 작업을 수행합니다 (이것은 잘 실제로 인쇄 점하지만 효과적으로 같은 일을 사용 에서 디버그 로깅입니다) UpstreamBoundary에서 멈추고 TestSource.probe으로 되돌아 가지 않으십니까?

나는 올바른 배압을 보장하기 위해 expectNoMsg()을 사용하는 테스트를 꽤 많이하고 있지만 위식도가 잘못된 것처럼 보입니다. 이것이 작동하지 않으면 어떻게 테스트해야합니까?

답변

1

는 기본 크기는 1

val settings = ActorMaterializerSettings(system) 
    .withInputBuffer(initialSize = 1, maxSize = 1) 
implicit val mat = ActorMaterializer(settings) 

크기로 당신의 흐름 버퍼를 설정해야합니다 테스트 적용 만들려면 :

# Initial size of buffers used in stream elements 
    initial-input-buffer-size = 4 
    # Maximum size of buffers used in stream elements 
    max-input-buffer-size = 16 

이 의미 Akka 열심히 것 어쨌든을 요청하고 최대 16 개의 원소를 섭취하십시오. 내부 버퍼에 대한 자세한 내용은 here을 참조하십시오.

+0

네가 맞습니다. 'BatchingActorInputBoundary'가 16 개의 엘리먼트에 대해'tryRequest'를 호출하고있는 것을 볼 수 있습니다. 처음에는 비동기 경계가 어떻게 생성되는지 조금 뒤로 보입니다. TestKit이 개별 단계를 좀 더 직관적으로 테스트 할 수있는 무언가를 제공 할 것이라고 생각했을 것입니다. 내가 깨닫지 못했던 또 다른 점은'Probe.expectRequest()'가 multiple을 허용하고 그 수를 반환한다는 것입니다. 이것이 복수형 이름을 가진다면 더 명백 할 것입니다. – Steiny

관련 문제