변경할 큐에 반응하지 않는 관찰 가능한 I가 다음 코드다른 스레드
static void Main()
{
var holderQueue = new ConcurrentQueue<int>(GetInitialElements());
Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue;
var observableQueue = holderQueue.ToObservable();
IScheduler newThreadScheduler = new NewThreadScheduler();
IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp();
var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler);
var t = new TaskFactory();
t.StartNew(() => addToQueueAction(holderQueue));
using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue =>
{
Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count);
foreach(var item in currentQueue)
Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);
Console.WriteLine("holderqueue has: {0}", currentQueue.Count);
}))
{
Console.WriteLine("started observing queue");
Console.ReadLine();
}
}
private static void AddToQueue(ConcurrentQueue<int> concurrentQueue)
{
while(true)
{
var x = new Random().Next(1, 10);
concurrentQueue.Enqueue(x);
Console.WriteLine("added {0}", x);
Console.WriteLine("crtcount is: {0}", concurrentQueue.Count);
Thread.Sleep(1000);
}
}
private static IEnumerable<int> GetInitialElements()
{
var random = new Random();
var items = new List<int>();
for (int i = 0; i < 10; i++)
items.Add(random.Next(1, 10));
return items;
}
의도는 다음과 같다 :
holderQueue
오브젝트는 변경 몇 가지 요소 (GetInitialElements
)와 초기 채워 (메서드 AddToQueue
에 의해) 다른 요소가있는 다른 스레드에서 관찰 할 수있는이 변경을 감지하고 해당 구독 메서드에서 해당 메서드를 실행하여 시간이 경과 할 때 (따라서 각 3 초) 따라 대응해야합니다.
즉, 간단히 말해서 Subscribe
코드에 각각 3 초를 실행하고 다른 스레드에서 변경된 큐의 변경 내용을 보여 주면됩니다. 대신 Subscribe
본문은 한 번만 실행됩니다. 왜?
감사
이해
는 그래서 다시 코드를 볼 수 있습니다. 이 경우 대기열 자체가 말하는 것처럼 "관찰"될 수있는 방법이 없습니까? –
아니요; 그러나 실제로 원하는 것은 'Subject'입니다. 피사체에 대해 OnNext를 호출하여 Rx 파이프 라인에서 수신 한 다음 원하는대로 버퍼링 한 항목을 "대기열에 추가"할 수 있습니다.파이프 라인을 연결하기 전에 * 항목에 항목을 넣을 수있게하려면'ReplaySubject'를 사용하십시오.이 항목은 내부 버퍼에 항목을 기록한 다음 연결될 때 다시 관찰자에게 재생합니다. –
만약 당신이 ** ** 선택의 여지가 있지만'ConcurrentQueue'를 사용한다면, 커스텀 코드를 써서 새로운 아이템을위한 큐를 폴링하고 관찰 가능한 파이프 라인에 밀어 넣을 수 있습니다. 이것은 매우 비효율적이지만, 불행하게도 큐는 새로운 항목이 추가 될 때 코드에 알리기 위해 후크를 노출시키지 않습니다. –