나는 다중 쓰레드 코드 조각을 가지고있다 - SQS로부터 데이터를 폴링하여 파이썬 큐에 추가하는 3 개의 쓰레드. 5 스레드는 파이썬 큐에서 메시지를 가져 와서 처리하고 백엔드 시스템으로 보냅니다. 여기 쓰레드 폴링 sqs를 처리하기위한 파이썬 큐에 추가하기
코드입니다 :python_queue = Queue.Queue()
class GetDataFromSQS(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, python_queue):
threading.Thread.__init__(self)
self.python_queue = python_queue
def run(self):
while True:
time.sleep(0.5) //sleep for a few secs before querying again
try:
msgs = sqs_queue.get_messages(10)
if msgs == None:
print "sqs is empty now"!
for msg in msgs:
#place each message block from sqs into python queue for processing
self.python_queue.put(msg)
print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
#delete from sqs
sqs_queue.delete_message(msg)
except Exception as e:
print "Exception in GetDataFromSQS :: " + e
class ProcessSQSMsgs(threading.Thread):
def __init__(self, python_queue):
threading.Thread.__init__(self)
self.python_queue = python_queue
self.pool_manager = PoolManager(num_pools=6)
def run(self):
while True:
#grabs the message to be parsed from sqs queue
python_queue_msg = self.python_queue.get()
try:
processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
except Exception as e:
print "Error parsing:: " + e
finally:
self.python_queue.task_done()
def processMsgAndSendToBackend(msg, pool_manager):
if msg != "":
###### All the code related to processing the msg
for individualValue in processedMsg:
try:
response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
if response == None:
print "Error"
else:
response.release_conn()
except Exception as e:
print "Exception! Post data to backend: " + e
def startMyPython():
#spawn a pool of threads, and pass them queue instance
for i in range(3):
sqsThread = GetDataFromSQS(python_queue)
sqsThread.start()
for j in range(5):
parseThread = ProcessSQSMsgs(python_queue)
#parseThread.setDaemon(True)
parseThread.start()
#wait on the queue until everything has been processed
python_queue.join()
# python_queue.close() -- should i do this?
startMyPython()
문제 : 내가 프로세스를 종료하는 경우 3 파이썬 노동자마다 몇 일 모든 (위 -p의 -H를 사용하여 모니터링) 무작위로 죽을 괜찮아이며, 스크립트를 다시 시작하십시오. 난 그 노동자가 의심되는 3 GetDataFromSQS 스레드입니다 ... 그리고 GetDataFromSQS 죽기 때문에 다른 다섯 노동자는 항상 잠을 실행하지만 파이썬 대기열에 데이터가 없습니다. 나는 파이썬에 익숙하지 않고 큐 로직과 스레드를 생성하기 위해이 튜토리얼을 따라 왔기 때문에 내가 여기서 잘못하고 있는지 확실하지 않다. - http://www.ibm.com/developerworks/aix/library/au-threadingpython/
미리 도움을 청하기 위해 고마워한다. 나는 내 문제를 분명히 설명하기를 희망한다.