2016-12-22 7 views
1

특정 시점에 실행될 작업과 관련된 항목을 비 차단 방식으로 사용하는 큐를 작성해야합니다. 즉, 항목과 관련된 모든 작업은 별도의 스레드에서 실행되어야합니다.System.Reactive Schedulers DueTime scheduling

며칠 전 System.Reactive.Concurrency.IScheduler 인터페이스와 스케줄러가있어서 System.Reactive에 내장되어 있습니다. 무엇이 특별히 관심을 끌었는지 Schedule<TState>(TState, TimeSpan, Func<IScheduler, TState, IDisposable>)Schedule() 메서드의 오버로드입니다.

이 과부하는 - Scheduler.ThreadPoolScheduler, Scheduler.TaskPoolScheduler 또는 NewThreadScheduler과 함께 사용하면됩니다.

그래서 나는이 포스트의 하단에있는 스케줄러를 비교하기 위해 좀 더 빠르고 프로그램을 썼다. 아쉽게도 모든 스케줄러에 대해 일관성 검사가 실패합니다. 즉, 나중에 실행되도록 예약 된 일부 항목이 이전에 실행될 항목보다 먼저 실행되었음을 의미합니다.

이것은 스케줄러의 성격 때문입니까? (새 스레드/작업을 시작하면 작업이 망가집니다.)?

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace RxDueTimeScheduling 
{ 
    public class ScheduledItem 
    { 
     public TimeSpan Delay; 
     public DateTime EnqueueTime; 
     public DateTime DueTime;   
     public double Fault; 
     public double ExecutionTime; 
    } 

class Program 
{ 
    private static AutoResetEvent mWait = new AutoResetEvent(false); 
    private static long mEnqueueingFinishedAfter; 
    private static int mMinDelay = 10; 
    private static int mMaxDelay = 500; 
    private static int mMessagesPerTest = 50; 
    private static List<ScheduledItem> mResultList; 
    private static Stopwatch mTestStopWatch = new Stopwatch(); 
    private static Stopwatch mExecutionStopWatch = new Stopwatch(); 
    private static object mLockObject = new Object(); 
    private static string mFormatString = "mm:ss:fffff"; // ATTENTION: not displaying hours to make output fit into one line in console 
    private static volatile int mCounter; 
    private static volatile int mLogCounter; 

    static void Main(string[] args) 
    { 
     var testData = CreateTestData(mMinDelay, mMaxDelay, mMessagesPerTest); 
     RunTest(Scheduler.NewThread, testData);    
     RunTest(Scheduler.TaskPool,testData); 
     RunTest(Scheduler.ThreadPool, testData); 

     Console.WriteLine(); 
     Console.WriteLine("All Tests finished. Press Enter to exit"); 
     Console.ReadLine(); 

    } 


    private static void RunTest(IScheduler scheduler, List<int> testData) 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(); 
     Console.WriteLine("------- new test with " + scheduler.GetType().Name + " -------------"); 
     mCounter = 1; 
     mLogCounter = 0;    
     mEnqueueingFinishedAfter = 0; 
     mResultList = new List<ScheduledItem>();    
     Stopwatch w = new Stopwatch(); 
     mTestStopWatch.Restart(); 
     w.Restart();   
     testData.ForEach(d => 
     { 
      var now = DateTime.Now; 
      var delay = TimeSpan.FromMilliseconds(d); 
      var item = new ScheduledItem() 
      { 
       Delay = delay, 
       EnqueueTime = now, 
       DueTime = now + delay,      
      }; 

      scheduler.Schedule(item, item.Delay, myActionFunc);     
     }); 
     w.Stop(); 
     mEnqueueingFinishedAfter = w.ElapsedMilliseconds; 
     if (!mWait.WaitOne(mMaxDelay*mMessagesPerTest)) 
     { 
      Console.WriteLine("Could not finish test {0} in {1} seconds. Messages Processed {2}", scheduler.GetType().Name, (mMaxDelay*mMessagesPerTest/1000), mResultList.Count); 
      PrintStats(scheduler); 
     } 
    } 


    private static List<int> CreateTestData(int minDelay, int maxDelay, int count) 
    { 
     var result = new List<int>(); 
     Random r = new Random(); 
     for(int i = 0; i < count; i++) 
     { 
      var nextDelay = r.Next(minDelay, maxDelay); 
      result.Add(nextDelay); 
     } 
     return result; 
    } 

    private static Func<IScheduler, ScheduledItem, IDisposable> myActionFunc = new Func<IScheduler, ScheduledItem, IDisposable>(OnDequeued); 


    private static IDisposable OnDequeued(IScheduler scheduler, ScheduledItem item) 
    { 
     DateTime? now = null; 
     lock (mLockObject)    
     { 

      mExecutionStopWatch.Restart();    
      now = DateTime.Now; 
      item.Fault = (now.Value - item.DueTime).TotalMilliseconds; 
      mResultList.Add(item); 
     } 


      Console.WriteLine(
       mLogCounter++ + 
       ":{0} scheduled on {1} for execution on {2}, fault in ms: {3} delay in ms:{4}, thread: {5}", 
       now.Value.ToString(mFormatString), 
       item.EnqueueTime.ToString(mFormatString), 
       item.DueTime.ToString(mFormatString), 
       item.Fault, 
       item.Delay.TotalMilliseconds, 
       Thread.CurrentThread.ManagedThreadId); 
      if (mCounter++ == mMessagesPerTest) 
      { 
       mTestStopWatch.Stop(); 
       PrintStats(scheduler); 
       mWait.Set(); 
      } 
      mExecutionStopWatch.Stop(); 
      item.ExecutionTime = mExecutionStopWatch.ElapsedMilliseconds; 
      return null;    
    } 

    private static void PrintStats(IScheduler scheduler) 
    { 
     var consistencyErrors = ConsistencyCheck(mResultList); 
     string consistencyResultString = Environment.NewLine + (consistencyErrors.Count == 0 
              ? "Consistency Check passed." 
              : "Consistency Errors item: " + string.Join(",",consistencyErrors).TrimEnd(',')); 
     Console.WriteLine(); 
     Console.WriteLine(String.Format("Test: {0} Duration: {1} Enqueueing: {2} Average Fault: {3} Max Fault: {4} Cosistency Check Result: {5}, Average ExeTime: {6} Max ExeTime: {7}", 
             scheduler.GetType().Name, 
             mTestStopWatch.ElapsedMilliseconds, 
             mEnqueueingFinishedAfter, 
             (int) mResultList.Average(r => r.Fault), 
             (int) mResultList.Max(r => r.Fault), 
             consistencyResultString, 
             mResultList.Average(r => r.ExecutionTime), 
             mResultList.Max(r => r.ExecutionTime) 
          )); 
    } 

    private static List<int> ConsistencyCheck(List<ScheduledItem> resultList) 
    { 
     var result = new List<int>(); 
     for(int i = 1; i<resultList.Count;i++) 
     { 
      var item = resultList[i]; 
      var previousItem = resultList[i-1]; 

      if (item.DueTime < previousItem.DueTime) 
      { 
       result.Add(i); 
      } 
     } 
     return result; 
    } 
} 

}

답변

0
Schedule<TState>(TState, TimeSpan, Func<IScheduler, TState, IDisposable>)

과부하 지정된 기간에 의해 작업 단위를 지연시킨다. 테스트에서는 각 항목에 대해 임의의 지연을 지정한 다음 순서 순서대로 실행 순서를 테스트합니다. 타이머 해상도보다 작은 지연의 경우 즉시 작업이 예약됩니다.

주문을 실행하려면 EventLoopScheduler으로 가십시오.

관련 문제