2016-11-17 4 views
2

저는 Akka Streams 프레임 워크의 초보자이며 Akka Stream의 목표에 대한 문제를 해결할 수있는 가능성을 알고 싶습니다. 여기에 문제가 있다고 상상해보십시오.Akka Streams - 일부 술어를 기반으로 들어오는 소스 데이터를 분할합니다.

  1. 몇 가지 큰 iteratee 소스가 있습니다. 아마도 큰 데이터를 가진 3-4 큰 파일 같은 데이터;
  2. 각 파일을 처리해야합니다. 구문 분석, 변환 된 평균값 계산;
  3. 파일의 데이터는 일부 조건 자로 분할 된 다음 파티션별로 처리해야합니다. 파티션 조건부는 실행마다 동적 일 수 있습니다.
  4. 모든 파티션은 다른 파일이나 스트림에 저장해야합니다.

Akka Streams로 해결할 수 있습니까?

답변

0

빅터가 말했듯이, 파티션이 당신을 위해 이것을 할 것입니다. Akka 단위 테스트에서 예제를 발견했습니다.

val (s1, s2, s3) = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int], Sink.seq[Int], Sink.seq[Int])(Tuple3.apply) { implicit b ⇒ (sink1, sink2, sink3) ⇒ 
     val partition = b.add(Partition[Int](3, { 
      case g if (g > 3) ⇒ 0 
      case l if (l < 3) ⇒ 1 
      case e if (e == 3) ⇒ 2 
     })) 
     Source(List(1, 2, 3, 4, 5)) ~> partition.in 
     partition.out(0) ~> sink1.in 
     partition.out(1) ~> sink2.in 
     partition.out(2) ~> sink3.in 
     ClosedShape 
     }).run() 
관련 문제