2017-01-31 3 views
0

예를 들어 비디오 파일의 프레임을 나타내는 org.opencv.core.Mat 객체용 Kryo 시리얼라이저를 구현하는 데 문제가 있습니다. Akka ActorSystem A는 프레임 안의 물체를 탐지하기 위해 그레이스케일 비디오 프레임을 시스템 B로 보냅니다 (akka-cluster는 사용하지 않습니다). 이를 위해 Mat 타입의 프레임을 직렬화하여 네트워크를 통해 전송해야 합니다. 그러나 실제로는 프레임이 전송되지 않고, 하트비트를 전송하는 액터가 멈추어 시스템 간 연결이 끊어집니다.

Akka Remoting: org.opencv.core.Mat용 사용자 정의 Kryo 시리얼라이저

아이디어가 있습니까?

GitHub에도 문의했습니다: https://github.com/romix/akka-kryo-serialization/issues/110

관련 있지만 답변이 없는 질문: kryo serialization over storm

configuration.conf

# Declares the short name "kryo" for the akka-kryo-serialization serializer class.
serializers { 
    kryo = "com.romix.akka.serialization.kryo.KryoSerializer" 
} 
# Messages of type org.opencv.core.Mat are bound to the "kryo" serializer.
serialization-bindings { 
    "org.opencv.core.Mat" = kryo 
} 
kryo { 
    # Names the class whose customize(kryo) method registers the Mat serializer.
    kryo-custom-serializer-init = "de.itd.util.KryoInit" 
    # NOTE(review): presumably disables reference tracking - confirm against the library docs.
    type = "nograph" 
    # NOTE(review): presumably requires every class to be registered with an explicit ID - confirm.
    idstrategy = "explicit" 
    # NOTE(review): a 2160x4096 single-channel frame is roughly 8.8 MB, far above this
    # initial size; presumably growth relies on max-buffer-size = -1 - confirm.
    buffer-size = 4096 
    max-buffer-size = -1 
    use-manifests = true 
    use-unsafe = false 
    # NOTE(review): presumably LZ4-compresses the serialized bytes - confirm.
    post-serialization-transformations = "lz4" 
    # NOTE(review): presumably the source of the "[kryo]" TRACE/DEBUG log lines - confirm.
    kryo-trace = true 
    resolve-subclasses = false 
} 

de.itd.util.KryoInit.scala (시스템 A 및 B)

package de.itd.util 

import com.esotericsoftware.kryo.Kryo 
import org.opencv.core.Mat 

/** Initialization hook referenced by `kryo-custom-serializer-init`.
  *
  * Associates [[org.opencv.core.Mat]] with [[MatKryoSerializer]] and
  * registers the class under the explicit ID 21.
  */
class KryoInit {

  /** Configures the given Kryo instance for `Mat` serialization.
    *
    * @param kryo the Kryo instance to customize
    */
  def customize(kryo: Kryo): Unit = {
    val matClass = classOf[Mat]
    val matClassId = 21

    // Install the default serializer before registering the class.
    kryo.addDefaultSerializer(matClass, classOf[MatKryoSerializer])
    kryo.register(matClass, matClassId)
  }
}

de.itd.util.MatKryoSerializer (시스템 A 및 B)

package de.itd.util 

import com.esotericsoftware.kryo.{Kryo, Serializer} 
import com.esotericsoftware.kryo.io.{Input, Output} 
import org.opencv.core.{CvType, Mat} 

/** Kryo serializer for 8-bit OpenCV [[org.opencv.core.Mat]] instances.
  *
  * Wire format: rows (Int), cols (Int), OpenCV type code (Int), followed by
  * `rows * cols * channels` raw bytes. Writing the header lets the reader
  * rebuild a matrix of any size and 8-bit type instead of assuming a fixed
  * 2160x4096 single-channel frame.
  *
  * Both sides of the connection must use this same version of the class,
  * because the header is part of the byte stream.
  *
  * NOTE(review): the payload size assumes one byte per channel element
  * (8-bit depths); other depths are not handled here.
  */
class MatKryoSerializer extends Serializer[Mat] {

  /** Writes the matrix header followed by its pixel bytes.
    *
    * @param kryo   the Kryo instance (unused)
    * @param output destination stream
    * @param m      the matrix to serialize
    */
  override def write(kryo: Kryo, output: Output, m: Mat): Unit = {
    val rows = m.rows()
    val cols = m.cols()
    val matType = m.`type`()

    // Header first, so read() can size the matrix and the payload.
    output.writeInt(rows)
    output.writeInt(cols)
    output.writeInt(matType)

    val pixels = new Array[Byte](rows * cols * m.channels())
    // An empty matrix has no data to copy out.
    if (pixels.nonEmpty) m.get(0, 0, pixels)
    output.writeBytes(pixels)
  }

  /** Reads the header written by [[write]] and rebuilds the matrix.
    *
    * @param kryo   the Kryo instance (unused)
    * @param input  source stream positioned at the start of a serialized Mat
    * @param `type` the class being deserialized (unused)
    * @return a new Mat with the transmitted dimensions, type and pixel data
    */
  override def read(kryo: Kryo, input: Input, `type`: Class[Mat]): Mat = {
    val rows = input.readInt()
    val cols = input.readInt()
    val matType = input.readInt()

    // The channel count is encoded in the type code, so it is not sent separately.
    val pixels = input.readBytes(rows * cols * CvType.channels(matType))

    val frame = new Mat(rows, cols, matType)
    if (pixels.nonEmpty) frame.put(0, 0, pixels)
    frame
  }
}

시스템 A의 로그 (시스템 B에 프레임을 전송하는 쪽)

00:00 TRACE: [kryo] Registration required: true 
00:00 TRACE: [kryo] References: false 
00:00 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer) 
[INFO] [01/31/2017 12:31:48.390] [JavaFX Application Thread] [akka.remote.Remoting] Starting remoting 
[INFO] [01/31/2017 12:31:48.598] [JavaFX Application Thread] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] 
[INFO] [01/31/2017 12:31:48.602] [JavaFX Application Thread] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552] 
12:31:48.633 [JavaFX Application Thread] INFO de.itd.controller.MainViewController - MainActorSystem started. 
12:31:49.168 [MainActorSystem-akka.actor.default-dispatcher-2] INFO de.itd.actor.local.MainActor - Detector DetectionActor-0 registered. 
12:31:54.788 [JavaFX Application Thread] INFO de.itd.controller.MainViewController - detectCars 
12:31:56.318 [MainActorSystem-akka.actor.default-dispatcher-4] INFO de.itd.actor.local.MainActor - DetectionActor-0 asked for a frame. 
00:08 TRACE: [kryo.FieldSerializerConfig] useAsm: false 
00:08 TRACE: [kryo.FieldSerializerConfig] useAsm: false 
00:08 TRACE: [kryo] Register class ID 0: int (com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer) 
00:08 TRACE: [kryo] Register class ID 1: String (com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer) 
00:08 TRACE: [kryo] Register class ID 2: float (com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer) 
00:08 TRACE: [kryo] Register class ID 3: boolean (com.esotericsoftware.kryo.serializers.DefaultSerializers$BooleanSerializer) 
00:08 TRACE: [kryo] Register class ID 4: byte (com.esotericsoftware.kryo.serializers.DefaultSerializers$ByteSerializer) 
00:08 TRACE: [kryo] Register class ID 5: char (com.esotericsoftware.kryo.serializers.DefaultSerializers$CharSerializer) 
00:08 TRACE: [kryo] Register class ID 6: short (com.esotericsoftware.kryo.serializers.DefaultSerializers$ShortSerializer) 
00:08 TRACE: [kryo] Register class ID 7: long (com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer) 
00:08 TRACE: [kryo] Register class ID 8: double (com.esotericsoftware.kryo.serializers.DefaultSerializers$DoubleSerializer) 
00:08 TRACE: [kryo] Register class ID 9: void (com.esotericsoftware.kryo.serializers.DefaultSerializers$VoidSerializer) 
00:08 TRACE: [kryo] Register class ID 10: scala.Enumeration$Val (com.romix.scala.serialization.kryo.EnumerationSerializer) 
00:08 TRACE: [kryo] Register class ID 11: scala.Enumeration$Value (com.romix.scala.serialization.kryo.EnumerationSerializer) 
00:08 TRACE: [kryo] Registration required: true 
00:08 TRACE: [kryo] References: false 
00:08 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer) 
00:08 DEBUG: [kryo] Write: Mat [ 2160*4096*CV_8UC1, isCont=true, isSubmat=false, nativeObj=0x7feb070c0bd0, dataAddr=0x145725020 ] 
00:08 TRACE: [kryo] Object graph complete. 
[WARN] [01/31/2017 12:32:07.645] [MainActorSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://[email protected]:2552/system/remote-watcher] Detected unreachable: [akka.tcp://[email protected]:2553] 
[WARN] [01/31/2017 12:32:07.650] [MainActorSystem-akka.remote.default-remote-dispatcher-13] [akka.remote.Remoting] Association to [akka.tcp://[email protected]:2553] having UID [-664475844] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation. 
[INFO] [01/31/2017 12:32:08.288] [MainActorSystem-akka.actor.default-dispatcher-4] [akka://MainActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteActorSystem%4010.150.20.159%3A2553-1] Message [akka.remote.transport.AssociationHandle$InboundPayload] from Actor[akka://MainActorSystem/deadLetters] to Actor[akka://MainActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteActorSystem%4010.150.20.159%3A2553-1#-764637076] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

시스템 B의 로그 (시스템 A에서 프레임을 수신해야 하는 쪽)

00:00 TRACE: [kryo] Registration required: true 
00:00 TRACE: [kryo] References: false 
00:00 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer) 
[INFO] [01/31/2017 12:31:29.946] [main] [akka.remote.Remoting] Starting remoting 
[INFO] [01/31/2017 12:31:30.253] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2553] 
[INFO] [01/31/2017 12:31:30.255] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2553] 
12:31:30.272 [main] INFO de.itd.ui.Main$ - RemoteActorSystem started. 
12:31:47.050 [RemoteActorSystem-akka.actor.default-dispatcher-2] INFO de.itd.actor.remote.DetectionGroupActor - Receiving initialization message... 
12:31:54.308 [RemoteActorSystem-akka.actor.default-dispatcher-5] INFO de.itd.actor.remote.DetectionActor - Frame is available. 
[WARN] [01/31/2017 12:32:06.285] [RemoteActorSystem-akka.remote.default-remote-dispatcher-7] [akka.tcp://[email protected]:2553/system/remote-watcher] Detected unreachable: [akka.tcp://[email protected]:2552] 
[WARN] [01/31/2017 12:32:06.291] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.remote.Remoting] Association to [akka.tcp://[email protected]:2552] having UID [-946314302] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation. 
[INFO] [01/31/2017 12:32:06.365] [RemoteActorSystem-akka.actor.default-dispatcher-2] [akka://RemoteActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-2] Message [akka.remote.transport.AssociationHandle$InboundPayload] from Actor[akka://RemoteActorSystem/deadLetters] to Actor[akka://RemoteActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-2#-1039432132] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[WARN] [01/31/2017 12:32:06.367] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://[email protected]:2553/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-1/endpointWriter] AssociationError [akka.tcp://[email protected]:2553] -> [akka.tcp://[email protected]:2552]: Error [Invalid address: akka.tcp://[email protected]:2552] [ 
akka.remote.InvalidAssociation: Invalid address: akka.tcp://Main[email protected]:2552 
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has a UID that has been quarantined. Association aborted. 
] 
[WARN] [01/31/2017 12:32:06.371] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.remote.Remoting] Tried to associate with unreachable remote address [akka.tcp://[email protected]:2552]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.] 

답변

0

안타깝게도 더 이상 문제를 재현할 수 없습니다. 몇 번의 시행착오 끝에 지금은 잘 작동합니다... 무엇이 바뀌었는지는 막연한 추측만 할 수 있을 뿐입니다.

나중에 참조할 수 있도록 현재 코드 스니펫을 첨부합니다. 원래 게시물과 달리 이 버전은 case class Frame(videoPos: Double, frame: Mat)의 객체를 직렬화합니다.

구성

# Akka remoting setup that serializes Frame messages with akka-kryo-serialization.
akka { 
    extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"] 
    actor { 
    provider = remote 
    warn-about-java-serializer-usage = no 
    enable-additional-serialization-bindings = on 
    # Declares the short name "kryo" for the Kryo serializer class.
    serializers { 
     kryo = "com.romix.akka.serialization.kryo.KryoSerializer" 
    } 
    # Only the Frame message type is bound to the "kryo" serializer here.
    serialization-bindings { 
     "de.itd.actor.common.Message$Frame" = kryo 
    } 
    kryo { 
     # Names the class whose customize(kryo) method registers the Frame serializer.
     kryo-custom-serializer-init = "de.itd.util.KryoInit" 
     # NOTE(review): presumably disables reference tracking - confirm against the library docs.
     type = "nograph" 
     # NOTE(review): presumably requires every class to be registered with an explicit ID - confirm.
     idstrategy = "explicit" 
     # NOTE(review): initial size is far smaller than a full video frame; presumably
     # growth relies on max-buffer-size = -1 - confirm.
     buffer-size = 4096 
     max-buffer-size = -1 
     use-manifests = true 
     use-unsafe = false 
     # NOTE(review): presumably LZ4-compresses the serialized bytes - confirm.
     post-serialization-transformations = "lz4" 
     kryo-trace = true 
     resolve-subclasses = false 
    } 
    } 
} 

KryoInit

/** Initialization hook referenced by `kryo-custom-serializer-init`.
  *
  * Associates `Frame` with `FrameKryoSerializer` and registers the class
  * under the explicit ID 20.
  */
class KryoInit {

  /** Configures the given Kryo instance for `Frame` serialization.
    *
    * @param kryo the Kryo instance to customize
    */
  def customize(kryo: Kryo): Unit = {
    val frameClass = classOf[Frame]
    val frameClassId = 20

    // Install the default serializer before registering the class.
    kryo.addDefaultSerializer(frameClass, classOf[FrameKryoSerializer])
    kryo.register(frameClass, frameClassId)
  }
}

KryoSerializer

/** Kryo serializer for `Frame(videoPos, frame)` messages.
  *
  * Wire format: video position (Double), rows, cols, channels and OpenCV
  * type code (four Ints), followed by `rows * cols * channels` raw bytes.
  */
class FrameKryoSerializer extends Serializer[Frame] {

  /** Writes the video position, the matrix header and the pixel bytes.
    *
    * @param kryo   the Kryo instance (unused)
    * @param output destination stream
    * @param frame  the frame message to serialize
    */
  override def write(kryo: Kryo, output: Output, frame: Frame): Unit = {
    output.writeDouble(frame.videoPos)

    val mat: Mat = frame.frame
    val rowCount = mat.rows()
    val colCount = mat.cols()
    val channelCount = mat.channels()
    val typeCode = mat.`type`()

    // Header: everything read() needs to allocate the matrix and the payload.
    output.writeInt(rowCount)
    output.writeInt(colCount)
    output.writeInt(channelCount)
    output.writeInt(typeCode)

    val pixels = new Array[Byte](rowCount * colCount * channelCount)
    mat.get(0, 0, pixels)
    output.write(pixels)
  }

  /** Reads the fields in the order written by [[write]] and rebuilds the frame.
    *
    * @param kryo   the Kryo instance (unused)
    * @param input  source stream positioned at the start of a serialized Frame
    * @param `type` the class being deserialized (unused)
    * @return the reconstructed Frame
    */
  override def read(kryo: Kryo, input: Input, `type`: Class[Frame]): Frame = {
    val position = input.readDouble()

    val rowCount = input.readInt()
    val colCount = input.readInt()
    val channelCount = input.readInt()
    val typeCode = input.readInt()

    val pixels = new Array[Byte](rowCount * colCount * channelCount)
    val mat = new Mat(rowCount, colCount, typeCode)
    input.readBytes(pixels)
    mat.put(0, 0, pixels)

    Frame(position, mat)
  }
}
관련 문제