2014-04-23 2 views
4

구독을 직렬화하여 네트워크를 통해 보내려고합니다. 나는 스칼라를 사용하여,이 같은 일을 해요 :rx-java 직렬화 네트워크에서 관찰 가능

observable.materialize.subscribe{ n : Notification => sendToNetwork(n)} 

그러나, 나는군요 오류 :

java.io.NotSerializableException: rx.lang.scala.Notification$OnNext 

(정확하게하려면, 나는 Akka를 사용하고 알림을 보내려고 해요 그러나 나는이 문제가 그것보다 더 일반적이라고 생각한다.)

실제로 rx.lang.scala.Notification 동반자 객체의 내부 클래스입니다 Notification의 서브 클래스 인 OnNext 클래스를 직렬화하기를 거부 것으로 나타납니다

http://rxscala.github.io/scaladoc/#rx.lang.scala.Notification $$ OnNext

을 ... 그리고 자바 문서에서 어딘가에서 하나의 내부 비 정적 클래스를 직렬화 할 수 없다는 것을 알았습니다.

이 부분에 대한 올바른 이해가 있습니까? 그렇다면 rx-java의 클래스 계층 구조에 대한 제한 사항입니까? 아니면이 문제를 해결할 수있는 방법이 있습니까? Notification s를 직렬화 하시겠습니까?

답변

3

이 부분에 대한 올바른 이해가 있습니까?

외부 클래스가 직렬화 가능한 경우 비 정적 내부 클래스를 직렬화합니다. 하지만 Java와 Scala 모두에서 컴파일러에게 클래스가 직렬화 가능 (Serializable 확장)이라고 명시해야하며 rx-java Notification과 rx-scala OnNext은 직렬화 할 수 없습니다.

또는이 문제를 해결하고 알림을 serialize 할 수있는 방법이 있습니까?

Akka에서는 어떤 클래스에 대해서도 사용자 고유의 serializer를 쓸 수 있습니다 : http://doc.akka.io/docs/akka/snapshot/scala/serialization.html. Java 직렬화는 기본적으로 사용됩니다.

+0

감사를 (그냥 내 프로젝트를 확인) 곧 별도의 프로젝트로 이동합니다 - 나는 Akka 사용자 정의 직렬화 알고 있어요, 그것은이다 실제로 나는 결국 무엇을 끝냈다. 어쩌면 나는 다르게 질문을 했어야한다 : * 왜 * Notification은 Serializable을 구현하지 않는가? 할 일은 자연 스러울 것 같습니다. 어떤 생각? 추신. 나는 이것을 위해 github에 대한 요청을 제기했다 : https://github.com/Netflix/RxJava/issues/1078. 그들이 무엇을 말할지 궁금하다. – Luciano

+1

Serializable을 구현하는 것은 불행히도 쉬운 일이지만 대부분의 경우 잘못된 것입니다. We (Akka)는 Java 직렬화를 지원하는 실수를 저지 렸지만 앞으로는 기본 바인딩을 제거하여 모든 점에서 하위 파 성능을 제공하고 다른 언어와 런타임 사이의 취약한 시스템을 생성한다는 사실을 사람들이인지하게 할 수 있습니다 버전. –

1

Kontraktor-Reactive-Streams는 반응 스트림을 기반으로 한 킥 - 엉뚱한 리모팅 성능을 제공합니다. Kontraktor를 사용하지 않았지만 빠른 원격 처리를 제공하는 도구로 사용할 수 있습니다 (빠른 직렬화를 기반으로 함).

public static void remotingRxToRx() { 
    Observable<Integer> range = Observable.range(0, 50_000_000); 
    Publisher<Integer> pub = RxReactiveStreams.toPublisher(range); 

    KxReactiveStreams.get().asRxPublisher(pub) 
     .serve(new TCPNIOPublisher().port(3456)); 

    RateMeasure rm = new RateMeasure("events"); 

    KxPublisher<Integer> remoteStream = 
     KxReactiveStreams.get() 
      .connect(Integer.class, new TCPConnectable().host("localhost").port(3456)); 

    RxReactiveStreams.toObservable(remoteStream) 
     .forEach(i -> rm.count()); 
} 

현재 링크 : https://github.com/RuedigerMoeller/kontraktor/tree/trunk/modules/reactive-streams/src/examples/src/rxstreamserver

귀하의 회신

관련 문제