2016-09-01 4 views
0

응답 this question Odomontois는 전체 정렬을 메모리에 저장할 필요없이 미리 정렬 된 스트림을 키별로 그룹화 할 수있는 지연 그룹화 연산자를 구현하는 방법을 보여주었습니다. Akka의 스트림 (예 : 소스 개체)에서 이와 같은 작업을 수행 할 수있는 방법이 있습니까? 또는 Akama 소스에서 일반 Stream 객체를 가져올 수있는 방법이 있습니까? 그렇다면 Odomontois의 chopBy를 사용할 수 있습니까?Akka 스트림에서 chopBy 구현

다음은 작동하지 않습니다이 작업을 수행 할 수있는 완전히 실패한 시도의 :

implicit class SourceChopOps[T, NU](s: Source[T, NU]) { 
    def chopBy[U](f: T => U) = { 
     s.prefixAndTail(1) 
     .map(pt => (pt._1.head, pt._2)) 
     .map { 
      case (prefix, tail) => 
      // what to do with pulled off head??? 
      tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here 
     } 
     } 
    } 
    } 
+0

공식 문서를 확인하셨습니까? http://doc.akka.io/docs/akka/2.4.9/scala/stream/stream-cookbook.html#implementing-reduce-by-key – fGo

+0

info @fGo에 감사드립니다. Akka 그룹은 어떻게해서 대부분의 중간 데이터를 메모리에 보관할 필요성을 배제합니까? 데이터를 반환하기 전에 모든 하위 스트림에 대한 데이터를 보유해야합니까? 아니면 정말 깔끔한 흐름 제어 기법으로이 작업을 수행하지 않아도됩니까? 이전의 조건은 chopBy의 주요한 자극이었습니다. 한 번에 하나의 키로 메모리에 데이터를 저장하기 만하면됩니다. – ChoppyTheLumberjack

답변

0

groupBy을 Akka 스트림에 메모리에 의해 그룹화 된 키를 유지하지만, 같은 스트림 영역 항상 "지연" 그들에게는 역압이 있기 때문에 제한된 메모리에서 실행됩니다. 다운 스트림에서 새 요소를 허용하지 않으면 업스트림에서 새로운 요소가 생성되지 않습니다. 그래서 예를 들면

:

case class Record(id: Int) 
Source.fromIterator(() => 
    Iterator 
     .fill(1000)(Iterator(1,2).map { n => println("creating"); Record(n) }) 
     .flatten) 
    .groupBy(maxSubstreams = 2, _.id) 
    .map { r => println("Consuming"); r } 
    .fold(0)((acc, _) => acc + 1) 
    .mergeSubstreams 
    .runForeach(println) 

그들이 선행 두 사람 서브 각각이 아니라 모두 소비 될 수 Record 인스턴스가 빠른 속도로 생산하는 방법을 보여줍니다.