2010-12-29 1 views
14

대기열을 사용하는 가장 좋은 방법은 무엇인지 알아 내려고합니다. DataTable을 반환하는 프로세스가 있습니다. 각 DataTable은 이전 DataTable과 병합됩니다. 한 가지 문제가 있습니다. 마지막 BulkCopy (OutOfMemory)까지 보유 할 레코드가 너무 많습니다.ConcurrentQueue를 사용한 스레딩 방법 <T>

따라서 각 수신 DataTable을 즉시 처리해야한다고 결정했습니다. ConcurrentQueue<T>에 대해 생각하고 있지만 ... WriteQueuedData() 메서드가 테이블을 큐에서 빼고 데이터베이스에 쓰는 방법을 알지 못합니다. 예를 들어

:

public class TableTransporter 
{ 
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>(); 

    public TableTransporter() 
    { 
     tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available 
    } 

    public void ExtractData() 
    { 
     DataTable table; 

     // perform data extraction 
     tableQueue.Enqueue(table); 
    } 

    private void WriteQueuedData(object sender, EventArgs e) 
    { 
     BulkCopy(e.Table); 
    } 
} 

내 첫 번째 질문은 제쳐두고, 내가 ExtractData()를 호출하면 실제로,에 가입 이벤트를 필요가 없다는 사실에서 비동기 적으로이 내가 필요로하는 모든 것입니까? 둘째로, 내가 대기하고있는 객체와 비동기 적으로 작동하기 위해 어떤 형태의 트리거가 필요하고 기능이 필요합니까?

업데이트 나는 그냥 OnItemQueued 이벤트 핸들러가 ConcurrentQueue<T>에서 클래스를 파생했다. 그 다음 :

new public void Enqueue (DataTable Table) 
{ 
    base.Enqueue(Table); 
    OnTableQueued(new TableQueuedEventArgs(Table)); 
} 

public void OnTableQueued(TableQueuedEventArgs table) 
{ 
    EventHandler<TableQueuedEventArgs> handler = TableQueued; 

    if (handler != null) 
    { 
     handler(this, table); 
    } 
} 

이 구현에 대한 우려 사항은 무엇입니까?

답변

18

문제에 대한 나의 이해에서, 당신은 몇 가지를 놓치고 있습니다.

동시 대기열은 데이터 구조를 명시 적으로 잠글 필요없이 대기열에 읽고 쓰는 다중 스레드를 허용하도록 설계된 데이터 구조입니다. (모든 재즈는 무대 뒤에서 돌보아 지거나 컬렉션이 잠금을 사용할 필요가없는 방식으로 구현됩니다.)

이를 염두에두고 시도한 패턴처럼 보입니다. 사용하는 것은 "생산/소비 자"입니다. 먼저 작업을 생성하고 대기열에 항목을 추가하는 작업이 있습니다. 두 번째 작업에는 두 번째 작업이 있습니다. 대기열에서 항목을 소비합니다 (항목을 dequeing).

정말 정말 두 개의 스레드가 필요합니다. 하나는 항목을 추가하고 다른 하나는 항목을 제거하는 것입니다. 동시 콜렉션을 사용 중이므로 항목을 추가하는 여러 스레드와 항목을 제거하는 여러 스레드를 가질 수 있습니다. 하지만 분명히 동시 대기열에서 더 많은 경쟁이 발생하면 병목 현상이 빨라집니다.

+0

2 개의 스레드가 있다고 생각했습니다. 주 스레드는 기본적으로 이벤트가 트리거 될 때까지 대기합니다. 두 번째 스레드는'ExtractData()'에 대한 비동기 호출로 시작합니다. 비동기 콜백에서 추출 프로세스를 계속 진행합니다. – IAbstract

+0

사실, 나는 그것을 거꾸로 가지고 있다고 생각한다; 주 스레드는 대기열에 들어가는 데이터 테이블이어야합니다. 대기열에있는 항목 이벤트 트리거를 통해 비동기 쓰기 메소드를 시작하십시오. – IAbstract

3

이 내가 생각 해낸 무엇을위한 완벽한 솔루션입니다 :

public class TableTransporter 
{ 
    private static int _indexer; 

    private CustomQueue tableQueue = new CustomQueue(); 
    private Func<DataTable, String> RunPostProcess; 
    private string filename; 

    public TableTransporter() 
    { 
     RunPostProcess = new Func<DataTable, String>(SerializeTable); 
     tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued); 
    } 

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) 
    { 
     // do something with table 
     // I can't figure out is how to pass custom object in 3rd parameter 
     RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); 
    } 

    public void ExtractData() 
    { 
     // perform data extraction 
     tableQueue.Enqueue(MakeTable()); 
     Console.WriteLine("Table count [{0}]", tableQueue.Count); 
    } 

    private DataTable MakeTable() 
    { return new DataTable(String.Format("Table{0}", _indexer++)); } 

    private string SerializeTable(DataTable Table) 
    { 
     string file = Table.TableName + ".xml"; 

     DataSet dataSet = new DataSet(Table.TableName); 

     dataSet.Tables.Add(Table); 

     Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); 
     string xmlstream = String.Empty; 

     using (MemoryStream memstream = new MemoryStream()) 
     { 
      XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); 
      XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); 

      xmlSerializer.Serialize(xmlWriter, dataSet); 
      xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); 

      using (var fileStream = new FileStream(file, FileMode.Create)) 
       fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); 
     } 
     filename = file; 

     return file; 
    } 

    private void PostComplete(IAsyncResult iasResult) 
    { 
     string file = (string)iasResult.AsyncState; 
     Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); 

     RunPostProcess.EndInvoke(iasResult); 
    } 

    public static String UTF8ByteArrayToString(Byte[] ArrBytes) 
    { return new UTF8Encoding().GetString(ArrBytes); } 

    public static Byte[] StringToUTF8ByteArray(String XmlString) 
    { return new UTF8Encoding().GetBytes(XmlString); } 
} 

public sealed class CustomQueue : ConcurrentQueue<DataTable> 
{ 
    public event EventHandler<TableQueuedEventArgs> TableQueued; 

    public CustomQueue() 
    { } 
    public CustomQueue(IEnumerable<DataTable> TableCollection) 
     : base(TableCollection) 
    { } 

    new public void Enqueue (DataTable Table) 
    { 
     base.Enqueue(Table); 
     OnTableQueued(new TableQueuedEventArgs(Table)); 
    } 

    public void OnTableQueued(TableQueuedEventArgs table) 
    { 
     EventHandler<TableQueuedEventArgs> handler = TableQueued; 

     if (handler != null) 
     { 
      handler(this, table); 
     } 
    } 
} 

public class TableQueuedEventArgs : EventArgs 
{ 
    #region Fields 
    #endregion 

    #region Init 
    public TableQueuedEventArgs(DataTable Table) 
    {this.Table = Table;} 
    #endregion 

    #region Functions 
    #endregion 

    #region Properties 
    public DataTable Table 
    {get;set;} 
    #endregion 
} 

개념의 증거, 꽤 잘 작동하는 것 같다. 기껏해야 나는 4 개의 작업자 스레드를 보았습니다.

+0

TODO : 최신 비동기 메소드로 업데이트하십시오. – IAbstract

+0

이것을 살펴보면 좋은 구현이지만, 빠른 테스트를 실행하면 언제 항목을 대기열에서 빼낼 수 있습니까? –

+0

@RichardPriddy : 5 년 전 이었으므로 (* 3 번째 회사로 옮긴 지 오래되었습니다.), 나는 이것이 완전한 예가 아니라고 생각할 수 있습니다. 마지막에 * 개념 증명 * 발언을 주목하십시오. ;) 즉, 요구 사항에 따라 당신은'대기열에 넣은'이벤트를 노출시킬 수 있고, 다른 어떤 것이 대기열에서 빼는 것을 처리하게 할 수 있습니다. 그렇지 않으면, 포스트 프로세스 함수의'AsyncCallback' 어딘가에서 큐를 빼는 것이 논리적 일 수 있습니다. 이 늦은 날짜에 더 구체적인 것을 찾아내는 것은 정말로 어려울 것입니다. – IAbstract

8

ConcurrentQueue는 매우 드문 경우에만 유용하다고 생각합니다. 주요 장점은 자물쇠가 없다는 것입니다. 그러나 대개 생산자 스레드는 처리 할 수있는 데이터가 있음을 어떻게 든 소비자 스레드에 알려야합니다. 스레드 간의 시그널링에는 잠금이 필요하며 ConcurrentQueue를 사용하면 이점이 없어집니다. 스레드를 동기화하는 가장 빠른 방법은 잠금 내에서만 작동하는 Monitor.Pulse()를 사용하는 것입니다. 다른 모든 동기화 도구는 더 느립니다.

물론 소비자는 대기열에 잠김없이 작동하는 항목이 있는지 계속 확인할 수 있지만 프로세서 자원을 낭비합니다. 소비자가 점검하는 사이에 기다리는 것이 조금은 더 좋습니다.

대기열에 쓸 때 스레드를 높이는 것은 매우 나쁜 생각입니다. 이벤트 핸들러를 실행하면 ConcurrentQueue를 사용하여 1 마이크로 초를 절약 할 수 있으며 이벤트 처리기를 실행하면 1000 배 이상 오래 걸릴 수 있습니다.

모든 처리가 이벤트 처리기 또는 비동기 호출에서 수행되는 경우 질문은 왜 여전히 대기열이 필요합니까? 핸들러에 직접 데이터를 전달하는 것이 좋으며 대기열을 전혀 사용하지 않는 것이 좋습니다.

ConcurrentQueue의 구현은 동시성을 허용하기에 다소 복잡합니다. 대부분의 경우 일반 대기열 <>을 사용하고 대기열에 대한 모든 액세스를 잠급니다. 대기열 액세스에는 마이크로 초 만 있으면되기 때문에 2 개의 스레드가 동일한 마이크로 초 안에 대기열에 액세스하는 것이 거의 불가능하며 잠금 때문에 지연이 거의 없습니다. 잠금을 사용하는 일반 대기열 <>을 사용하면 ConcurrentQueue보다 더 빠른 코드 실행이 종종 발생합니다.

+0

다운 투표를받는 것에 대해 수치스러워합니다. 나는 이것이 유효하고 실용적인 의견이라고 생각한다. – user3085342