2013-01-02 1 views
2

나는 두 개의 IDisposables을 순차적으로 배치해야합니다. 첫 번째 IDisposable은 두 번째 IDisposable에 의해 죽을 서비스에 의존하는 Rx 구독을 종료하므로 순서가 중요합니다. 이것은 IObservable의 구독이 다른 스레드에서 발생해야하지만 UI 스레드에서 관찰 및 처리가 발생해야하는 Windows Forms 응용 프로그램 내에 있습니다. 대략 (한 번 감소) 다음 내가 가지고있는 코드에서 (. 사실, 나는 폐기만큼 순서가 보장 될 때 UI 스레드에서 발생하는 경우 상관 없어) 그래서 :여러 IDisposables의 순차적 처분 보장

SomeService = new DisposableService(); 
Subscription = Foo(someService).SubscribeOn(NewThreadScheduler.Default).ObserveOn(theForm).Subscribe(...) 

을의 수에 UI 이벤트 이러한 두 가지를 순서대로 삭제해야합니다 (Subscription 및 SomeService). 나는 동일한 스레드에서 시리얼 폐기를 제공하기 위해 수신의 CompositeDisposableContextDisposable에 추가를 사용하여 시도했다 이렇게하려면 다음을 수행

_Disposable = new CompositeDisposable(new[] {      
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, Subscription),      
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, SomeService)}); 

위는 그러나 작동하지 않습니다. 제 로깅 _Disposable에 기초 SomeService 대한 ContextDisposable 동일한 스레드라고하지만 ContextDisposable 여전히 서비스에 배치되는 다른 스레드 concurent 발생하는 (따라서 경합 조건 NPEs 결과)이다.

나는 단지 주 동안의 C# 프로그래밍 봤는데 그래서 문제가 컨텍스트와 운영자가 어떻게 작동하는지 내 오해 확신합니다. 이 문제에 대한 올바른 접근법은 무엇입니까?

+0

두 스레드를 동일한 스레드에서 처리 할 수없는 이유가 있습니까? dispose 메소드와 다른 모든 메소드를 동기화하는 경우 직접 순서를 제어 할 수 있습니다. 하나의 쓰레드에서 소비하고 다른 쓰레드로 처리해야한다면, 모든 소비자가 쓰레드 세이프 (threadafe safe)를하고, 처리하기 전에 그 쓰레드가 폐기되었는지를 확인해야한다. – devshorts

답변

0

내가 오해하고있는 것이 아니라면 어떤 스레드가 무엇을 처리할지 제어 할 수 있습니다. 누가 어떤 스레드에서 구독해도 상관 없습니다. 이 예를 봐

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     ReactiveTest rx1 = null; 
     ReactiveTest rx2 = null; 

     var thread1 = new Thread(() => rx1 = new ReactiveTest()); 
     var thread2 = new Thread(() => rx2 = new ReactiveTest()); 

     thread1.Start(); 
     thread2.Start(); 

     Thread.Sleep(TimeSpan.FromSeconds(1)); 

     thread1.Join(); 
     thread2.Join(); 

     rx1.Dispose(); 
     rx2.Dispose(); 
    } 
} 

public class ReactiveTest : IDisposable 
{ 
    private IDisposable _timerObservable; 

    private object _lock = new object(); 

    public ReactiveTest() 
    { 
     _timerObservable = Observable.Interval(TimeSpan.FromMilliseconds(250)).Subscribe(i => 
      Console.WriteLine("[{0}] - {1}", Thread.CurrentThread.ManagedThreadId, i)); 
    } 

    public void Dispose() 
    { 
     lock (_lock) 
     { 
      _timerObservable.Dispose(); 
      Console.WriteLine("[{0}] - DISPOSING", Thread.CurrentThread.ManagedThreadId); 
     } 
    } 
} 

[14] - 0 
[7] - 0 
[15] - 1 
[7] - 1 
[14] - 2 
[15] - 2 
[10] - DISPOSING 
[10] - DISPOSING 

당신은 우리가 그 다음 세 번째에 배치 된 두 개의 별도 스레드에 가입 볼 수 있습니다를 출력합니다. 서브 스크립 션에서 일어날 필요가있는 스레드 세이프 (threadafe)가있는 경우에 대비하여 처분을 잠갔습니다. 이 예제에서는 실제로 불필요합니다.

+0

당신이 말하는 것은 의미가 있으며 코드가 예상대로 동작합니다. 스레드 구독이 처리되는 것과 관련하여 내 응용 프로그램에서 다른 동작을보고 있습니다. 다시 한 번 생각해 보면 제 문제는'Publish'와'RefCount'를 사용하는 것과 관련이 있다고 생각합니다. 내 처분은 단순히 참조 횟수를 줄이며 "진짜"처분은 다른 스레드에서 0이되는 횟수만큼 실행됩니다. 경쟁 조건은 'RefCount'가 다른 스레드에서 처리하기 때문에 발생합니다. 해고 중입니다. –

+0

그건 분명히 할거야! – devshorts

+0

결국'RefCount'는 범인이 아니 었습니다. 그것은'SubscribeOn'을 사용하는 것과 관련이 있지만 해결 방법을 찾지 못했습니다. 내가 직면하고있는 문제를 보여주는 작업 코드로 새로운 질문을 올렸다 : http://stackoverflow.com/questions/14131306/controlling-what-thread-an-rx-subscription-is-disposed-on-after-subscribedon- 있다 –

0

SubscribeOn 일정 SubscribeDispose 모두 호출. 따라서 서브 스크립 션 변수에 대해 Dispose을 호출하면 현재 실행중인 UI 스레드인지 여부에 관계없이 서브 스크립 션 예약시 처리가 NewThreadScheduler.Default이됩니다.

SubscribeOn은 거의 사용하지 않는 것이 좋습니다. 그러나 귀하의 경우에는 귀하가 문제의 50 %를 해결한다고 주장하고 있으며, 이는 내가 본 대부분의 것보다 50 % 이상 더 많습니다. 따라서 실제로 백그라운드 스레드에서 실행하기 위해 서브 스크립 션이 필요한지 여부를 질문해야합니다. 첫 번째 장소. 아주 새로운 스레드를 작성한 다음 메소드를 호출하는 경우, 모든 메소드가 네트워크 요청 전송 또는 파일 읽기와 같은 일부 비동기 작업을 시작하는 경우 UI 스레드에서 직접 메소드를 호출하는 것과 비교하면 비용이 많이 듭니다. 아마도 보낼 네트워크 메시지를 계산하는 데 시간이 너무 많이 걸리는 것으로 밝혀지면 SubscribeOn을 사용하는 것이 맞을 수 있습니다. 물론, 처분 일정을 원할 때만 가능합니다. 당신의 관찰에 가입 백그라운드 스레드에서 실행해야합니다, 아직 처리가 자유 스레드를 유지해야하는 경우

후 대신 (테스트되지 않은) 다음 연산자를 사용하는 것이 좋습니다.

public static class ObservableExtensions 
{ 
    public static IObservable<TSource> SubscribeOn<TSource>(
    this IObservable<TSource> source, 
    bool doNotScheduleDisposal, 
    IScheduler scheduler) 
    { 
    if (!doNotScheduleDisposal) 
    { 
     return source.SubscribeOn(scheduler); 
    } 

    return Observable.Create<TSource>(observer => 
     { 
     // Implementation is based on that of the native SubscribeOn operator in Rx 
     var s = new SingleAssignmentDisposable(); 
     var d = new SerialDisposable(); 
     d.Disposable = s; 
     s.Disposable = scheduler.Schedule(() => 
     { 
      d.Disposable = source.SubscribeSafe(observer); 
     }); 
     return d; 
     }); 
    } 
}