2014-11-18 4 views
2

Java 8 Stream API을 알게되었는데 스트림 소비자에게 스트림이 완료되었음을 알리는 방법을 모르겠습니다.(RxJava) Java 8 상당 관찰 가능 # onComplete()

필자의 경우 스트림 파이프 라인의 결과는 데이터베이스에 일괄 적으로 쓰거나 메시징 서비스에 게시됩니다. 그러한 경우, 스트림 파이프 라인은 일단 스트림이 닫히면 (자), 엔드 포인트를 「플래시」및 「닫는」메소드를 호출 할 필요가 있습니다.

나는 RxJava에서 구현이 목적을 위해이 사용되는 Observer#onComplete 방법을 기억으로 관찰 가능한 패턴에 노출 조금 있었다.

한편 Java 8 Consumeraccept 메서드 만 노출하지만이를 "닫기"할 수는 없습니다. 라이브러리를 파헤 쳤는데 메서드를 제공하는 Sink이라는 Consumer이라는 하위 인터페이스를 발견했으나 공개되지 않았습니다. 마지막으로 스트림의 가장 융통성있는 소비자 인 것으로 보이는 Collector을 구현할 생각 이었지만 더 간단한 옵션은 없었습니까?

+2

물론 있습니다. 단말 동작에 대한 호출, 예를 들면. 'Stream.forAll (Consumer)'가 반환되면 작업이 완료됩니다. 그 직후에 완전한 성명서를 일반 성명서로 작성하십시오. – Holger

+0

장난감 "프로세서"응용 프로그램을 사용하여 사례를 설명하는 프로젝트를 게시했습니다. https://github.com/spaceCamel/reactive-stream –

답변

3

최종 작업을 수행하는 가장 간단한 방법은 것입니다 try 블록 내부의 터미널 operatoni을 수행 할 수 있습니다 예를 들어, 오른쪽 스트림의 단말 동작 후에 해당 문장을 걸기

IntStream.range(0, 100).parallel().forEach(System.out::println); 
System.out.println("done"); 

이 조작

가 성공적인 경우에 수행 될 것이다 만, 적절한 곳 커밋된다. Consumer이 지정되지 않은 순서로 동시에 실행되는 동안 모든 사람이 정상적으로 복귀해도 작업을 수행했다는 것을 보장합니다.

예외적 인 경우에도 수행되는 연산을 정의하는 것은 쉽지 않습니다. 다음 예를 살펴보십시오.

try(IntStream is=IntStream.range(0, 100).onClose(()->System.out.println("done"))) { 
    is.parallel().forEach(System.out::println); 
} 

이것은 첫 번째와 동일하지만 예외적 인 경우 (예 :

try(IntStream is=IntStream.range(0, 100).onClose(()->System.out.println("done"))) { 
    is.parallel().forEach(x -> { 
     System.out.println(x); 
     if(Math.random()>0.7) throw new RuntimeException(); 
    }); 
} 

당신은done 후 번호 의 출력물가 발생할 수 있습니다. 이는 예외적 인 경우 모든 종류의 정리에 적용됩니다. 예외를 catch하거나 finally 블록을 처리하면 여전히 비동기 작업이 실행 중일 수 있습니다. 예외적 인 경우 데이터가 불완전하기 때문에 예외적 인 경우 트랜잭션을 롤백하는 데 아무런 문제가 없지만 지금 롤백 된 리소스에 항목을 쓰려는 시도를 계속해야합니다.

생각한 Collector 기반 솔루션은 성공적인 완료를위한 완료 작업 만 정의 할 수 있습니다. 그래서 이것들은 첫 번째 예와 같습니다. 터미널 작동 후 완성 문을 놓는 것이 Collector의 더 간단한 대안입니다.당신이 모두를 구현하는 작업을 정의하려면


는 항목 처리 및 정리 단계, 당신은 그것을 위해 자신의 인터페이스를 만들고 도우미 메서드에 필요한 Stream 설정을 캡슐화 할 수 있습니다. 여기가 같이하는 방법입니다 :

조작 인터페이스 :

interface IoOperation<T> { 
    void accept(T item) throws IOException; 
    /** Called after successfull completion of <em>all</em> items */ 
    default void commit() throws IOException {} 
    /** 
    * Called on failure, for parallel streams it must set the consume() 
    * method into a silent state or handle concurrent invocations in 
    * some other way 
    */ 
    default void rollback() throws IOException {} 
} 

도우미 메소드 구현 : 오류 처리없이 사용

public static <T> void processAllAtems(Stream<T> s, IoOperation<? super T> c) 
throws IOException { 
    Consumer<IoOperation> rollback=io(IoOperation::rollback); 
    AtomicBoolean success=new AtomicBoolean(); 
    try(Stream<T> s0=s.onClose(() -> { if(!success.get()) rollback.accept(c); })) { 
     s0.forEach(io(c)); 
     c.commit(); 
     success.set(true); 
    } 
    catch(UncheckedIOException ex) { throw ex.getCause(); } 
} 
private static <T> Consumer<T> io(IoOperation<T> c) { 
    return item -> { 
     try { c.accept(item); } 
     catch (IOException ex) { throw new UncheckedIOException(ex); } 
    }; 
} 

가있을 수 있습니다 쉽게로

class PrintNumbers implements IoOperation<Integer> { 
    public void accept(Integer i) { 
     System.out.println(i); 
    } 
    @Override 
    public void commit() { 
     System.out.println("done."); 
    } 
} 
processAllAtems(IntStream.range(0, 100).parallel().boxed(), new PrintNumbers()); 

오류 처리는 가능하지만 여기에서 동시성을 처리해야합니다. 다음 예제는 number를 출력하지만 끝에 닫혀 야하는 새로운 출력 스트림을 사용하므로 예외적으로 동시 호출은 동시에 닫힌 스트림을 처리해야합니다.

class WriteNumbers implements IoOperation<Integer> { 
    private Writer target; 
    WriteNumbers(Writer writer) { 
     target=writer; 
    } 
    public void accept(Integer i) throws IOException { 
     try { 
      final Writer writer = target; 
      if(writer!=null) writer.append(i+"\n"); 
      //if(Math.random()>0.9) throw new IOException("test trigger"); 
     } catch (IOException ex) { 
      if(target!=null) throw ex; 
     } 
    } 
    @Override 
    public void commit() throws IOException { 
     target.append("done.\n").close(); 
    } 
    @Override 
    public void rollback() throws IOException { 
     System.err.print("rollback"); 
     Writer writer = target; 
     target=null; 
     writer.close(); 
    } 
} 
FileOutputStream fos = new FileOutputStream(FileDescriptor.out); 
FileChannel fch = fos.getChannel(); 
Writer closableStdIO=new OutputStreamWriter(fos); 
try { 
    processAllAtems(IntStream.range(0, 100).parallel().boxed(), 
        new WriteNumbers(closableStdIO)); 
} finally { 
    if(fch.isOpen()) throw new AssertionError(); 
} 
+0

하지만 _ 최종 작업 _은 소비자에 따라 달라질 수 있습니다. ** 스트림의 소비자 외부에 배치하면 커플 링이 증가하고 응집력이 감소합니다 **. 테스트 각도에서 이것을 보면 _final operation_이 호출되었는지 확인하기 위해 어떻게 소비자를 조롱 할 수 있을까? 예외적 인 경우를 다루는 것이 쉽지 않다는 사실은 스트림 객체가 모든 스레드가 반환 된 시간을 알 수 있고 _final 연산 _을 호출 할 수 있기 때문에 요점을 확인하는 것으로 보입니다. 이것이 맞을지도 모르겠다. 바로 JDK-only를 고수하면 컬렉터보다 더 간단 할 것이다. –

+0

'forEach' 호출과 후속 구문을 최종 동작을하는 확장 된'Consumer' 인터페이스를받는 도우미 메서드로 래핑 할 수 있습니다. – Holger

+0

@Holdger 이제 구현 만 보입니다. GitHub 프로젝트에 포함시키지 않아서 죄송합니다 (질문에 대한 내 의견 참조). 나는 나중에 그것을하려고 노력할 것이다. –

0

Java 8 스트림 (예 : collect(), forEach() 등)의 터미널 작업은 항상 스트림을 완료합니다.

스트림에서 오브젝트를 처리하는 것이 있으면 콜렉터가 리턴 될 때 스트림이 종료 될 때 알 수 있습니다. 당신은 당신의 프로세서를 종료해야하는 경우

, 당신은 시도 -과 - 자원에 싸서

try(BatchWriter writer = new ....){ 

     MyStream.forEach(o-> writer.write(o)); 

}//autoclose writer 
+1

주의하십시오. 터미널 운영에 대한 보증은 정상적인 반환 및 그 결과가있을 경우에만 해당됩니다. 예외가 발생하면 다른 스레드에서 실행중인 코드가 여전히있는 동안 전달 될 수 있습니다. 즉, 리소스가있는'try'가 일부 'Consumer'에 의해 여전히 액세스되는 리소스를 닫을 수 있습니다. 이 경우 Writer의 경우 중요한 것은 아니지만 데이터가 불완전 할 수 있습니다. 그러나 'Consumer'내에 이미 폐쇄 된 스트림에 대한 준비가되어 있어야합니다. – Holger

0

Stream # onClose (Runnable)을 사용하면 스트림이 닫힐 때 호출 할 콜백을 지정할 수 있습니다. 스트림은 푸시 기반 rx.Observable과는 달리 푸시 기반이므로 소비자가 아닌 스트림과 연결됩니다.

관련 문제