2016-09-27 4 views
2

RxJava 내부에 대해 David Karnok이 작성한 this 문서를 읽는 동안 RxJava의 SerializedObserver 클래스에 가까운 구현 예를 직면했습니다. 그래서 질문은 왜 내부 q 변수 첫 synchronized 블록에 도입 된 것입니다RxJava SerializedObserver 구현

class ValueListEmitterLoop<T> { 
    List<T> queue;       
    boolean emitting; 
    Consumer<? super T> consumer; 

    public void emit(T value) { 
     synchronized (this) { 
      if (emitting) { 
       List<T> q = queue; 
       if (q == null) { 
        q = new ArrayList<>(); 
        queue = q; 
       } 
       q.add(value); 
       return; 
      } 
      emitting = true; 
     } 
     consumer.accept(value);    
     for (;;) { 
      List<T> q; 
      synchronized (this) {   
       q = queue; 
       if (q == null) {    
        emitting = false; 
        return; 
       } 
       queue = null;    
      } 
      q.forEach(consumer);    
     }   
    } 
} 

: 여기에 코드? 나는 두 번째 synchronized 블록에 그 이유를 분명히 알 수 있습니다.

if (queue == null) { 
    queue = new ArrayList<>(); 
} 
queue.add(value); 

답변

2

나는 그것을 그들이 여러 번 사용되며, 일부 휘발성/동기화 접근이 근처에 특히 지역 변수로 필드를 읽을 수있는 좋은 방법을 찾을 : 난 그냥 사용하지에 실종 어떤 이유가있다. aq이 필드에 접근했다

volatile boolean cancelled; 

final Queue<T> queue; 

final Subscriber<? super T> actual; 

void drain() { 
    Subscriber<? super T> a = actual; 
    Queue<T> q = queue; 

    for (;;) { 
     if (cancelled) { 
      return; 
     } 

     T v = q.poll(); 

     if (v == null) { 
      a.onComplete(); 
      return; 
     } 

     a.onNext(v); 
    } 
} 

경우, 프로세서/JVM이 때문에 휘발성 액세스의 다시 메모리에서 모든 시간을 읽을해야합니다 : 예를 들어

는, 다음은 일반적인 패턴이다 cancelled 및 유사한 원자가가 poll()이다.

+0

고맙습니다! 나는 너의 대답을 기대했다. –