2014-03-31 4 views
6

버킷 파일에 스트림을 쓰려고하는 싱크를 만들려고합니다. 특정 조건 (시간, 파일 크기 등)에 도달하면 현재 출력 스트림이 닫힙니다. 새 버킷 파일이 열립니다.scalaz-stream의 버킷 싱크

io 개체에서 다른 싱크가 어떻게 생성되었는지 확인했지만 예제는 많지 않습니다. 그래서 나는 resourcechunkW이 어떻게 쓰여지는지 지켜 보았습니다. 다음 코드 비트로 끝났습니다. 단순화를 위해 버킷은 지금은 Int으로 표시되지만 결국에는 일부 출력 스트림이됩니다.

  1. step 시작에 한 번 통을 전달하고이 결코 재귀 동안 변경되지 :

    val buckets: Channel[Task, String, Int] = { 
    
        //recursion to step through the stream 
        def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = { 
    
         // Emit the value and repeat 
         def next(msg: String => Task[Int]) = 
         Process.emit(msg) ++ 
          go(step) 
    
    
         Process.await[Task, String => Task[Int], String => Task[Int]](step)(
         next 
         , Process.halt // TODO ??? 
         , Process.halt) // TODO ??? 
        } 
    
        //starting bucket 
        val acquire: Task[Int] = Task.delay { 
         val startBuck = nextBucket(0) 
         println(s"opening bucket $startBuck") 
         startBuck 
        } 
    
        //the write step 
        def step(os: Int): Task[String => Task[Int]] = 
         Task.now((msg: String) => Task.delay { 
         write(os, msg) 
         val newBuck = nextBucket(os) 
         if (newBuck != os) { 
          println(s"closing bucket $os") 
          println(s"opening bucket $newBuck") 
         } 
         newBuck 
         }) 
    
        //start the Channel 
        Process.await(acquire)(
         buck => go(step(buck)) 
         , Process.halt, Process.halt) 
        } 
    
    def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") } 
    def nextBucket(b: Int) = b+1 
    

    이이에 문제의 수입니다. 이전 작업에서 버킷 (Int)을 사용할 새 step 작업을 생성하는 재귀적인 방법이 무엇인지 잘 모르겠습니다. 이전 작업에서 얻으려는 String을 제공해야합니다.

  2. await 호출의 fallbackcleanuprcv (있는 경우)의 결과를받지 못합니다. io.resource 함수에서는 리소스가 고정되어 있기 때문에 정상적으로 작동하지만 내 경우에는 리소스가 어떤 단계에서 변경 될 수 있습니다. 현재 오픈 버킷에 대한 참조를이 콜백에 전달하려면 어떻게해야합니까?
+0

괜찮습니다. 그러는 동안 나는'resource'와 함께 사용할 수있는 자신 만의'BucketedWriter extends Writer '를 만들었지 만, (자바 API를 구현하는 것) 매우 중요합니다. – Mortimer

답변

0

옵션 중 하나 (예 : 시간)는 싱크대에서 간단한 go을 사용하는 것이 좋습니다. 다른 옵션에 대한

val metronome = Process.awakeEvery(1.hour).map(true) 


def writeFileSink(file:String):Sink[Task,ByteVector] = ??? 


def timeBasedSink(prefix:String) = { 
    def go(index:Int) : Sink[Task,ByteVector] = { 
    metronome.wye(write(prefix + "_" + index))(wye.interrupt) ++ go(index + 1) 
    } 

    go(0) 
} 

(즉 바이트 작성) 방금 쓴 바이트의 신호를 유지하고 싱크와 결합, 유사한 기술을 사용할 수 있습니다 :이 하나는 기본적으로 파일마다 하나의 시간을 재개, 기반 시간을 사용합니다.