3
다음을 수행하려고합니다. 큐를 생성 할 별도의 스레드에서 스트림 수신기를 시작하십시오. 나중에 큐가 처리 될 것입니다. 그러나 Storm
은 실. 그것은저기서 붙어 있습니다.스톰 클러스터에서 스레드가 작동하지 않습니다.
그리고 내 코드는 다음과 같습니다
import os, sys, traceback, random, StringIO, time
import random
from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import pyinotify
import simplejson, pycurl
import sys, signal
import twitter
import tweepy
import Queue
import threading
try:
import simplejson as json
except ImportError:
import json
import storm
queue = Queue.Queue()
class MyModelParser(tweepy.parsers.ModelParser):
def parse(self, method, payload):
result = super(MyModelParser, self).parse(method, payload)
result._payload = json.loads(payload)
return result
class CustomStreamListener(tweepy.StreamListener):
''' Handles data received from the stream. '''
def __init__(self, api, q):
self.api = api
self.queue = q
self.queue.put('lalala')
def on_status(self, status):
self.queue.put('%s' % status.author.screen_name)
self.queue.task_done()
def on_error(self, status_code):
return True # To continue listening
def on_timeout(self):
return True # To continue listening
class Starter():
def __init__(self,q):
self.queue = q
hashtag = ['justinbieber','snooki','daddy_yankee','MikeTyson','iamdiddy','lala']
auth = self.t_auth()
api = tweepy.API(auth, parser=MyModelParser())
stream = tweepy.streaming.Stream(auth,CustomStreamListener(api,queue))
stream.filter(follow=None, track=hashtag)
def t_auth(self):
consumer_key=""
consumer_secret=""
access_key = ""
access_secret = ""
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
return auth
class TwitterSpout(storm.Spout):
SPOUT_NAME = "TwitterSpout"
queue = queue
def initialize(self, conf, context):
self.pid = os.getpid()
try:
t = threading.Thread(target=Starter(self.queue))
t.daemon=True
t.start()
except KeyboardInterrupt, e:
self.log('\n\nStopping')
raise
너무 많은 파이썬을 모른다. 여전히 묻습니다. 단순히 대기열을 밀어 넣고 나중에 그곳에서 처리한다면 작동합니까? – abhi