2017-03-10 4 views
1

Akka Stream에 사용자 지정 Source[ByteSting]을 구현하고 싶습니다. 이 소스는 제공된 파일에서 제공된 바이트 범위 내에서 데이터를 읽고 다운 ​​스트림으로 전파해야합니다.ActorPublisher를 기반으로 사용자 지정 Akka Streams 소스 구현

처음에는 ActorPublisher에 섞인 Actor를 구현하여이 작업을 수행 할 수 있다고 생각했습니다. 이 구현은 주어진 바이트의 데이터 만 대신 공급 경로에서 전체 파일을 읽고 akka.stream.impl.io.FilePublisher 유사하다의 범위 :

val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength)) 
val future = fileSource.runWith(Sink.seq) 
:

import java.nio.ByteBuffer 
import java.nio.channels.FileChannel 
import java.nio.file.{Path, StandardOpenOption} 

import akka.actor.{ActorLogging, DeadLetterSuppression, Props} 
import akka.stream.actor.ActorPublisher 
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request} 
import akka.util.ByteString 

import scala.annotation.tailrec 
import scala.util.control.NonFatal 

class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString] 
    with ActorLogging{ 

    import FilePublisher._ 

    private val chunksToBuffer = 10 
    private var bytesLeftToRead = endByte - startByte + 1 
    private var fileChannel: FileChannel = _ 
    private val buffer = ByteBuffer.allocate(8096) 

    private var bufferedChunks: Vector[ByteString] = _ 

    override def preStart(): Unit = { 
    try { 
     log.info("Starting") 
     fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ) 
     bufferedChunks = readAhead(Vector.empty, Some(startByte)) 
     log.info("Chunks {}", bufferedChunks) 
    } catch { 
     case NonFatal(ex) => onErrorThenStop(ex) 
    } 
    } 

    override def postStop(): Unit = { 

    log.info("Stopping") 
    if (fileChannel ne null) 
     try fileChannel.close() catch { 
     case NonFatal(ex) => log.error(ex, "Error during file channel close") 
    } 
    } 

    override def receive: Receive = { 
    case Request => 
     readAndSignalNext() 
     log.info("Got request") 
    case Continue => 
     log.info("Continuing reading") 
     readAndSignalNext() 
    case Cancel => 
     log.info("Cancel message got") 
     context.stop(self) 
    } 

    private def readAndSignalNext() = { 

    log.info("Reading and signaling") 
    if (isActive) { 
     bufferedChunks = readAhead(signalOnNext(bufferedChunks), None) 
     if (isActive && totalDemand > 0) self ! Continue 
    } 
    } 

    @tailrec 
    private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = { 

    if (chunks.nonEmpty && totalDemand > 0) { 
     log.info("Signaling") 
     onNext(chunks.head) 
     signalOnNext(chunks.tail) 
    } else { 
     if (chunks.isEmpty && bytesLeftToRead > 0) { 
     onCompleteThenStop() 
     } 
     chunks 
    } 
    } 

    @tailrec 
    private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = { 

    if (currentlyBufferedChunks.size < chunksToBuffer) { 

     val bytesRead = readDataFromChannel(startPosition) 
     log.info("Bytes read {}", bytesRead) 
     bytesRead match { 
     case Int.MinValue => Vector.empty 
     case -1 => 
      log.info("EOF reached") 
      currentlyBufferedChunks // EOF reached 
     case _ => 
      buffer.flip() 
      val chunk = ByteString(buffer) 
      buffer.clear() 

      bytesLeftToRead -= bytesRead 
      val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt) 
      readAhead(currentlyBufferedChunks :+ trimmedChunk, None) 
     } 

    } else { 
     currentlyBufferedChunks 
    } 
    } 

    private def readDataFromChannel(startPosition: Option[Long]): Int = { 
    try { 
     startPosition match { 
     case Some(position) => fileChannel.read(buffer, position) 
     case None => fileChannel.read(buffer) 
     } 
    } catch { 
     case NonFatal(ex) => 
     log.error(ex, "Got error reading data from file channel") 
     Int.MinValue 
    } 
    } 
} 

object FilePublisher { 

    private case object Continue extends DeadLetterSuppression 

    def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte) 
} 

는하지만 Source을 구체화 할 때이처럼 내 FilePublisher의 지원 밝혀

아무 것도 발생하지 않으며 원본에서 데이터를 더 다운 스트림으로 전파하지 않습니다.

FilePublisher을 기반으로 Source을 구체화하는 다른 올바른 방법이 있습니까? 아니면이 API를 사용하지 말고 here과 같은 맞춤 처리 단계를 구현해야합니까?

CustomStage 접근법의 문제점은 간단한 구현이이 단계에서 IO를 바로 수행한다는 것입니다. 스테이지에서 사용자 지정 스레드 풀 또는 액터로 IO를 이동할 수 있지만이 작업을 수행하려면 스테이지와 액터간에 동기화가 필요합니다. 감사합니다. .

답변

0

문제는 receive있어서의 패턴 매칭에 실수로 인한되었다 : 사실 생각만큼 단일 매개 변수 (final case class Request(n: Long))가 아닌 경우에 개체의 경우 클래스 대신 case Request(_)Request 때문에 이 광고 case Request =>이 있어야한다.

0

현재 IO 작업을 위해 별도의 디스패처를 사용하고 있지 않은 것으로 나타났습니다. Here's 왜 이렇게하지 않는지 설명하는 docs 섹션은 응용 프로그램에서 불쾌한 차단을 초래할 수 있습니다.

Akka Streams는 특정 스레드 풀 기반의 디스패처를 사용하여 FileSourceFilePublisher을 래핑합니다. 영감을 얻기 위해 코드 here을 확인할 수 있습니다.

+0

감사합니다. 올바른 지적입니다. 앞으로 별도의 디스패처 사용을 추가 할 계획입니다. – thereisnospoon

관련 문제