2

만료 시나리오는 : 나는 서버 B멀티 스레딩 SqlDataReader 개체 시간은 여기

데이터는 매우 크다으로 서버 A에서 테이블에 데이터를 삽입 할 , 그래서 일을 할 SqlBulkCopy의를 사용합니다.

프로듀서 : 모든 페이지 번호를

소비자 : 서버 A에서 페이지 번호, 페이징 데이터를 얻을 수 및 서버에 DataReader를 쓰기

문제 여기

내 생각이다 나는 만났다 :

오랜 시간을 실행하면, 예외는 다음과 같이된다 :

Timeout expired. The timeout period elapsed prior to completion of the operation 


[SqlException (0x80131904): Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding.] System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection) +1948826 
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection) +4844747 
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj) +194 
System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj) +2392 
System.Data.SqlClient.SqlDataReader.ConsumeMetaData() +33 
System.Data.SqlClient.SqlDataReader.get_MetaData() +83 
System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString) +297 
System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async) +954 
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, DbAsyncResult result) +162 
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method) +32 
System.Data.SqlClient.SqlCommand.ExecuteReader(CommandBehavior behavior, String method) +141 
System.Data.SqlClient.SqlCommand.ExecuteDbDataReader(CommandBehavior behavior) +12 
System.Data.Common.DbCommand.ExecuteReader() +12 
System.Data.Linq.SqlClient.SqlProvider.Execute(Expression query, QueryInfo queryInfo, IObjectReaderFactory factory, Object[] parentArgs, Object[] userArgs, ICompiledSubQuery[] subQueries, Object lastResult) +332 

내 코드에 문제가 없습니다.

Program.cs

class Program 
{ 
    static BlockingCollection<int> pcCollection = new BlockingCollection<int>(); 
    static string strRemoteConn = ConfigurationManager.AppSettings["RemoteConnStr"]; 
    static string strLocalConn = ConfigurationManager.AppSettings["LocalConnStr"]; 
    static string strCommandSql = ConfigurationManager.AppSettings["CommandSQL"]; 
    static string strTableName = ConfigurationManager.AppSettings["TableName"]; 
    static int batchSize = Int32.Parse(ConfigurationManager.AppSettings["CommitBatchSize"]); 
    static int taskCount = Int32.Parse(ConfigurationManager.AppSettings["TaskCount"]); 

    static object s_consumer = new object(); 

    static void Main(string[] args) 
    { 
     try 
     { 
      var watch = Stopwatch.StartNew(); 

      var tableCount = 0D; 

      using (var connection = new SqlConnection(strRemoteConn)) 
      using (SqlCommand cmd = connection.CreateCommand()) 
      { 
       connection.Open(); 
       cmd.CommandText = string.Format(@"SELECT 
                Total_Rows= SUM(st.row_count) 
               FROM 
                sys.dm_db_partition_stats st 
               WHERE 
                object_name(object_id) = '{0}' AND (index_id < 2)", strTableName); 
       cmd.CommandTimeout = 300; 

       tableCount = Double.Parse(cmd.ExecuteScalar().ToString()); 
      } 

      var totalPages = (int)Math.Ceiling(tableCount/batchSize); 

      var listPageRn = Enumerable.Range(1, totalPages); 

      var listPartPage = listPageRn.Split(taskCount).ToList(); 

      var listProducerTask = new List<Task>(); 
      var listConsumerTask = new List<Task>(); 

      var consumerTask = taskCount; 

      for (int i = 1; i <= consumerTask; i++) 
      { 
       var taskFlag = i; 
       var consumer = Task.Factory.StartNew(() => 
       { 
        ConsumerAction(taskFlag.ToString()); 

       }, TaskCreationOptions.LongRunning); 

       listConsumerTask.Add(consumer); 
      } 

      var producerTaskIndex = 1; 
      foreach (var item in listPartPage) 
      { 
       var tmpIndex = producerTaskIndex.ToString(); 
       var producer = Task.Factory.StartNew(() => 
       { 
        ProducerAction(item, tmpIndex); 
       }); 

       listProducerTask.Add(producer); 
       producerTaskIndex++; 
      } 

      Task.WaitAll(listProducerTask.ToArray()); 
      pcCollection.CompleteAdding(); 
      Task.WaitAll(listConsumerTask.ToArray()); 

      watch.Stop(); 
      var mins = watch.ElapsedMilliseconds/1000/60; 
      Console.WriteLine("All Batch Insert Time Elapsed:\t {0} mins", mins); 
     } 
     catch (AggregateException ex) 
     { 
      using (StreamWriter writer = File.AppendText("BatchError.txt")) 
      { 
       writer.WriteLine("Error Time: {0}", DateTime.Now); 
       foreach (var exception in ex.InnerExceptions) 
       { 
        writer.WriteLine("Error: {0}", exception.Message); 
        writer.WriteLine("Source: {0}", exception.Source); 
        writer.WriteLine("Track: {0}", exception.StackTrace); 
       } 
      } 
      throw; 

     } 
     catch (Exception ex) 
     { 
      using (StreamWriter writer = File.AppendText("BatchError.txt")) 
      { 
       writer.WriteLine("Error Time: {0}", DateTime.Now); 
       writer.WriteLine("Error: {0}", ex.Message); 
       writer.WriteLine("Source: {0}", ex.Source); 
       writer.WriteLine("Track: {0}", ex.StackTrace); 
      } 
      throw; 
     } 

     Console.ReadLine(); 
    } 

    static void ProducerAction(IEnumerable<int> source, string taskFlag = "1") 
    { 
     foreach (var item in source) 
     { 
      Console.WriteLine("Producer-{0} processing item batch {1}", taskFlag, item); 

      pcCollection.Add(item); 
     } 
    } 

    static void ConsumerAction(string taskFlag = "") 
    { 
     foreach (var item in pcCollection.GetConsumingEnumerable()) 
     { 
      Console.WriteLine("consumer-{0} processing item", taskFlag); 
      var processing = new ManageBatchProcessing 
      { 
       LocalConnStr = strLocalConn, 
       RemoteConnStr = strRemoteConn, 
       BatchSize = batchSize, 
       TableName = strTableName, 
       CommandSql = strCommandSql 
      }; 

      processing.ProcessDatabase(item); 
     } 
    } 

ManageBatchProcessing.cs

public class ManageBatchProcessing 
{ 
    public string LocalConnStr { get; set; } 
    public string RemoteConnStr { get; set; } 

    public string CommandSql { get; set; } 
    public int BatchSize { get; set; } 
    public string TableName { get; set; } 

    public void ProcessDatabase(int item) 
    { 
     var watch = new Stopwatch(); 
     watch.Start(); 

     var start = (item - 1) * this.BatchSize + 1; 
     var end = item * this.BatchSize; 
     var strCommandSql = string.Format(this.CommandSql, start, end); 
     using (var remoteConn = new SqlConnection(this.RemoteConnStr)) 
     using (var localConn = new SqlConnection(this.LocalConnStr)) 
     { 
      remoteConn.Open(); 
      localConn.Open(); 

      using (var command = new SqlCommand(strCommandSql, remoteConn)) 
      using (var dataReader = command.ExecuteReader()) 
      { 
       command.CommandTimeout = 0; 
       using (var bulkCopy = new SqlBulkCopy(localConn)) 
       { 
        bulkCopy.DestinationTableName = this.TableName; 
        bulkCopy.BulkCopyTimeout = 0; 
        bulkCopy.WriteToServer(dataReader); 
        bulkCopy.Close(); 
       } 
      } 

      remoteConn.Close(); 
      localConn.Close(); 
     } 

     watch.Stop(); 

     var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
     Console.WriteLine("\t\t\t -------------------------------------------------"); 
     Console.ForegroundColor = ConsoleColor.Green; 
     Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##")); 
     Console.ResetColor(); 
     Console.WriteLine("\t\t\t -------------------------------------------------"); 
    } 

    public DataTable RetriveToDatabase(int item) 
    { 
     var start = (item - 1) * this.BatchSize + 1; 
     var end = item * this.BatchSize; 
     var dataTable = new DataTable(); 

     using (var connection = new SqlConnection(this.RemoteConnStr)) 
     { 
      var watch = new Stopwatch(); 
      watch.Start(); 

      using (var adapter = new SqlDataAdapter(string.Format(this.CommandSql, start, end), connection)) 
      { 
       adapter.SelectCommand.CommandTimeout = 3600; 
       adapter.Fill(dataTable); 
      } 

      watch.Stop(); 

      var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      Console.ForegroundColor = ConsoleColor.Red; 
      Console.WriteLine("\t\t\t convert datareader to table done {0} s", totalSeconds.ToString("#.##")); 
      Console.ResetColor(); 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      return dataTable; 
     } 
    } 

    public void WriteToDatabase(IDataReader reader) 
    { 
     using (var connection = new SqlConnection(this.LocalConnStr)) 
     { 
      var watch = new Stopwatch(); 
      watch.Start(); 

      connection.Open(); 

      using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null)) 
      { 
       bulkCopy.DestinationTableName = this.TableName; 
       bulkCopy.BulkCopyTimeout = 0; 
       bulkCopy.WriteToServer(reader); 
      } 

      connection.Close(); 

      watch.Stop(); 

      var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      Console.ForegroundColor = ConsoleColor.Green; 
      Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##")); 
      Console.ResetColor(); 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
     } 
    } 

    public void WriteToDatabase(DataTable dataTable) 
    { 
     using (var connection = new SqlConnection(this.LocalConnStr)) 
     { 
      var watch = new Stopwatch(); 
      watch.Start(); 

      connection.Open(); 

      using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null)) 
      { 
       bulkCopy.DestinationTableName = this.TableName; 
       bulkCopy.BulkCopyTimeout = 0; 
       bulkCopy.WriteToServer(dataTable); 
      } 

      connection.Close(); 

      watch.Stop(); 

      var totalSeconds = (double)watch.ElapsedMilliseconds/1000; 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
      Console.ForegroundColor = ConsoleColor.Green; 
      Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##")); 
      Console.ResetColor(); 
      Console.WriteLine("\t\t\t -------------------------------------------------"); 
     } 
    } 

} 
+0

당신이 [복제]를 수행하려고 (https://msdn.microsoft.com/en-us/library/ms151198.aspx) : 여기

오른쪽 버전? – Prisoner

+0

라고 말할 수 있습니다. 그러나 데이터에는 적어도 천만 또는 억이 있습니다. –

+0

복제가 적합하면 수동 작업보다는 복제를 사용해보십시오. – Prisoner

답변

0

나는 마침내 내 코드를 잘못 알고 :

여기 내 코드입니다.

command.CommandTimeout = 0; 

이 잘못된 위치에 있습니다.

using (var command = new SqlCommand(strCommandSql, remoteConn)) 
      { 
       command.CommandTimeout = 0; 

       using (var dataReader = command.ExecuteReader()) 
       { 
        using (var bulkCopy = new SqlBulkCopy(this.LocalConnStr, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.UseInternalTransaction)) 
        { 
         bulkCopy.DestinationTableName = this.TableName; 
         bulkCopy.BulkCopyTimeout = 0; 
         bulkCopy.WriteToServer(dataReader); 
         bulkCopy.Close(); 
        } 
       } 
      }