2014-09-01 2 views
3

나는 다중 쓰레드 코드 조각을 가지고있다 - SQS로부터 데이터를 폴링하여 파이썬 큐에 추가하는 3 개의 쓰레드. 5 스레드는 파이썬 큐에서 메시지를 가져 와서 처리하고 백엔드 시스템으로 보냅니다. 여기 쓰레드 폴링 sqs를 처리하기위한 파이썬 큐에 추가하기

코드입니다 :

python_queue = Queue.Queue() 

class GetDataFromSQS(threading.Thread): 
    """Threaded Url Grab""" 
    def __init__(self, python_queue): 
     threading.Thread.__init__(self) 
     self.python_queue = python_queue 

    def run(self): 
     while True: 
      time.sleep(0.5) //sleep for a few secs before querying again 
      try: 
       msgs = sqs_queue.get_messages(10) 
       if msgs == None: 
        print "sqs is empty now"! 
       for msg in msgs: 
        #place each message block from sqs into python queue for processing 
        self.python_queue.put(msg) 
        print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize() 
        #delete from sqs 
        sqs_queue.delete_message(msg) 
      except Exception as e: 
       print "Exception in GetDataFromSQS :: " + e 


class ProcessSQSMsgs(threading.Thread): 
    def __init__(self, python_queue): 
     threading.Thread.__init__(self) 
     self.python_queue = python_queue 
     self.pool_manager = PoolManager(num_pools=6) 

    def run(self): 
     while True: 
      #grabs the message to be parsed from sqs queue 
      python_queue_msg = self.python_queue.get() 
      try: 
       processMsgAndSendToBackend(python_queue_msg, self.pool_manager) 
      except Exception as e: 
       print "Error parsing:: " + e 
      finally: 
       self.python_queue.task_done() 

def processMsgAndSendToBackend(msg, pool_manager): 
    if msg != "": 
     ###### All the code related to processing the msg 
     for individualValue in processedMsg: 
      try: 
       response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue) 
       if response == None: 
        print "Error" 
       else: 
        response.release_conn() 
      except Exception as e: 
       print "Exception! Post data to backend: " + e 


def startMyPython(): 
    #spawn a pool of threads, and pass them queue instance 
    for i in range(3): 
     sqsThread = GetDataFromSQS(python_queue) 
     sqsThread.start() 

    for j in range(5): 
     parseThread = ProcessSQSMsgs(python_queue) 
     #parseThread.setDaemon(True) 
     parseThread.start() 

    #wait on the queue until everything has been processed 
    python_queue.join() 
    # python_queue.close() -- should i do this? 

startMyPython() 

문제 : 내가 프로세스를 종료하는 경우 3 파이썬 노동자마다 몇 일 모든 (위 -p의 -H를 사용하여 모니터링) 무작위로 죽을 괜찮아이며, 스크립트를 다시 시작하십시오. 난 그 노동자가 의심되는 3 GetDataFromSQS 스레드입니다 ... 그리고 GetDataFromSQS 죽기 때문에 다른 다섯 노동자는 항상 잠을 실행하지만 파이썬 대기열에 데이터가 없습니다. 나는 파이썬에 익숙하지 않고 큐 로직과 스레드를 생성하기 위해이 튜토리얼을 따라 왔기 때문에 내가 여기서 잘못하고 있는지 확실하지 않다. - http://www.ibm.com/developerworks/aix/library/au-threadingpython/

미리 도움을 청하기 위해 고마워한다. 나는 내 문제를 분명히 설명하기를 희망한다.

답변

4

스레드 매달리기 문제는 sqs 큐의 핸들을 가져 오는 것과 관련이있었습니다. 저는 자격 증명 관리에 IAM을 사용했고 sq에 연결하기 위해 boto sdk를 사용했습니다.

이 문제의 근본 원인은 boto 패키지가 AWS에서 auth에 대한 메타 데이터를 읽는 중이었고 한 번 실패했습니다.

해결 방법은 boto 구성을 편집하여 AWS에 대한 인증 호출을 수행하려는 시도를 늘리는 것입니다.

[BOTO] metadata_service_num_attempts = 5

(https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E)

관련 문제