2012-10-23 3 views
2

나는 일반 URI 검색 시스템을 구축 중이다. 본질적으로 제네릭 클래스 Retriever<T>이 있으며 검색 할 URI 대기열을 유지 관리합니다. 가능한 한 빨리 해당 큐를 처리하는 별도의 스레드가 있습니다. 질문 제목에 표시된대로 URI 유형의 예는 HTTP 유형 URI입니다.많은 HTTP 웹 요청을 병렬 처리하는 좋은 방법은 무엇입니까?

문제는 리소스가 검색되도록 요청할 때 추상 메서드 T RetrieveResource(Uri location)을 통해 비동기 부족으로 인해 속도가 느려진다는 것입니다.

반환 유형을 RetrieveResource에서 Task<T>으로 변경하는 것이 가장 먼저 생각했습니다. 그러나 그것은 수천 개의 뛰어난 작업이있을 때 작업이 겹쳐서 많은 문제를 일으키는 것으로 보입니다. 스레드 풀을 활용하는 대신 실제 스레드를 많이 생성하는 것처럼 보입니다. 한 번에 너무 많은 일이 일어나기 때문에 모든 것이 느려지므로 상상할 수 없습니다.

검색 할 대기중인 항목이 많으며 대기열에 들어갈 때 처리 속도가 빠르지 않을 것으로 예상됩니다. 시간이 지남에 따라 시스템이 따라 잡을 수있는 기회가 있습니다. 그러나 확실히 빠르지는 않습니다.

대기열과 처리 할 스레드를 유지하는 대신에 작업 항목을 ThreadPool에 대기 시키기로했습니다. 그러나 모든 작업 항목이 처리되거나 나중에 우선 순위 또는 다른 것을 허용하기 전에 시스템을 종료해야한다고 말하는 경우 이상적이라고 확신하지 못합니다.

자원 검색은 시간이 많이 소요되는 프로세스 (0.250-5 초)이지만 반드시 리소스 집약적 인 프로세스는 아닙니다. 이를 수백 개의 요청에 병렬 처리하는 것이 좋습니다.

우리의 요구 사항은 다음과 같습니다

  • URI를 시스템이 큐 작업을 경우에도, 모든 스레드에서 큐에 할 수
      는 검색 나중에 우선 순위가 될 수 있어야 할 수도 있습니다
    • 검색이 있어야한다 일시 정지 가능
    • 아무 것도 검색되지 않을 때 최소 회전이 발생해야합니다 (여기서 BlockingCollection이 유용합니다).

    불필요한 복잡성없이 병렬 처리하는 좋은 방법이 있습니까?

    다음은 기존 코드 중 일부입니다.

    public abstract class Retriever<T> : IRetriever<T>, IDisposable 
    { 
        private readonly Thread worker; 
        private readonly BlockingCollection<Uri> pending; 
        private volatile int isStarted; 
        private volatile int isDisposing; 
    
        public event EventHandler<RetrievalEventArgs<T>> Retrieved; 
    
        protected Retriever() 
        { 
         this.worker = new Thread(this.RetrieveResources); 
         this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>()); 
         this.isStarted = 0; 
         this.isDisposing = 0; 
        } 
    
        ~Retriever() 
        { 
         this.Dispose(false); 
        } 
    
        private void RetrieveResources() 
        { 
         while (this.isDisposing == 0) 
         { 
          while (this.isStarted == 0) 
          { 
           Monitor.Wait(this.pending); 
          } 
    
          Uri location = this.pending.Take(); 
    
          // This is what needs to be concurrently done. 
          // In this example, it's synchronous, but just on a separate thread. 
          T result = this.RetrieveResource(location); 
    
          // At this point, we would fire our event with the retrieved data 
         } 
        } 
    
        protected abstract T RetrieveResource(Uri location); 
    
        protected void Dispose(bool disposing) 
        { 
         if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1) 
         { 
          return; 
         } 
    
         if (disposing) 
         { 
          this.pending.CompleteAdding(); 
          this.worker.Join(); 
         } 
        } 
    
        public void Add(Uri uri) 
        { 
         try 
         { 
          this.pending.Add(uri); 
         } 
         catch (InvalidOperationException) 
         { 
          return; 
         } 
        } 
    
        public void AddRange(IEnumerable<Uri> uris) 
        { 
         foreach (Uri uri in uris) 
         { 
          try 
          { 
           this.pending.Add(uri); 
          } 
          catch (InvalidOperationException) 
          { 
           return; 
          } 
         } 
        } 
    
        public void Start() 
        { 
         if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1) 
         { 
          throw new InvalidOperationException("The retriever is already started."); 
         } 
    
         if (this.worker.ThreadState == ThreadState.Unstarted) 
         { 
          this.worker.Start(); 
         } 
    
         Monitor.Pulse(this.pending); 
        } 
    
        public void Stop() 
        { 
         if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0) 
         { 
          throw new InvalidOperationException("The retriever is already stopped."); 
         } 
        } 
    
        public void Dispose() 
        { 
         this.Dispose(true); 
         GC.SuppressFinalize(this); 
        } 
    } 
    

    위 ... 차라리 너무 많은 복잡성, 이상한 코드를 추가 생각이에 대한 해결책 예제를 빌드하려면 ...이 될 것입니다.

    private void RetrieveResources() 
        { 
         while (this.isDisposing == 0) 
         { 
          while (this.isStarted == 0) 
          { 
           Monitor.Wait(this.pending); 
          } 
    
          Uri location = this.pending.Take(); 
    
          Task<T> task = new Task<T>((state) => 
           { 
            return this.RetrieveResource(state as Uri); 
           }, location); 
    
          task.ContinueWith((t) => 
           { 
            T result = t.Result; 
            RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result); 
    
            EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved; 
            if (!Object.ReferenceEquals(callback, null)) 
            { 
             callback(this, args); 
            } 
           }); 
    
          task.Start(); 
         } 
        } 
    
  • +0

    수천 개의 뛰어난 작업이있을 때 * 많은 문제를 일으 킵니까? –

    +0

    @CuongLe 거기에 몇 가지 설명을 추가했습니다. 기본적으로 프로파일 링을 통해 많은 뛰어난 작업이있을 때 많은 컨텍스트 전환이 있음을 알았습니다. 원래이 시스템은 HTTP 웹 요청에만 국한되었습니다. WebClient, HttpClient 및 HttpWebRequest에서 사용할 수있는 비동기 메서드를 사용하면 HTTP 웹 요청이 .NET에서 자동으로 실행되는 것으로 표시되었습니다. 이것들은 쓰레드 풀을 사용하는 대신 독립적 인 백그라운드 쓰레드를 만듭니다. –

    +0

    RetrieveResources()는 private 메서드이기 때문에 RetrieveResources()를 호출 할 때마다 Retriever 하위 클래스 을 인스턴스화해야합니다. 리트리버 생성자에서 거기에 스레드를 시작합니다. 스레드의 오버 헤드는 1M 정도이므로 사용 여부에 상관 없습니다. 그래서 Retriever 개체가 수천 개일 때, 모든 모리풀을 사용하는 수천 개의 스레드가 생겨 프로그램이 마침내 부서 질 수 있습니다. 나는 당신이 생산자/소비자 패턴을 시도 할 것을 제안한다. 한 명의 생산자와 여러 명의 소비자가 있습니다. 소비자는 스레드 풀 – Larry

    답변

    2

    나는 꽤 좋은 해결책을 생각해 냈습니다. 리소스를 검색하는 방법과 결과의 표현 방법을 모두 추상화했습니다. 이렇게하면 임의의 결과가있는 임의의 URI를 검색 할 수 있습니다. 어떤 종류의 URI는 "ORM"과 유사합니다.

    가변적 인 동시성 수준을 지원합니다. 전에 질문을 게시했을 때, 비동기 및 동시성이 상당히 다르다는 것을 잊었습니다. 작업에서 모두 달성 한 것은 비동기식이었고 작업 스케줄러를 방해했습니다. 실제로 원하는 것은 동시성 이었기 때문입니다.

    시작/중지 기능이있는 것이 좋다고 생각했기 때문에 취소로 추가했습니다.

    public abstract class Retriever<T> : IRetriever<T> 
    { 
        private readonly object locker; 
        private readonly BlockingCollection<Uri> pending; 
        private readonly Thread[] threads; 
        private CancellationTokenSource cancellation; 
    
        private volatile int isStarted; 
        private volatile int isDisposing; 
    
        public event EventHandler<RetrieverEventArgs<T>> Retrieved; 
    
        protected Retriever(int concurrency) 
        { 
         if (concurrency <= 0) 
         { 
          throw new ArgumentOutOfRangeException("concurrency", "The specified concurrency level must be greater than zero."); 
         } 
    
         this.locker = new object(); 
         this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>()); 
         this.threads = new Thread[concurrency]; 
         this.cancellation = new CancellationTokenSource(); 
    
         this.isStarted = 0; 
         this.isDisposing = 0; 
    
         this.InitializeThreads(); 
        } 
    
        ~Retriever() 
        { 
         this.Dispose(false); 
        } 
    
        private void InitializeThreads() 
        { 
         for (int i = 0; i < this.threads.Length; i++) 
         { 
          Thread thread = new Thread(this.ProcessQueue) 
          { 
           IsBackground = true 
          }; 
    
          this.threads[i] = thread; 
         } 
        } 
    
        private void StartThreads() 
        { 
         foreach (Thread thread in this.threads) 
         { 
          if (thread.ThreadState == ThreadState.Unstarted) 
          { 
           thread.Start(); 
          } 
         } 
        } 
    
        private void CancelOperations(bool reset) 
        { 
         this.cancellation.Cancel(); 
         this.cancellation.Dispose(); 
    
         if (reset) 
         { 
          this.cancellation = new CancellationTokenSource(); 
         } 
        } 
    
        private void WaitForThreadsToExit() 
        { 
         foreach (Thread thread in this.threads) 
         { 
          thread.Join(); 
         } 
        } 
    
        private void ProcessQueue() 
        { 
         while (this.isDisposing == 0) 
         { 
          while (this.isStarted == 0) 
          { 
           Monitor.Wait(this.locker); 
          } 
    
          Uri location; 
    
          try 
          { 
           location = this.pending.Take(this.cancellation.Token); 
          } 
          catch (OperationCanceledException) 
          { 
           continue; 
          } 
    
          T data; 
    
          try 
          { 
           data = this.Retrieve(location, this.cancellation.Token); 
          } 
          catch (OperationCanceledException) 
          { 
           continue; 
          } 
    
          RetrieverEventArgs<T> args = new RetrieverEventArgs<T>(location, data); 
    
          EventHandler<RetrieverEventArgs<T>> callback = this.Retrieved; 
          if (!Object.ReferenceEquals(callback, null)) 
          { 
           callback(this, args); 
          } 
         } 
        } 
    
        private void ThowIfDisposed() 
        { 
         if (this.isDisposing == 1) 
         { 
          throw new ObjectDisposedException("Retriever"); 
         } 
        } 
    
        protected abstract T Retrieve(Uri location, CancellationToken token); 
    
        protected virtual void Dispose(bool disposing) 
        { 
         if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1) 
         { 
          return; 
         } 
    
         if (disposing) 
         { 
          this.CancelOperations(false); 
          this.WaitForThreadsToExit(); 
          this.pending.Dispose(); 
         } 
        } 
    
        public void Start() 
        { 
         this.ThowIfDisposed(); 
    
         if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1) 
         { 
          throw new InvalidOperationException("The retriever is already started."); 
         } 
    
         Monitor.PulseAll(this.locker); 
         this.StartThreads(); 
        } 
    
        public void Add(Uri location) 
        { 
         this.pending.Add(location); 
        } 
    
        public void Stop() 
        { 
         this.ThowIfDisposed(); 
    
         if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0) 
         { 
          throw new InvalidOperationException("The retriever is already stopped."); 
         } 
    
         this.CancelOperations(true); 
        } 
    
        public void Dispose() 
        { 
         this.Dispose(true); 
         GC.SuppressFinalize(this); 
        } 
    } 
    
    +0

    이것을 사용하면 얼마 지나지 않아 빠른 웹 서버로의 빠른 경로에서 병목 현상이 HTTP 요청조차 발생하지 않는 것처럼 보입니다. 스트림에서 수천 개의 요청이 병목 현상을 읽는 중입니다. –

    +0

    Start 메서드에서 lock (locker)이 누락되었습니다. 그렇지 않으면 동기화 예외가 발생합니다. 스레드 상태가 항상 Unstarted이므로 StartThreads 메서드도 잘못되었습니다. Unstarted와 같지 않은 백그라운드는 thread.ThreadState.HasFlag (ThreadState.Unstarted)를 사용하거나 스레드가 절대로 시작하지 않아야합니다. – formatc

    +0

    실제로 문제를 실제로 테스트했을 때 그 문제를 생각해 보았습니다. 맞아,'HasFlag' 호출이 필요합니다. 또한'Monitor' 호출을 감싸는'lock'도 필요합니다. 두 가지 수정 사항은 나중에 편집하겠습니다. 또는 다른 사람이 할 수 있습니다. 좋은 메모, 고마워. –

    관련 문제