2016-12-16 4 views
0

documentation to implement a KillSwitch에 따라 프레임을 내보내는 사용자 지정 SourceShape의 킬 스위치이 간단한 예제를 작성하여 무한 수를 방출하는 소스를 중지 할 수있었습니다.Akka Streams : 비디오 파일

object KillSwitchSample extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource 
    val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph) 

    val killSwitch = KillSwitches.shared("switch") 

    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val flow = builder.add(Flow[Int].map(_ * 2)) 
    mySource.via(killSwitch.flow) ~> flow ~> Sink.foreach(println) 
    ClosedShape 
    }).run() 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

class NumbersSource extends GraphStage[SourceShape[Int]] { 
    val out: Outlet[Int] = Outlet("NumbersSource") 
    override val shape: SourceShape[Int] = SourceShape(out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private var counter = 1 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, counter) 
      counter += 1 
     } 
     }) 
    } 
} 

소스는 OpenCV를 사용하여 비디오 파일의 프레임을 방출한다는 점에서 유스 케이스가 다릅니다. 업스트림이 취소되지 않는 이유는 무엇입니까? 내가 여기서 무엇을 놓치고 있니?

object KillSwitchMinimalMain extends App { 
    val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java")) 
    System.load(libopencv_java(0)) 

    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val videoFile = Video("Video.MOV") 

    val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile) 
    val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph) 

    val killSwitch = KillSwitches.shared("switch") 

    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) }) 

    videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println) 

    ClosedShape 
    }).run() 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] { 
    val out: Outlet[Frame] = Outlet("VideoSource") 
    override val shape: SourceShape[Frame] = SourceShape(out) 
    val log: Logger = LoggerFactory.getLogger(getClass) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private val capture = new VideoCapture() 
     private val frame = new Mat() 
     private var videoPos: Double = _ 

     override def preStart(): Unit = { 
     capture.open(videoFile.filepath) 
     readFrame() 
     } 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, Frame(videoPos, frame)) 
      readFrame() 
     } 
     }) 

     private def readFrame(): Unit = { 
     if (capture.isOpened) { 
      videoPos = capture.get(1) 
      log.info(s"reading frame $videoPos") 
      capture.read(frame) 
     } 
     } 
    } 
} 

@svezfaz에 의해 요청대로 콘솔 출력 :

13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0 
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0 
[email protected] 
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0 
[email protected] 
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0 
[email protected] 
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0 
[email protected] 
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0 
[email protected] 
// and so on .. 
+0

OpenCV 예제를 실행할 때 어떤 출력이 표시되는지 자세히 설명해 주시겠습니까? 한 프레임 당기는 데 얼마나 걸릴지 아십니까? –

+0

프레임이 꽤 빨리 읽히는 것을 보여주는 로거로 질문을 업데이트했습니다 (물론 정수를 출력하는 것보다 훨씬 느립니다). 'javafx.scene.image.WritableImage @ xxxxxxxx'는 싱크의 println입니다. – Toaditoad

답변

1

문제는 당신이 사용자 정의 단계에서 차단 소개합니다 것입니다. OpenCV API에 대해서는 잘 모르지만, capture.read(frame)으로 전화하면 추측됩니다. 별도의 지시가없는 한 그래프는 하나의 액터에서 실행되므로 무대에서 블로킹하면 전체 액터가 차단됩니다.

소스가 트릭을 수행 한 후에 async 경계를 강제로 지정하십시오.

여기서도 GraphDSL이 필요하지 않으며, via/to DSL을 사용하여 모든 것을 컴팩트하게 실행할 수 있습니다. Akka 스트림이 blogpost을 읽을 수를 기본 동시성 모델에 대한 추가 정보를 원하시면

object KillSwitchMinimalMain extends App { 
    val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java")) 

    System.load(libopencv_java(0)) 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val videoFile = Video("Video.MOV") 

    val killSwitch = KillSwitches.shared("switch") 
    val matConversion = Flow[ByteString].map { _.utf8String } 

    Source.fromGraph(new VideoSource()) 
    .async 
    .via(killSwitch.flow) 
    .via(matConversion) 
    .runForeach(println) 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

아래

해결 시도.

+0

이 설명에 감사드립니다. [documentation] (http://doc.akka.io/docs/akka/2.4/scala/stream/stream-flows-and-basics.html#Operator_Fusion)에서 이에 대해 읽은 것을 기억하지만 분명히 실제로 적용 할 수는 없습니다. 그것. GraphDSL에 대해서는 여기서는 필요 없다는 것을 알고 있지만 게시 된 코드는 내 방송과 병합 단계를 생략 한 차분한 버전입니다. 그러나 당신이 언급 한 이후로 : GraphDSL을 사용하는 다른 단점은 좀더 장황하지 않은가? – Toaditoad

+0

또 다른 단점은 GraphDSL이 조금 더 부서지기 쉽고 런타임 오류가 발생하기 쉽다는 것입니다. 입구와 출구가 모두 제대로 연결되어 있지 않은 곳이면 어디든 있습니다. 성능상 현저한 차이는 없습니다. –

+0

감사합니다. 그건 의미가 있습니다. – Toaditoad

관련 문제