0

분산 된 프로그램을 보았습니다. 네트워크의 모든 노드 (가상 시스템)는 모든 다른 노드와 데이터를주고받습니다 (연결을 통해). 데이터를 보내기 전에 모든 노드는 모든 다른 노드 (단일 소스 노드 포함)에 소켓을 가지고 있습니다. 3 초 후에 소스는 네트워크의 다른 노드 각각에 다른 파일 청크를 보내기 시작합니다. 모든 노드는 첫 번째 패킷이 도착한 후 수신 청크를 전달하기 시작합니다.피어로 연결 재설정 [Errno 104]

프로그램이 오류없이 여러 번 성공적으로 완료됩니다. 그러나 때로는 하나의 임의 노드가 연결 연결을 재설정하지만 (여전히 나가는 연결을 통해 데이터를 보냅니다).

각 노드에는 n-2 센더 스레드와 n-1 리시버 스레드가 있습니다.

전송 기능 :

def relaySegment_Parallel(self): 
     connectionInfoList = [] 
     seenSegments = [] 
     readyServers = [] 
     BUFFER_SIZE = Node.bufferSize 
     while len(readyServers) < self.connectingPeersNum-len(Node.sources) and self.isMainThreadActive(): #Data won't be relayed to the sources 
      try: 
       tempIp = None 
       for ip in Node.IPAddresses: 
        if ip not in readyServers and ip != self.ip and ip not in self.getSourcesIp(): 
         tempIp = ip 
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
         s.connect((ip, Node.dataPort)) 
         connectionInfoList.append((s, ip)) 
         readyServers.append(ip) 
         if Node.debugLevel2Enable: 
          print "RelayHandler: Outgoing connection established with IP: " + str(ip) 
      except socket.error, v: 
       errorcode = v[0] 
       if errorcode == errno.ECONNRESET: 
        print "(RelayHandler) Connection reset ! Node's IP: " + str(tempIp) 
       if errorcode == errno.ECONNREFUSED: 
        print "(RelayHandler) Node " + str(tempIp) + " are not ready yet!" 
       continue 
      except: 
       print "Error: Cannot connect to IP: " + str (tempIp) 
       continue 
      print "(RelayHandler) Ready to relay data to " + str(len(readyServers)) + " numeber of servers." 
     try: 
      pool = ThreadPool(processes = Node.threadPoolSize) 
      while Node.terminateFlag == 0 and not self.isDistributionDone() and self.isMainThreadActive(): 
       if len(self.toSendTupleList) > 0: 
        self.toSendLock.acquire() 
        segmentNo, segmentSize, segmentStartingOffset, data = self.toSendTupleList.pop(0) 
        self.toSendLock.release() 
        if len(data) > 0: 
         if segmentNo not in seenSegments: 
          #Type: 0 = From Sourece , 1 = From Rlayer 
          #Sender Type/Segment No./Segment Size/Segment Starting Offset/ 
          tempList = [] 
          for s, ip in connectionInfoList: 
           tempData = "1/" + str(self.fileSize) + "/" + str(segmentNo) + "/" + str(segmentSize) + "/" + str(segmentStartingOffset) + "/" 
           tempList.append((s, ip, tempData)) 
          pool.map(self.relayWorker, tempList) 
          seenSegments.append(segmentNo) 
         relayList = [] 
         for s, ip in connectionInfoList: 
          relayList.append((s, ip, data)) 
         pool.map(self.relayWorker, relayList) 
      for s, ip in connectionInfoList: 
       s.shutdown(1)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed. 
       s.close() 
      pool.close() 
      pool.join() 
     except socket.error, v: 
      errorcode=v[0] 
      if errorcode==errno.ECONNREFUSED: 
       print "(RelayHandler) Error: Connection Refused in RelaySegment function. It can not connect to: ", ip 
      else: 
       print "\n(RelayHandler) Error1 in relaying segments (Parallel) to ", ip, " !!! ErrorCode: ", errorcode 
      traceback.print_exception(*sys.exc_info()) 
     except: 
      print "\n(RelayHandler) Error2 in relaying segments (Parallel) to ", ip 
      traceback.print_exception(*sys.exc_info()) 

수신 기능 :

def receiveDataHandler(self): 
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# Allows us to resue the port immediately after termination of the program 
      s.bind((self.ip, Node.dataPort)) 
      s.listen(Node.MaxNumClientListenedTo) 
      threadsList = [] 
      fHandler = fileHandler(self.inFileAddr, Node.bufferSize) 
      isStart = False 
      executionTime = 0 
      connectedPeersSofar = 0 
      while (not self.connectingPeersNum == connectedPeersSofar) and self.isMainThreadActive() and Node.terminateFlag == 0 and not self.isReceptionDone(): 
       conn, ipAddr = s.accept() 
       thread_receiveData = Thread2(target = self.receiveData_Serial, args = (conn, ipAddr, fHandler)) 
       thread_receiveData.start() 
       if Node.debugLevel2Enable: 
        print 'Receive Handler: New thread started for connection from address:', ipAddr 
       connectedPeersSofar += 1 
       threadsList.append(thread_receiveData) 
       if isStart == False: 
        isStart = True 
      print "(RecieiverHandeler) Receiver stops listening: Peers Num "+str(self.connectingPeersNum) +i " connected peers so far: " + str(connectedPeersSofar) 
      for i in range(0, len(threadsList)): 
       self.startTime = threadsList[i].join() 
      if isStart: 
       executionTime = float(time.time()) - float(self.startTime) 
      else: 
       print "\n\t No Start! Execution Time: --- 0 seconds ---" , "\n" 
      s.shutdown(2)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed. 
      s.close() 
      return executionTime 
     except socket.error, v: 
      errorcode = v[0] 
      if errorcode == 22: # 22: Invalid arument 
       print "Error: Invalid argument in connection acceptance (receive data handler)" 
      elif errorcode==errno.ECONNREFUSED: 
       print "Error: Connection Refused in receive" 
      else: 
       print "Error1 in Data receive Handler !!! ErrorCode: ", errorcode 
      traceback.print_exception(*sys.exc_info()) 
     except: 
      print "Error2 in Data receive Handler !!!" 
      traceback.print_exception(*sys.exc_info()) 

노드 (랜덤 고장 노드를 포함한) 모든 다른 노드에 연결된 모든 노드가 인쇄의 전송 쓰레드. 그러나, 임의의 노드의 수신 기능은

s.accept()

에 대기하고 연결하지만 연결하는 마지막 하나 인 단일 소스에서 연결을 허용하지 않습니다. 랜덤 노드는 아무 예외도 발생시키지 않고 대기합니다.

s.accept()

어느 한하지만 마지막 하나를 허용하지 않는 동안

임의의 노드의

s.listen()

(TCP의 protocole)이 보낸 사람이 서로 연결되어 있음을 생각하게 보인다. 그런 다음 어떤 이유로 연결을 다시 설정하므로 다른 사람 (보낸 사람)이 데이터를 보내려고 할 때 "연결 재설정 연결"예외를 발생시키는 것입니다. 오류없이 완료된 유일한 발신자는 연결할 마지막 발신자입니다.

오류 : 왜 무슨 일이 일어나고 있음을

Traceback (most recent call last): 
File "/home/ubuntu/DCDataDistribution/Node.py", line 137, in relayWorker 
socketConn.sendall(data) 
File "/usr/lib/python2.7/socket.py", line 224, in meth 
return getattr(self._sock,name)(*args) 
error: [Errno 104] Connection reset by peer 

?

FYI : Amazon EC2 인스턴스에서 프로그램을 실행하고 있습니다. 각 인스턴스 유형은 t2.micro (1 vCPU, 2.5 GHz, Intel Xeon Family (최대 3.3 GHz) 및 1 GiB 메모리)입니다. Ubuntu Server 14.04 LTS (HVM)가 모든 인스턴스에서 실행됩니다. pool의 일부 relayWorker 스레드가 여전히 미완성 될 수 있지만

+0

진단을위한 코드가 충분하지 않아 설명을 따르기가 약간 어렵습니다. 추측과 같이 여러분의 "무작위"수신 노드가 잘못된 소켓을 닫아서 'ECONNRESET' 오류가 발생하는 것으로 의심됩니다. 어쩌면 데이터 구조 스레드 동기화 문제? –

+0

@GilHamilton 코드의 제거 된 부분을 추가했습니다.사실 "임의의"수신 노드는 수신 기능의 "while"블록에서 벗어나지 않으며 다른 사람들이 연결되기를 기다립니다 (그러나 다른 사람들은 연결되었다고 말합니다). 따라서 노드는 "while"블록에서 나오지 않아 어떤 소켓도 닫을 수 없습니다. 그리고 소켓을 닫는 유일한 사람은 "receiveData_Serial"함수가 아닌 "receiveDataHandler"함수입니다. 더 이상의 설명이 필요하면 더 행복 할 것입니다. –

+0

설명에 코드를 맞추는 데 여전히 어려움이 있습니다. 예를 들어, "소켓을 닫은 사람은 'receiveDataHandler'입니다.하지만 수신 된 소켓이 전혀 수신되지 않습니다. 수신 대기중인 소켓 만 표시됩니다. 제안 : 노드 A가 노드 B에 연결되었다고 말하면, 원격 포트 번호가 connect (노드 A의 getpeername)에 대해 무엇인지 알아보십시오. 그런 다음 노드 B에서'netstat -atn'을 실행하여 그 포트가 어떤 상태인지 알아보십시오. –

답변

0
  for s, ip in connectionInfoList: 
       s.shutdown(1)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed. 
       s.close() 
      pool.close() 
      pool.join() 

당신은 연결을 shutdown. 순서 반전 :

   pool.close() 
       pool.join() 
       for s, ip in connectionInfoList: 
        s.close() 
관련 문제