2017-01-27 2 views
0

저는 파이썬에서 UDP를 사용하여 많은 패킷 손실을 입었습니다. 패킷 손실을 원하지 않는다면 TCP를 사용해야한다는 것을 알고 있지만 보낸 사람에 대해 (전체) 제어권이 없습니다.파이썬, 패킷 손실로 UDP 패킷 받기

UDP 멀티 캐스트를 사용하여 초당 15 개의 이미지를 보내는 카메라입니다.

아래에서 지금 작성한 코드를 볼 수 있습니다. 다중 처리를 사용하여 생산자와 소비자 함수를 동시에 사용할 수 있습니다. 생산자 기능은 패킷을 포획하고 소비자 기능은 패킷을 처리하고 이미지를 .bmp 파일에 씁니다.

패키지의 바이트를 .bmp 파일에 쓰는 PacketStream 클래스를 작성했습니다.

카메라가 새 이미지를 보내면 첫 번째 바이트 = 0x01 인 첫 번째 패킷이 전송됩니다. 여기에는 이미지에 대한 정보가 들어 있습니다. 그런 다음 첫 번째 바이트 = 0x02로 612 패킷이 전송됩니다. 여기에는 이미지의 바이트 (508 바이트/패킷)가 포함됩니다.

초당 15 개의 이미지가 전송되기 때문에 초당 ~ 9000 개의 패킷이 전송됩니다. 알 수 있듯이이 속도는 ~ 22 패킷/ms로 이미지 당 빠른 속도로 발생합니다.

tcpdump 또는 wireshark를 사용하여 모든 패킷을 완벽하게 수신 할 수 있습니다. 하지만 아래 코드를 사용하면 패킷이 누락됩니다. 확실히 내 창 7 PC가이 문제를 처리 할 수 ​​있어야합니까? 나는 또한 나무 딸기 파이 3에 그것을 사용하고있다. 그리고 더 많거나 적은 패킷의 수가 놓친다. 따라서 코드에 문제가 있다고 생각합니다.

멀티 프로세싱 대신 스레딩, 큐 대신 파이프 스레딩과 같은 많은 작업을 시도했습니다.

는 또한 아무 소용

sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000) 

와 소켓 버퍼를 증가했습니다.

파이썬에서는 가능한가요? 사전에

감사합니다,

import time 
from multiprocessing import Process, Queue 
import socket 
import struct 
from PIL import Image 


class PacketStream: 
    def __init__(self, output_path): 
     self.output_path = output_path 
     self.data_buffer = '' 
     self.img_id = -1 # -1 = waiting for start of new image 

    def process(self, data): 
     message_id = data[0] 
     if message_id == '\x01': 
      self.wrap_up_last_image() 
      self.img_id = ord(data[3]) 
      self.data_buffer = '' 
     if message_id == '\x02': 
      self.data_buffer += data[6:] 

    def wrap_up_last_image(self): 
     if self.img_id > 0: 
      n_bytes = len(self.data_buffer) 
      if n_bytes == 307200: 
       global i 
       write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp', 
          self.data_buffer) 
       i += 1 
      else: 
       print 'Image lost: %s bytes missing.' % (307200 - n_bytes) 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 


def producer(q): 
    # setup socket 
    MCAST_GRP = '239.255.83.71' 
    MCAST_PORT = 2271 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    sock.bind(('', MCAST_PORT)) 
    mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
    while True: 
     q.put(sock.recv(512)) 


def consumer(q): 
    packet_stream = PacketStream('D:/bmpdump/') 
    while True: 
     data = q.get() 
     packet_stream.process(data) 

i = 0 
if __name__ == '__main__': 
    q = Queue() 

    t1 = Process(target=producer, args=(q,)) 
    t1.daemon = True # so they stop when the main prog stops 
    t1.start() 
    t2 = Process(target=consumer, args=(q,)) 
    t2.daemon = True 
    t2.start() 

    time.sleep(10.0) 

    print 'Program finished.' 

EDIT 모든 제안

감사합니다.

1) 이미 쓰레기 + 대기열을 시도했는데 '.join()도 큰 차이가없는 것 같습니다. 이제 문제는 제작자 스레드가 충분한 우선 순위를 얻지 못한다는 것입니다. 파이썬을 사용하여 이것을 늘리는 방법을 찾을 수 없습니까? 이것은 가능한가?

2) 아래 코드를 사용하여 약 10 % 만 잃을 수있었습니다. 프로세서의 핵심은 마지막 데이터 패키지는 코드를 개선하기 위해

import time 
import socket 
import struct 
from PIL import Image 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 

def consume(data_buffer): 
    img_id = ord(data_buffer[0][1]) 
    real_data_buffer = [data[6:] for data in data_buffer] 
    data_string = ''.join(real_data_buffer) 

    global i 
    write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string) 
    i += 1 

def producer(sock): 
    print 'Producer start' 
    data_buffer = [] 
    while True: 
     data = sock.recvfrom(512)[0] 
     if data[0] == '\x01': 
      data_buffer = [] 
     else: 
      data_buffer.append(data) 
     if len(data_buffer) == 612: 
      consume(data_buffer) 


# image counter 
i = 0 

# setup socket 
MCAST_GRP = '239.255.83.71' 
MCAST_PORT = 2271 
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
sock.bind((MCAST_GRP, MCAST_PORT)) 
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000) 

producer(sock) 
+0

UDP 소켓을 다룰 때''sock.recv''보다는''sock.recvfrom''을 사용해야한다고 생각합니다. 아마 그게 도움이 될까요? –

+0

Linux에서 sched_setscheduler() 나 유사한 API를 사용하거나 사용하고있는 다른 OS에서 비슷한 API를 사용하여 UDP 데이터 읽기 스레드/프로세스의 우선 순위를 높여야 할 수도 있습니다. Python API가 무엇인지 잘 모르겠습니다. 그 것에 해당합니다). 그렇게하면 독자의 스레드가 다른 작업에 의해 CPU에서 보류 될 확률이 줄어들어 전체 수신 버퍼 및 패킷 손실이 발생할 수 있습니다. –

답변

0

몇 가지 제안을 도착했습니다 즉, 패킷 스트림에서 일시 정지가있을 때 데이터를 소비하는 것입니다 ~ 25 %에서 (라즈베리 파이에)입니다 ,하지만 먼저 질문 : 물건을 느리게하는 것이 무엇인지를 모두 측정 했습니까? 예를 들어 시스템의 CPU 사용량을 살펴 보았습니다. 100 %를 치는 경우 패킷 손실의 원인이 될 수 있습니다. 대부분 유휴 상태이면 다른 일이 발생하며 문제는 코드의 성능과 관련이 없습니다.

  • 프로세스과 멀티 프로세싱을 사용하지 않는, 발생하는 직렬화 보낼

    • 사용 socket.recvfrom 대신 UDP 소켓을 다룰 때 sock.recv의 :

      이제 몇 가지 제안 코드를 개선하기 위해 하나의 프로세스에서 다른 프로세스로의 데이터는 우리가 ~ 9000 콜/초를 말하는 경우 성능 병목 현상이 될 수 있습니다. 대신 스레드 (threading + queue 모듈)을 사용해보십시오. 그러나 관찰 된 숫자를 제공하지 않으므로 실제로 말하기는 어렵습니다.

    • 패킷을 가져올 때 수신자의 버퍼를 빌드하기 위해 문자열 연결을 사용하지 마십시오. 이는 많은 새로운 임시 문자열 객체를 생성하고 항상 데이터를 복사합니다. 대신 각 패킷을 목록에 추가하고 모든 데이터를 수신하면 "".join(packets) 끝까지 한 번씩 모두 보내십시오.