특정 시점에 실행될 작업과 관련된 항목을 비 차단 방식으로 사용하는 큐를 작성해야합니다. 즉, 항목과 관련된 모든 작업은 별도의 스레드에서 실행되어야합니다.System.Reactive Schedulers DueTime scheduling
며칠 전 System.Reactive.Concurrency.IScheduler
인터페이스와 스케줄러가있어서 System.Reactive
에 내장되어 있습니다. 무엇이 특별히 관심을 끌었는지 Schedule<TState>(TState, TimeSpan, Func<IScheduler, TState, IDisposable>)
은 Schedule()
메서드의 오버로드입니다.
이 과부하는 - Scheduler.ThreadPoolScheduler
, Scheduler.TaskPoolScheduler
또는 NewThreadScheduler
과 함께 사용하면됩니다.
그래서 나는이 포스트의 하단에있는 스케줄러를 비교하기 위해 좀 더 빠르고 프로그램을 썼다. 아쉽게도 모든 스케줄러에 대해 일관성 검사가 실패합니다. 즉, 나중에 실행되도록 예약 된 일부 항목이 이전에 실행될 항목보다 먼저 실행되었음을 의미합니다.
이것은 스케줄러의 성격 때문입니까? (새 스레드/작업을 시작하면 작업이 망가집니다.)?
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Concurrency;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxDueTimeScheduling
{
public class ScheduledItem
{
public TimeSpan Delay;
public DateTime EnqueueTime;
public DateTime DueTime;
public double Fault;
public double ExecutionTime;
}
class Program
{
private static AutoResetEvent mWait = new AutoResetEvent(false);
private static long mEnqueueingFinishedAfter;
private static int mMinDelay = 10;
private static int mMaxDelay = 500;
private static int mMessagesPerTest = 50;
private static List<ScheduledItem> mResultList;
private static Stopwatch mTestStopWatch = new Stopwatch();
private static Stopwatch mExecutionStopWatch = new Stopwatch();
private static object mLockObject = new Object();
private static string mFormatString = "mm:ss:fffff"; // ATTENTION: not displaying hours to make output fit into one line in console
private static volatile int mCounter;
private static volatile int mLogCounter;
static void Main(string[] args)
{
var testData = CreateTestData(mMinDelay, mMaxDelay, mMessagesPerTest);
RunTest(Scheduler.NewThread, testData);
RunTest(Scheduler.TaskPool,testData);
RunTest(Scheduler.ThreadPool, testData);
Console.WriteLine();
Console.WriteLine("All Tests finished. Press Enter to exit");
Console.ReadLine();
}
private static void RunTest(IScheduler scheduler, List<int> testData)
{
Thread.Sleep(1000);
Console.WriteLine();
Console.WriteLine("------- new test with " + scheduler.GetType().Name + " -------------");
mCounter = 1;
mLogCounter = 0;
mEnqueueingFinishedAfter = 0;
mResultList = new List<ScheduledItem>();
Stopwatch w = new Stopwatch();
mTestStopWatch.Restart();
w.Restart();
testData.ForEach(d =>
{
var now = DateTime.Now;
var delay = TimeSpan.FromMilliseconds(d);
var item = new ScheduledItem()
{
Delay = delay,
EnqueueTime = now,
DueTime = now + delay,
};
scheduler.Schedule(item, item.Delay, myActionFunc);
});
w.Stop();
mEnqueueingFinishedAfter = w.ElapsedMilliseconds;
if (!mWait.WaitOne(mMaxDelay*mMessagesPerTest))
{
Console.WriteLine("Could not finish test {0} in {1} seconds. Messages Processed {2}", scheduler.GetType().Name, (mMaxDelay*mMessagesPerTest/1000), mResultList.Count);
PrintStats(scheduler);
}
}
private static List<int> CreateTestData(int minDelay, int maxDelay, int count)
{
var result = new List<int>();
Random r = new Random();
for(int i = 0; i < count; i++)
{
var nextDelay = r.Next(minDelay, maxDelay);
result.Add(nextDelay);
}
return result;
}
private static Func<IScheduler, ScheduledItem, IDisposable> myActionFunc = new Func<IScheduler, ScheduledItem, IDisposable>(OnDequeued);
private static IDisposable OnDequeued(IScheduler scheduler, ScheduledItem item)
{
DateTime? now = null;
lock (mLockObject)
{
mExecutionStopWatch.Restart();
now = DateTime.Now;
item.Fault = (now.Value - item.DueTime).TotalMilliseconds;
mResultList.Add(item);
}
Console.WriteLine(
mLogCounter++ +
":{0} scheduled on {1} for execution on {2}, fault in ms: {3} delay in ms:{4}, thread: {5}",
now.Value.ToString(mFormatString),
item.EnqueueTime.ToString(mFormatString),
item.DueTime.ToString(mFormatString),
item.Fault,
item.Delay.TotalMilliseconds,
Thread.CurrentThread.ManagedThreadId);
if (mCounter++ == mMessagesPerTest)
{
mTestStopWatch.Stop();
PrintStats(scheduler);
mWait.Set();
}
mExecutionStopWatch.Stop();
item.ExecutionTime = mExecutionStopWatch.ElapsedMilliseconds;
return null;
}
private static void PrintStats(IScheduler scheduler)
{
var consistencyErrors = ConsistencyCheck(mResultList);
string consistencyResultString = Environment.NewLine + (consistencyErrors.Count == 0
? "Consistency Check passed."
: "Consistency Errors item: " + string.Join(",",consistencyErrors).TrimEnd(','));
Console.WriteLine();
Console.WriteLine(String.Format("Test: {0} Duration: {1} Enqueueing: {2} Average Fault: {3} Max Fault: {4} Cosistency Check Result: {5}, Average ExeTime: {6} Max ExeTime: {7}",
scheduler.GetType().Name,
mTestStopWatch.ElapsedMilliseconds,
mEnqueueingFinishedAfter,
(int) mResultList.Average(r => r.Fault),
(int) mResultList.Max(r => r.Fault),
consistencyResultString,
mResultList.Average(r => r.ExecutionTime),
mResultList.Max(r => r.ExecutionTime)
));
}
private static List<int> ConsistencyCheck(List<ScheduledItem> resultList)
{
var result = new List<int>();
for(int i = 1; i<resultList.Count;i++)
{
var item = resultList[i];
var previousItem = resultList[i-1];
if (item.DueTime < previousItem.DueTime)
{
result.Add(i);
}
}
return result;
}
}
}