2015-02-01 4 views
3

새 스레드에서 하나씩 처리해야하는 작업 목록이 있으며 결과가 일부 주 스레드에 의해 메서드에 표시되어야합니다. 그러나 이것이 작동하지 않는 것처럼 보일 경우 flatMap 메서드가 주 스레드에서 호출됩니다.RxJava 비동기 구독

subscribeOn 메서드가이 경우 "스레드 전환"을 처리하지 않는 이유는 무엇입니까?

다른 스레드에서 일부 작업을 수행하는 데 더 좋은 패턴은 무엇입니까?

List<Task> tasks = ...; 
Observable.from(tasks) 
      .flatMap(task -> { 
       // should be handled in a new thread 
       try { 
        return Observable.just(task.call()); 
       } catch (Exception e) { 
        log.error("Error", e); 
       } 

       return Observable.empty(); 
      }) 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(MySchedulers.main()) 
      .subscribe(this::show); // subscribe called from main thread 

답변

3

주의 할 (Observable.create를 사용하여 매우 장황 수동으로 새 스레드를 생성에서 제외) : 저는 자바 프로그래머 아니지만, C#을, 그래서 모든 이상한 낙타의 경우 메소드 이름이 나를 흥분하고 나를 혼란스럽게합니다.

flatMap 작업을위한 새 스레드를 원할 경우 subscribeOn이 잘못된 위치에 있습니다. fromflatMap 사이에 삽입하십시오. subscribeOnobserveOn에 대한 자세한 설명은 this answer을 참조하십시오. .NET 용이지만 원칙은 동일합니다.

나는 자바에서 작업에 익숙하지 않은, 그래서 당신의 Task는 .NET의 Task 같이하고 task.call() 여부를 비동기 적이며 자신의 스레드를 시작 있는지 확실하지 않습니다 - 당신은 "말했다 이후 귀하의 질문에서하지 같아요. .. a 새 스레드에서 하나씩 처리해야하는 작업 목록입니다. "

newThread 스케줄러는 가입자 당 새 스레드 를 사용합니다 - 즉 from 운영자의 스레드에서있을 것입니다 있지만 flatMap은 하나의 가입을하기 때문에, 모든 task.call 호출은 동일한 스레드에서 구별 될 것이다.

실제로 task.call이 비동기 인 경우 결과는 동시성을 가져 오며 Rx의 의미와는 독립적입니다.

어느 쪽이든, (올바르게 배치 된) observeOn은 결과가 메인 스레드의 this::show으로 전달되도록합니다.

+0

'Task'는 lukstei가 만든 커스텀 클래스입니다. 우리는 .NET 비동기 태스크와 같은 이름과 목적을 가진'Task' 클래스를 가지고 있지 않습니다. 그러나,'subscribeOn'을 움직이는 것이 옳습니다! –

+0

@AdamS 네, 맞습니다. 'Task'는 오류 처리와 관련하여 대답을 위해 Runnable 또는 Callable – lukstei

+0

@ james-world에 감사드립니다. 하나의 작업 만 실패 할 수 있습니다. 조용히 무시해야하지만, 내 솔루션과는 달리 onError + onErrorResumeNext를 사용하는 이점은 없습니다. – lukstei

관련 문제