2017-01-22 9 views
2

나는 Alpakka-FTP을 사용하고 있지만 어쩌면 나는 일반적인 akka-stream 패턴을 찾고있다. 는 FTP 커넥터 파일을 나열하거나 검색 할 수 있습니다Akka Streams, 다른 출처로 항목을 보냅니 까?

def ls(host: String): Source[FtpFile, NotUsed] 
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]] 

를 이상적으로, 나는이 같은 스트림을 생성하고 싶습니다 :

LIST 
    .FETCH_ITEM 
    .FOREACH(do something) 

을하지만 두 함수와 같은 스트림을 만들 수 없습니다 해요 나는 위에 썼다. 나는, 나는 Flow를 사용하여이 얻을 수 있어야처럼 단지 lsfromPath 기능 위에서 주어진

Ftp.ls 
    .via(some flow that uses the Ftp.fromPath above) 
    .runWith(Sink.foreach(do something)) 

같은이 가능를 느낌?

편집 :

내가 한 배우와 mapAsync를 사용하여 해결할 수 있어요,하지만 난 여전히 더 간단합니다 생각합니다.

class Downloader extends Actor { 
    override def receive = { 
    case ftpFile: FtpFile => 
     Ftp.fromPath(Paths.get(ftpFile.path), settings) 
     .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right) 
     .run() pipeTo sender 
    } 
} 

val downloader = as.actorOf(Props(new Downloader)) 

Ftp.ls("test_path", settings) 
    .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult]) 
    .runWith(Sink.foreach(res => println("got it!" + res))) 

답변

0

이 목적으로 flatMapConcat을 사용할 수 있습니다. 귀하의 구체적인 예

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile => 
    Ftp.fromPath(Paths.get(ftpFile.path), settings) 
}.runWith(FileIO.toPath(Paths.get("testHDF.txt"))) 

문서 here에 다시 기록 할 수있다.

관련 문제