2

변경할 큐에 반응하지 않는 관찰 가능한 I가 다음 코드다른 스레드

static void Main() 
    { 
     var holderQueue = new ConcurrentQueue<int>(GetInitialElements()); 

     Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue; 
     var observableQueue = holderQueue.ToObservable(); 
     IScheduler newThreadScheduler = new NewThreadScheduler(); 

     IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp(); 

     var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler); 

     var t = new TaskFactory(); 
     t.StartNew(() => addToQueueAction(holderQueue)); 

     using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue => 
     { 
      Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count); 
      foreach(var item in currentQueue) 
       Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp); 

      Console.WriteLine("holderqueue has: {0}", currentQueue.Count); 
     })) 
     { 
      Console.WriteLine("started observing queue"); 

      Console.ReadLine(); 
     } 
    } 

    private static void AddToQueue(ConcurrentQueue<int> concurrentQueue) 
    { 
     while(true) 
     { 
      var x = new Random().Next(1, 10); 
      concurrentQueue.Enqueue(x); 
      Console.WriteLine("added {0}", x); 
      Console.WriteLine("crtcount is: {0}", concurrentQueue.Count); 
      Thread.Sleep(1000); 
     } 
    } 

    private static IEnumerable<int> GetInitialElements() 
    { 
     var random = new Random(); 
     var items = new List<int>(); 
     for (int i = 0; i < 10; i++) 
      items.Add(random.Next(1, 10)); 

     return items; 
    } 

의도는 다음과 같다 :

holderQueue 오브젝트는 변경 몇 가지 요소 (GetInitialElements)와 초기 채워 (메서드 AddToQueue에 의해) 다른 요소가있는 다른 스레드에서 관찰 할 수있는이 변경을 감지하고 해당 구독 메서드에서 해당 메서드를 실행하여 시간이 경과 할 때 (따라서 각 3 초) 따라 대응해야합니다.

즉, 간단히 말해서 Subscribe 코드에 각각 3 초를 실행하고 다른 스레드에서 변경된 큐의 변경 내용을 보여 주면됩니다. 대신 Subscribe 본문은 한 번만 실행됩니다. 왜?

감사

답변

4

ToObservable 방법은 IEnumerable<T> 소요 관찰로 변환한다. 결과적으로 동시 대기열을 사용하고 즉시 열거하여 사용 가능한 모든 항목을 실행합니다. 나중에 항목을 추가하기 위해 큐를 수정해도 동시 큐의 GetEnumerator() 구현에서 반환 된 이미 열거 된 IEnumerable<T>에는 영향을주지 않습니다.

+0

이해

는 그래서 다시 코드를 볼 수 있습니다. 이 경우 대기열 자체가 말하는 것처럼 "관찰"될 수있는 방법이 없습니까? –

+0

아니요; 그러나 실제로 원하는 것은 'Subject'입니다. 피사체에 대해 OnNext를 호출하여 Rx ​​파이프 라인에서 수신 한 다음 원하는대로 버퍼링 한 항목을 "대기열에 추가"할 수 있습니다.파이프 라인을 연결하기 전에 * 항목에 항목을 넣을 수있게하려면'ReplaySubject'를 사용하십시오.이 항목은 내부 버퍼에 항목을 기록한 다음 연결될 때 다시 관찰자에게 재생합니다. –

+0

만약 당신이 ** ** 선택의 여지가 있지만'ConcurrentQueue'를 사용한다면, 커스텀 코드를 써서 새로운 아이템을위한 큐를 폴링하고 관찰 가능한 파이프 라인에 밀어 넣을 수 있습니다. 이것은 매우 비효율적이지만, 불행하게도 큐는 새로운 항목이 추가 될 때 코드에 알리기 위해 후크를 노출시키지 않습니다. –

1

David Pfeffer의 답변에 따르면 .ToObserverable()을 사용하면 필요한 것을 얻을 수 없습니다.

난 당신의 코드를 볼 때

그러나, 나는 몇 가지 참조 : 당신은 ConcurrentQueue<T>을 사용하는

  1. 당신은 당신이 작업
  2. 를 통해 큐에 추가됩니다 NewThreadScheduler
  3. 를 사용을

몇 가지 사항 만 변경하면 여기에서 할 일을 달성 할 수 있다고 생각합니다. 먼저 나는 실제로 당신이 BlockingCollection<T>을 찾고 있다고 생각합니다. 가능성은 희박하지만 스레드 안전 큐처럼 작동하도록 할 수 있습니다.

다음은 이미 스레드를 NewThreadScheduler으로 처리하는 데 헌신하고 있습니다. 대기열에서 폴링/풀링을 수행하지 않으시겠습니까?

마지막으로 BlockingCollection<T>.GetConsumingEnumerable(CancellationToken) 메서드를 사용하면 실제로 돌아가서 .ToObservable() 메서드를 사용할 수 있습니다!

static void Main() 
{ 
    //The processing thread. I try to set the the thread name as these tend to be long lived. This helps logs and debugging. 
    IScheduler newThreadScheduler = new NewThreadScheduler(ts=>{ 
     var t = new Thread(ts); 
     t.Name = "QueueReader"; 
     t.IsBackground = true; 
     return t; 
    }); 

    //Provide the ability to cancel our work 
    var cts = new CancellationTokenSource(); 

    //Use a BlockingCollection<T> instead of a ConcurrentQueue<T> 
    var holderQueue = new BlockingCollection<int>(); 
    foreach (var element in GetInitialElements()) 
    { 
     holderQueue.Add(element); 
    } 

    //The Action that periodically adds items to the queue. Now has cancellation support 
    Action<BlockingCollection<int>,CancellationToken> addToQueueAction = AddToQueue; 
    var tf = new TaskFactory(); 
    tf.StartNew(() => addToQueueAction(holderQueue, cts.Token)); 

    //Get a consuming enumerable. MoveNext on this will remove the item from the BlockingCollection<T> effectively making it a queue. 
    // Calling MoveNext on an empty queue will block until cancelled or an item is added. 
    var consumingEnumerable = holderQueue.GetConsumingEnumerable(cts.Token); 

    //Now we can make this Observable, as the underlying IEnumerbale<T> is a blocking consumer. 
    // Run on the QueueReader/newThreadScheduler thread. 
    // Use CancelationToken instead of IDisposable for single method of cancellation. 
    consumingEnumerable.ToObservable(newThreadScheduler) 
     .Timestamp() 
     .Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler) 
     .Subscribe(buffer => 
      { 
       Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", buffer.Count); 
       foreach(var item in buffer) 
        Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp); 

       Console.WriteLine("holderqueue has: {0}", holderQueue.Count); 
      }, 
      cts.Token); 


    Console.WriteLine("started observing queue"); 

    //Run until [Enter] is pressed by user. 
    Console.ReadLine(); 

    //Cancel the production of values, the wait on the consuming enumerable and the subscription. 
    cts.Cancel(); 
    Console.WriteLine("Cancelled"); 
} 

private static void AddToQueue(BlockingCollection<int> input, CancellationToken cancellationToken) 
{ 
    while(!cancellationToken.IsCancellationRequested) 
    { 
     var x = new Random().Next(1, 10); 
     input.Add(x); 
     Console.WriteLine("added '{0}'. Count={1}", x, input.Count); 
     Thread.Sleep(1000); 
    } 
} 

private static IEnumerable<int> GetInitialElements() 
{ 
    var random = new Random(); 
    var items = new List<int>(); 
    for (int i = 0; i < 10; i++) 
     items.Add(random.Next(1, 10)); 

    return items; 
} 

가 지금은 당신이 기대했던 결과를 얻을 것이라고 생각 :

added '9'. Count=11 
started observing queue 
added '4'. Count=1 
added '8'. Count=1 
added '3'. Count=1 
buffer time elapsed, current queue contents is: 14 items. 
item 9 at 25/01/2015 22:25:35 +00:00 
item 5 at 25/01/2015 22:25:35 +00:00 
item 5 at 25/01/2015 22:25:35 +00:00 
item 9 at 25/01/2015 22:25:35 +00:00 
item 7 at 25/01/2015 22:25:35 +00:00 
item 6 at 25/01/2015 22:25:35 +00:00 
item 2 at 25/01/2015 22:25:35 +00:00 
item 2 at 25/01/2015 22:25:35 +00:00 
item 9 at 25/01/2015 22:25:35 +00:00 
item 3 at 25/01/2015 22:25:35 +00:00 
item 9 at 25/01/2015 22:25:35 +00:00 
item 4 at 25/01/2015 22:25:36 +00:00 
item 8 at 25/01/2015 22:25:37 +00:00 
item 3 at 25/01/2015 22:25:38 +00:00 
holderqueue has: 0 
added '7'. Count=1 
added '2'. Count=1 
added '5'. Count=1 
buffer time elapsed, current queue contents is: 3 items. 
item 7 at 25/01/2015 22:25:39 +00:00 
item 2 at 25/01/2015 22:25:40 +00:00 
item 5 at 25/01/2015 22:25:41 +00:00 
holderqueue has: 0 
Cancelled