0

내 앱에서 TCP 연결을 처리하기위한 여러 스레드가 있습니다. 하나는 읽기 용이고 하나는 송신 용이고 다른 하나는 새로운 연결을 처리하기위한 것입니다. 각 스레드는 모든 클라이언트에 대해 지정된 유형의 작업을 처리하므로 서로 다른 IP에 5 TcpClient 인스턴스로 데이터를 보냅니다. 나는 보내는 스레드에서 액세스하고 있기 때문에 버퍼로 BlockingCollection을 보내고 있지만 보내지는 데이터를 생성하는 다른 스레드에서도 보낸다. 보내는 스레드에서 실행되는 내 기능은 다음과 같습니다BlockingCollection.TryTake()가 시간 초과를 초과했습니다.

private void Sender() 
    { 
     while (run) 
     { 
      List<object[]> toRemove = new List<object[]>(); //bypass the fact that I cant remove a foreach iteration item from the BlockingCollection 
      foreach (object[] obj in sendBuffer.ToList()) 
      { 
       string IP = obj[0].ToString(); 
       string msg = obj[1].ToString(); 
       byte[] data = Encoding.UTF8.GetBytes(msg); 
       foreach (TcpClient tcpc in TcpClients) 
       { 
        if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP) 
        { 
         NetworkStream stream = tcpc.GetStream(); 
         stream.Write(data, 0, data.Length); 
         break; 
        } 
       } 
       toRemove.Add(obj); 
      } 
      for (int i = 0; i < toRemove.Count; i++) //the bypass mentioned above 
      { 
       object[] rmv = toRemove[i]; 
       sendBuffer.TryTake(out rmv); 
      } 
     } 
    } 

참고 : 사용 BlockingCollection<object[]>을 입력합니다. 내 문제는 트래픽의 일부 지점에서 버퍼가 채우기 시작한다는 것입니다. 버퍼에 최대 500 개의 메시지 제한을 설정했으며이를 쉽게 오버플로합니다. 자, 정확하게 이해하지 못했다면, 무엇을 TryTake은 아이템을 제거하려고 시도하고 있으며, 그 순간에 컬렉션이 사용 중이라면, 기다렸다가 다시 시도합니다. (참고 : 시간 제한을 50 밀리 초로 설정하려고 시도했습니다.) 이것이 사실이라면 (그렇지 않은 사람이 나를 수정하고 다른 이유를 제안하십시오), 문제는 아마도 컬렉션이 TryTake이 호출 될 때 대부분 바쁘다고 생각할 수 있습니다. 그럴 수 있니? 그리고 그렇다면 어떻게 해결할 수 있습니까?

데이터를 생성하는 스레드가 컬렉션을 사용하는 경우 컬렉션은 1-80 개의 항목 범위를 반복하는 foreach에서 2 초당 한 번 액세스됩니다. 그때까지 버퍼는 약 20 개 이상의 항목에서 문제가 발생하기 시작합니다. 보낸 사람 스레드가 이제는 하나의 클라이언트로만 전송됩니다. 나중에 최대 15 개가됩니다. 따라서 최고점 인 경우 80 개 항목 x 15 명의 사용자 = 약 2 초당 약 1200 개의 액세스가 발생합니다. 조언을 주시면 감사하겠습니다. 감사합니다.

답변

1

TryTake가 설명 된 것처럼 작동하지 않습니다. 기본 BlockingCollection은 ConcurrentQueue를 사용하여 항목을 저장하고 TryTake는 대기열의 다음 항목을 제공된 out 참조에 할당합니다. 메시지는 기다려야 BlockingCollection.Take()를 사용할 수있는 상황에서 예를

BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>(); 

object[] message = new object[2]; 
object[] message2 = new object[2]; 
// Add messages to the queue 
sendBuffer.Add(message); 
sendBuffer.Add(message2); 

object[] toSend; 
// Take next message off the queue 
sendBuffer.TryTake(out toSend); 

// toSend === message 

를 들어

를 보낼 경우 :

BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>(); 
// CancellationTokenSource is used in place of your run variable. 
System.Threading.CancellationTokenSource cancellationSource 
= new System.Threading.CancellationTokenSource(); 

    private void Sender() 
    { 
     // Exit loop if cancellation requested 
     while (!cancellationSource.Token.IsCancellationRequested) 
     { 

      object[] obj; 

      try { 
       // Blocks until an item is available in sendBuffer or cancellationSource.Cancel() is called. 
       obj = sendBuffer.Take(cancellationSource.Token); 
      } catch (OperationCanceledException) { 
       // OperationCanceledException will be thrown if cancellationSource.Cancel() 
       // is called during call to sendBuffer.Take 
       break; 
      } catch (InvalidOperationException) { 
       // An InvalidOperationException means that Take() was called on a completed collection. 
       // See BlockingCollection<T>.CompleteAdding 
       break; 
      } 

      string IP = obj[0].ToString(); 
      string msg = obj[1].ToString(); 
      byte[] data = Encoding.UTF8.GetBytes(msg); 
      foreach (TcpClient tcpc in TcpClients) { 
       if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP) { 
        NetworkStream stream = tcpc.GetStream(); 
        stream.Write(data, 0, data.Length); 
        break; 
       } 
      }    
     } 
    } 
관련 문제