2017-11-21 1 views
-4

처음에는 네트워크 작업을 주 스레드에서 호출해서는 안됩니다. 그게 내가 Schedulers.io()에 completables를 관찰하는 이유입니다!rxjava2 : 연결 완료 결과를 사용하여 IO 스레드에서 관찰하기

나는 두 개의 연결을 연결하려고합니다. 모두 completable 사용 네트워크, 왜 내가 Schedulers.io()에 가입 thats. concatWith (또는 andThen) 코드를 사용하는 경우 NetworkOnMainThreadException으로 실패합니다. 다음은 kotlin 코드입니다.

val singleSubject = SingleSubject.create<String>(); 
completalbe1.concatWith(completable2) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.io()) 
     .subscribe({ 
      singleSubject.onSuccess("ok") 
     }, { error -> 
      Log.e(tag, error.message, error)//here i got exception 
      singleSubject.onError(error) 
     }) 
return singleSubject 

체인을 완료하지 않고 코드를 다시 작성하면 모두 정상입니다. 여기에 작업 코드가 있습니다 :

val singleSubject = SingleSubject.create<String>(); 
completable1 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.io()) 
     .subscribe({ 
      completable2 
       .subscribeOn(Schedulers.io()) 
       .observeOn(Schedulers.io()) 
       .subscribe({ 
        singleSubject.onSuccess("ok") 
       }, { error -> 
        Log.e(tag, error.message, error) 
        singleSubject.onError(error) 
       }) 
     }, {error -> 
      Log.e(tag, error.message, error) 
      singleSubject.onError(error) 
     }) 
return singleSubject 

왜 첫 번째 코드 조각은 작동하지 않지만 두 번째 코드는 무엇입니까?

UPD1는 :

 android.os.NetworkOnMainThreadException 
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1273) 
at libcore.io.BlockGuardOs.recvfrom(BlockGuardOs.java:249) 
at libcore.io.IoBridge.recvfrom(IoBridge.java:549) 
at java.net.PlainSocketImpl.read(PlainSocketImpl.java:481) 
at java.net.PlainSocketImpl.access$000(PlainSocketImpl.java:37) 
at java.net.PlainSocketImpl$PlainSocketInputStream.read(PlainSocketImpl.java:237) 
at okio.Okio$2.read(Okio.java:139) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:237) 
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56) 
at okhttp3.internal.connection.RealConnection.isHealthy(RealConnection.java:498) 
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:133) 
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100) 
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) 
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) 
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) 
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) 
at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:211) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) 
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185) 
at okhttp3.RealCall.execute(RealCall.java:69) 
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180) 
at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41) 
at io.reactivex.Observable.subscribe(Observable.java:10955) 
at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) 
at io.reactivex.Observable.subscribe(Observable.java:10955) 
at io.reactivex.internal.operators.observable.ObservableIgnoreElementsCompletable.subscribeActual(ObservableIgnoreElementsCompletable.java:31) 
at io.reactivex.Completable.subscribe(Completable.java:1664) 
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.next(CompletableConcatArray.java:89) 
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.onComplete(CompletableConcatArray.java:65) 
at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64) 
at com.catalyst.opti.AppManager$transferImage$1$subscribe$1.onStateChanged(AppManager.kt:323) 
at com.amazonaws.mobileconnectors.s3.transferutility.TransferStatusUpdater$1.run(TransferStatusUpdater.java:172) 
at android.os.Handler.handleCallback(Handler.java:742) 
at android.os.Handler.dispatchMessage(Handler.java:95) 
at android.os.Looper.loop(Looper.java:154) 
at android.app.ActivityThread.main(ActivityThread.java:5527) 
at java.lang.reflect.Method.invoke(Native Method) 
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:739) 
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:629) 

UPD2 :

@POST("some") 
    fun verifyLocation(@Header(AUTH_TOKEN_HEADER) authToken: String, @Body 
verifyLocation: VerifyLocation): Completable 
:

private fun transferImage(imageName: String, image: File): Completable { 
    return Completable.create(object : CompletableOnSubscribe { 
     override fun subscribe(e: CompletableEmitter) { 
      val transferObserver = transferUtility.upload("some", imageName, image) 
      transferObserver.setTransferListener(object : TransferListener { 
       override fun onProgressChanged(id: Int, bytesCurrent: Long, bytesTotal: Long) { 
        Log.i(tag, "bytesCurrent: $bytesCurrent, bytesTotal: $bytesTotal") 
       } 

       override fun onStateChanged(id: Int, state: TransferState?) { 
        if (state == TransferState.COMPLETED) { 
         e.onComplete() 
        } 
       } 

       override fun onError(id: Int, ex: java.lang.Exception) { 
        Log.d(tag, "error transfer s3: ${ex.message}", ex) 
        e.onError(ex) 
       } 
      }) 
     } 
    }); 
} 

completable2이 retrofit2 호출입니다 :

completable1는 AWS S3에 기능을 업로드하는 파일입니다 여기에 스택 트레이스입니다

+1

어떤 코드 줄에서 예외가 발생합니까? 예외 스택 추적은 어디에 있습니까? * 예외 * –

+1

[NetworkOnMainThreadException] (https://stackoverflow.com/questions/5150637/networkonmainthreadexception)의 중복 가능성이 있음을 알리는 즉시이 정보를 제공해야합니다. – EJP

+0

예외가 발생했을 때 주석을 추가했으며 중복되지 않았습니다. 나는 Schedulers.io()를 사용한다! –

답변

1

나는 transferObserver.setTransferListener이 주 스레드에서 콜백을 호출하고 나서 주 스레드에서 completable2을 구독 할 것으로 추측합니다. 다른 예와 마찬가지로 subscribeOn(Schedulers.io())completable2에 적용해야합니다.

val singleSubject = SingleSubject.create<String>(); 
completalbe1.subscribeOn(Schedulers.io()) 
    .concatWith(completable2.subscribeOn(Schedulers.io())) // <----------------------- 
    .observeOn(Schedulers.io()) 
    .subscribe({ 
     singleSubject.onSuccess("ok") 
    }, { error -> 
     Log.e(tag, error.message, error)//here i got exception 
     singleSubject.onError(error) 
    }) 

return singleSubject 

subscribeOn 구독 (측면) 효과에 영향을하지만 메인 스레드에서 onComplete를 호출 할 때 completalbe1는 관측 효과가 있습니다.

+0

감사합니다. 평소와 같이 답변이 작동합니다! 그러나 concatWith 메소드에 대한 javadoc은 다음과 같이 말합니다. "반환 값 :이 등록한 다음 완료 할 수있는 새 완료 가능". 나는 이것이 내가 결과에 가입 할 수 있다는 것을 이해했다. BTW, 알고 계십니까, downvotes에 동의 할 수있는 옵션이 있습니까, 내가 너무 만료 된 사용자가 아닙니다. –

관련 문제