2016-10-09 1 views
0

매개 변수 ObjectInputStream을 사용하고 소켓의 데이터를 관찰하는 작은 코드를 작성했습니다. readObject() 함수가 "null"을 반환하고 observeSocket (ObjectInputStream in) 함수가 Object 만 사용하므로 구독자가 onError() 함수를 실행하고 프로그램을 종료하면 문제가 발생합니다.Rxjava에서 수신 거부 할 때까지 소켓을 관찰하는 방법

하지만 내가 필요로하는 것은 Object에 대한 소켓 관찰을 계속하고 Observer가 함수가 기능을 종료해야하는 경우에만 Object가 소켓 위에서 관찰되는 경우에만 반환하는 것입니다. 필요한 기능을 수행하기 위해 코드를 어떻게 수정할 수 있습니까? 배압 인식하고 계약을 준수 Observables은 때문에 Observable.create(OnSubscribe)을 사용

public Observable<Object> observeSocket(ObjectInputStream in){ 
    return Observable.create(subscriber -> { 
     while(!subscriber.isUnsubscribed()) { 
      subscriber.onNext(getData(in)); 
     } 

     subscriber.onCompleted(); 

    }); 
} 

public Object getData(ObjectInputStream in){ 

    Object streamData = null; 

    try{ 

     streamData = in.readObject(); 
    } 

    catch(IOException e){ 
     //e.printStackTrace(); 
    } 

    catch(ClassNotFoundException e){ 
     e.printStackTrace(); 
    } 

    return streamData; 

} 

답변

2

피가 까다로운 사업이다. Observable.create(SyncOnSubscribe)을 사용하기위한 좋은 후보자입니다.

ObjectInputStream ois = ...; 

Observable<Object> objects = 
    Observable.create(
    SyncOnSubscribe.createStateless(observer -> { 
     try { 
      Object value = ois.readObject(); 
      // you decide how end of file is indicated 
      // a common strategy is to write a null object 
      // to the end of the Object stream. 
      if (value == END_OF_FILE) { 
       observer.onCompleted(); 
      } else { 
       observer.onNext(value); 
      } 
     } catch (Exception e) { 
      observer.onError(e); 
     } 
    }));   
+0

이 비용은 컴퓨터에서 테스트 했습니까? SyncOnSubscribe 일반 서명이 Action 반환 값과 다른 경우 컴파일 오류와 구문 오류가 발생합니다. – user64287

+0

createStateless는 Action1() 및 observer.onNext 만 사용하거나 다른 함수는 구현할 수 없습니다. – user64287

+1

Observable.create를 넣는 것을 잊어 버렸습니다. rxjava를 사용하여 나에게 잘 컴파일합니다. 1.2.1 –