documentation to implement a KillSwitch
에 따라 프레임을 내보내는 사용자 지정 SourceShape의 킬 스위치이 간단한 예제를 작성하여 무한 수를 방출하는 소스를 중지 할 수있었습니다.Akka Streams : 비디오 파일
object KillSwitchSample extends App {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val killSwitch = KillSwitches.shared("switch")
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val flow = builder.add(Flow[Int].map(_ * 2))
mySource.via(killSwitch.flow) ~> flow ~> Sink.foreach(println)
ClosedShape
}).run()
Thread.sleep(200)
killSwitch.shutdown()
}
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private var counter = 1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, counter)
counter += 1
}
})
}
}
소스는 OpenCV를 사용하여 비디오 파일의 프레임을 방출한다는 점에서 유스 케이스가 다릅니다. 업스트림이 취소되지 않는 이유는 무엇입니까? 내가 여기서 무엇을 놓치고 있니?
object KillSwitchMinimalMain extends App {
val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java"))
System.load(libopencv_java(0))
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
val videoFile = Video("Video.MOV")
val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile)
val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph)
val killSwitch = KillSwitches.shared("switch")
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) })
videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println)
ClosedShape
}).run()
Thread.sleep(200)
killSwitch.shutdown()
}
class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] {
val out: Outlet[Frame] = Outlet("VideoSource")
override val shape: SourceShape[Frame] = SourceShape(out)
val log: Logger = LoggerFactory.getLogger(getClass)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private val capture = new VideoCapture()
private val frame = new Mat()
private var videoPos: Double = _
override def preStart(): Unit = {
capture.open(videoFile.filepath)
readFrame()
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, Frame(videoPos, frame))
readFrame()
}
})
private def readFrame(): Unit = {
if (capture.isOpened) {
videoPos = capture.get(1)
log.info(s"reading frame $videoPos")
capture.read(frame)
}
}
}
}
@svezfaz에 의해 요청대로 콘솔 출력 :
13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0
[email protected]
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0
[email protected]
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0
[email protected]
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0
[email protected]
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0
[email protected]
// and so on ..
OpenCV 예제를 실행할 때 어떤 출력이 표시되는지 자세히 설명해 주시겠습니까? 한 프레임 당기는 데 얼마나 걸릴지 아십니까? –
프레임이 꽤 빨리 읽히는 것을 보여주는 로거로 질문을 업데이트했습니다 (물론 정수를 출력하는 것보다 훨씬 느립니다). 'javafx.scene.image.WritableImage @ xxxxxxxx'는 싱크의 println입니다. – Toaditoad