버킷 파일에 스트림을 쓰려고하는 싱크를 만들려고합니다. 특정 조건 (시간, 파일 크기 등)에 도달하면 현재 출력 스트림이 닫힙니다. 새 버킷 파일이 열립니다.scalaz-stream의 버킷 싱크
io
개체에서 다른 싱크가 어떻게 생성되었는지 확인했지만 예제는 많지 않습니다. 그래서 나는 resource
과 chunkW
이 어떻게 쓰여지는지 지켜 보았습니다. 다음 코드 비트로 끝났습니다. 단순화를 위해 버킷은 지금은 Int
으로 표시되지만 결국에는 일부 출력 스트림이됩니다.
step
시작에 한 번 통을 전달하고이 결코 재귀 동안 변경되지 :val buckets: Channel[Task, String, Int] = { //recursion to step through the stream def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = { // Emit the value and repeat def next(msg: String => Task[Int]) = Process.emit(msg) ++ go(step) Process.await[Task, String => Task[Int], String => Task[Int]](step)( next , Process.halt // TODO ??? , Process.halt) // TODO ??? } //starting bucket val acquire: Task[Int] = Task.delay { val startBuck = nextBucket(0) println(s"opening bucket $startBuck") startBuck } //the write step def step(os: Int): Task[String => Task[Int]] = Task.now((msg: String) => Task.delay { write(os, msg) val newBuck = nextBucket(os) if (newBuck != os) { println(s"closing bucket $os") println(s"opening bucket $newBuck") } newBuck }) //start the Channel Process.await(acquire)( buck => go(step(buck)) , Process.halt, Process.halt) } def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") } def nextBucket(b: Int) = b+1
이이에 문제의 수입니다. 이전 작업에서 버킷 (Int)을 사용할 새
step
작업을 생성하는 재귀적인 방법이 무엇인지 잘 모르겠습니다. 이전 작업에서 얻으려는 String을 제공해야합니다.await
호출의fallback
및cleanup
은rcv
(있는 경우)의 결과를받지 못합니다.io.resource
함수에서는 리소스가 고정되어 있기 때문에 정상적으로 작동하지만 내 경우에는 리소스가 어떤 단계에서 변경 될 수 있습니다. 현재 오픈 버킷에 대한 참조를이 콜백에 전달하려면 어떻게해야합니까?
괜찮습니다. 그러는 동안 나는'resource'와 함께 사용할 수있는 자신 만의'BucketedWriter extends Writer '를 만들었지 만, (자바 API를 구현하는 것) 매우 중요합니다. – Mortimer