최종 작업을 수행하는 가장 간단한 방법은 것입니다 try 블록 내부의 터미널 operatoni을 수행 할 수 있습니다 예를 들어, 오른쪽 스트림의 단말 동작 후에 해당 문장을 걸기
IntStream.range(0, 100).parallel().forEach(System.out::println);
System.out.println("done");
이 조작
가 성공적인 경우에 수행 될 것이다 만, 적절한 곳 커밋된다.
Consumer
이 지정되지 않은 순서로 동시에 실행되는 동안 모든 사람이 정상적으로 복귀해도 작업을 수행했다는 것을 보장합니다.
예외적 인 경우에도 수행되는 연산을 정의하는 것은 쉽지 않습니다. 다음 예를 살펴보십시오.
try(IntStream is=IntStream.range(0, 100).onClose(()->System.out.println("done"))) {
is.parallel().forEach(System.out::println);
}
이것은 첫 번째와 동일하지만 예외적 인 경우 (예 :
try(IntStream is=IntStream.range(0, 100).onClose(()->System.out.println("done"))) {
is.parallel().forEach(x -> {
System.out.println(x);
if(Math.random()>0.7) throw new RuntimeException();
});
}
당신은done
후 번호 의 출력물가 발생할 수 있습니다. 이는 예외적 인 경우 모든 종류의 정리에 적용됩니다. 예외를 catch하거나 finally
블록을 처리하면 여전히 비동기 작업이 실행 중일 수 있습니다. 예외적 인 경우 데이터가 불완전하기 때문에 예외적 인 경우 트랜잭션을 롤백하는 데 아무런 문제가 없지만 지금 롤백 된 리소스에 항목을 쓰려는 시도를 계속해야합니다.
생각한 Collector
기반 솔루션은 성공적인 완료를위한 완료 작업 만 정의 할 수 있습니다. 그래서 이것들은 첫 번째 예와 같습니다. 터미널 작동 후 완성 문을 놓는 것이 Collector
의 더 간단한 대안입니다.당신이 모두를 구현하는 작업을 정의하려면
는 항목 처리 및 정리 단계, 당신은 그것을 위해 자신의 인터페이스를 만들고 도우미 메서드에 필요한 Stream
설정을 캡슐화 할 수 있습니다. 여기가 같이하는 방법입니다 :
조작 인터페이스 :
interface IoOperation<T> {
void accept(T item) throws IOException;
/** Called after successfull completion of <em>all</em> items */
default void commit() throws IOException {}
/**
* Called on failure, for parallel streams it must set the consume()
* method into a silent state or handle concurrent invocations in
* some other way
*/
default void rollback() throws IOException {}
}
도우미 메소드 구현 : 오류 처리없이 사용
public static <T> void processAllAtems(Stream<T> s, IoOperation<? super T> c)
throws IOException {
Consumer<IoOperation> rollback=io(IoOperation::rollback);
AtomicBoolean success=new AtomicBoolean();
try(Stream<T> s0=s.onClose(() -> { if(!success.get()) rollback.accept(c); })) {
s0.forEach(io(c));
c.commit();
success.set(true);
}
catch(UncheckedIOException ex) { throw ex.getCause(); }
}
private static <T> Consumer<T> io(IoOperation<T> c) {
return item -> {
try { c.accept(item); }
catch (IOException ex) { throw new UncheckedIOException(ex); }
};
}
가있을 수 있습니다 쉽게로
class PrintNumbers implements IoOperation<Integer> {
public void accept(Integer i) {
System.out.println(i);
}
@Override
public void commit() {
System.out.println("done.");
}
}
processAllAtems(IntStream.range(0, 100).parallel().boxed(), new PrintNumbers());
오류 처리는 가능하지만 여기에서 동시성을 처리해야합니다. 다음 예제는 number를 출력하지만 끝에 닫혀 야하는 새로운 출력 스트림을 사용하므로 예외적으로 동시 호출은 동시에 닫힌 스트림을 처리해야합니다.
class WriteNumbers implements IoOperation<Integer> {
private Writer target;
WriteNumbers(Writer writer) {
target=writer;
}
public void accept(Integer i) throws IOException {
try {
final Writer writer = target;
if(writer!=null) writer.append(i+"\n");
//if(Math.random()>0.9) throw new IOException("test trigger");
} catch (IOException ex) {
if(target!=null) throw ex;
}
}
@Override
public void commit() throws IOException {
target.append("done.\n").close();
}
@Override
public void rollback() throws IOException {
System.err.print("rollback");
Writer writer = target;
target=null;
writer.close();
}
}
FileOutputStream fos = new FileOutputStream(FileDescriptor.out);
FileChannel fch = fos.getChannel();
Writer closableStdIO=new OutputStreamWriter(fos);
try {
processAllAtems(IntStream.range(0, 100).parallel().boxed(),
new WriteNumbers(closableStdIO));
} finally {
if(fch.isOpen()) throw new AssertionError();
}
물론 있습니다. 단말 동작에 대한 호출, 예를 들면. 'Stream.forAll (Consumer)'가 반환되면 작업이 완료됩니다. 그 직후에 완전한 성명서를 일반 성명서로 작성하십시오. – Holger
장난감 "프로세서"응용 프로그램을 사용하여 사례를 설명하는 프로젝트를 게시했습니다. https://github.com/spaceCamel/reactive-stream –