2017-11-22 1 views
1

S3 (수천 개의 직렬화 된 객체)에 Kryo 직렬화 된 이진 데이터를 저장했습니다.Alpakka - S3에서 Kryo 직렬화 된 객체를 읽습니다.

Alpakkadata: Source[ByteString, NotUsed]으로 내용을 읽을 수 있습니다. 그러나 Kryo 형식에서는 구분 기호가 사용되지 않으므로 data.via(Framing.delimiter(...))을 사용하여 각 직렬화 된 개체를 ByteString으로 분리 할 수 ​​없습니다.

따라서 Kryo는 실제로 개체가 종료 될 때 데이터를 읽어야하며 스트리밍 친화적 인 것으로 보지 않습니다.

스트리밍 방식으로이 사례를 구현하여 하루 종일 Source[MyObject, NotUsed]이 될 수 있습니까?

답변

1

다음은이를 수행하는 그래프 단계입니다. 직렬화 된 객체가 2 바이트 문자열에 걸쳐있는 경우를 처리합니다. 객체가 크고 (유스 케이스가 아닌) 객체를 개선해야하고 Source[ByteString, NotUsed]에 두 개 이상의 바이트 문자열을 사용할 수 있어야합니다.

object KryoReadStage { 
    def flow[T](kryoSupport: KryoSupport, 
       `class`: Class[T], 
       serializer: Serializer[_]): Flow[ByteString, immutable.Seq[T], NotUsed] = 
    Flow.fromGraph(new KryoReadStage[T](kryoSupport, `class`, serializer)) 
} 

final class KryoReadStage[T](kryoSupport: KryoSupport, 
          `class`: Class[T], 
          serializer: Serializer[_]) 
    extends GraphStage[FlowShape[ByteString, immutable.Seq[T]]] { 

    override def shape: FlowShape[ByteString, immutable.Seq[T]] = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new GraphStageLogic(shape) { 

     setHandler(in, new InHandler { 

     override def onPush(): Unit = { 
      val bytes = 
      if (previousBytes.length == 0) grab(in) 
      else ByteString.fromArrayUnsafe(previousBytes) ++ grab(in) 

      Managed(new Input(new ByteBufferBackedInputStream(bytes.asByteBuffer))) { input => 
      var position = 0 
      val acc = ListBuffer[T]() 

      kryoSupport.withKryo { kryo => 
       var last = false 

       while (!last && !input.eof()) { 
       tryRead(kryo, input) match { 
        case Some(t) => 
        acc += t 
        position = input.total().toInt 
        previousBytes = EmptyArray 
        case None => 
        val bytesLeft = new Array[Byte](bytes.length - position) 

        val bb = bytes.asByteBuffer 
        bb.position(position) 
        bb.get(bytesLeft) 

        last = true 
        previousBytes = bytesLeft 
       } 
       } 

       push(out, acc.toList) 
      } 
      } 
     } 

     private def tryRead(kryo: Kryo, input: Input): Option[T] = 
      try { 
      Some(kryo.readObject(input, `class`, serializer)) 
      } catch { 
      case _: KryoException => None 
      } 

     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 

     private val EmptyArray: Array[Byte] = Array.empty 

     private var previousBytes: Array[Byte] = EmptyArray 

    } 
    } 

    override def toString: String = "KryoReadStage" 

    private lazy val in: Inlet[ByteString] = Inlet("KryoReadStage.in") 
    private lazy val out: Outlet[immutable.Seq[T]] = Outlet("KryoReadStage.out") 

} 

사용 예제 :

client.download(BucketName, key) 
    .via(KryoReadStage.flow(kryoSupport, `class`, serializer)) 
    .flatMapConcat(Source(_)) 

그것은 아래에 몇 가지 추가 도우미를 사용합니다.

ByteBufferBackedInputStream :

class ByteBufferBackedInputStream(buf: ByteBuffer) extends InputStream { 

    override def read: Int = { 
    if (!buf.hasRemaining) -1 
    else buf.get & 0xFF 
    } 

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = { 
    if (!buf.hasRemaining) -1 
    else { 
     val read = Math.min(len, buf.remaining) 
     buf.get(bytes, off, read) 
     read 
    } 
    } 

} 

를 관리 :

object Managed { 

    type AutoCloseableView[T] = T => AutoCloseable 

    def apply[T: AutoCloseableView, V](resource: T)(op: T => V): V = 
    try { 
     op(resource) 
    } finally { 
     resource.close() 
    } 
} 

KryoSupport :

trait KryoSupport { 
    def withKryo[T](f: Kryo => T): T 
} 

class PooledKryoSupport(serializers: (Class[_], Serializer[_])*) extends KryoSupport { 

    override def withKryo[T](f: Kryo => T): T = { 
    pool.run(new KryoCallback[T] { 
     override def execute(kryo: Kryo): T = f(kryo) 
    }) 
    } 

    private val pool = { 
    val factory = new KryoFactory() { 
     override def create(): Kryo = { 
     val kryo = new Kryo 

     (KryoSupport.ScalaSerializers ++ serializers).foreach { 
      case ((clazz, serializer)) => 
      kryo.register(clazz, serializer) 
     } 

     kryo 
     } 
    } 

    new KryoPool.Builder(factory).softReferences().build() 
    } 

} 
관련 문제