2013-04-26 3 views
3

다음을 수행하려고합니다. 큐를 생성 할 별도의 스레드에서 스트림 수신기를 시작하십시오. 나중에 큐가 처리 될 것입니다. 그러나 Storm은 실. 그것은저기서 붙어 있습니다.스톰 클러스터에서 스레드가 작동하지 않습니다.

그리고 내 코드는 다음과 같습니다

import os, sys, traceback, random, StringIO, time 
import random 
from uuid import uuid4 
from select import select 
from subprocess import Popen,PIPE 
import pyinotify 
import simplejson, pycurl 
import sys, signal 
import twitter 
import tweepy 
import Queue 
import threading 
try: 
    import simplejson as json 
except ImportError: 
    import json 

import storm 
queue = Queue.Queue() 

class MyModelParser(tweepy.parsers.ModelParser): 
    def parse(self, method, payload): 
     result = super(MyModelParser, self).parse(method, payload) 
     result._payload = json.loads(payload) 
     return result 

class CustomStreamListener(tweepy.StreamListener): 
    ''' Handles data received from the stream. ''' 
    def __init__(self, api, q): 
     self.api = api 
     self.queue = q 
     self.queue.put('lalala') 

    def on_status(self, status): 
     self.queue.put('%s' % status.author.screen_name) 
     self.queue.task_done() 

    def on_error(self, status_code): 
     return True # To continue listening 

    def on_timeout(self): 
     return True # To continue listening 

class Starter(): 
    def __init__(self,q): 
     self.queue = q 
     hashtag = ['justinbieber','snooki','daddy_yankee','MikeTyson','iamdiddy','lala'] 
     auth = self.t_auth() 
     api = tweepy.API(auth, parser=MyModelParser()) 
     stream = tweepy.streaming.Stream(auth,CustomStreamListener(api,queue)) 
     stream.filter(follow=None, track=hashtag) 

    def t_auth(self): 
     consumer_key="" 
     consumer_secret="" 
     access_key = "" 
     access_secret = "" 

     auth = tweepy.OAuthHandler(consumer_key, consumer_secret) 
     auth.set_access_token(access_key, access_secret) 

     return auth 

class TwitterSpout(storm.Spout): 
    SPOUT_NAME = "TwitterSpout" 
    queue = queue 

    def initialize(self, conf, context): 
     self.pid = os.getpid()  
     try: 
      t = threading.Thread(target=Starter(self.queue)) 
      t.daemon=True 
      t.start()   

     except KeyboardInterrupt, e: 
      self.log('\n\nStopping') 
      raise 
+0

너무 많은 파이썬을 모른다. 여전히 묻습니다. 단순히 대기열을 밀어 넣고 나중에 그곳에서 처리한다면 작동합니까? – abhi

답변

0

사용 pyleus (https://github.com/Yelp/pyleus)과 주둥이 구현 next_tuple (자신을)이 있어야합니다 아래의 예와 같이 출력 필드를 방출해야하는;

from pyleus.storm import Spout 


class DummySpout(Spout): 

    OUTPUT_FIELDS = ['sentence', 'name'] 

    def initialize(self): 
     pass 

    def next_tuple(self): 
     self.emit(("This is a sentence.", "spout",)) 


if __name__ == '__main__': 
    DummySpout().run() 

그러면 볼트를 씁니다.

from pyleus.storm import SimpleBolt 


class DummyBolt(SimpleBolt): 

OUTPUT_FIELDS = ['sentence'] 

def process_tuple(self, tup): 
    sentence, name = tup.values 
    new_sentence = "{0} says, \"{1}\"".format(name, sentence) 
    self.emit((new_sentence,), anchors=[tup]) 


if __name__ == '__main__': 
    DummyBolt().run() 

내가 어떻게 사용하고 있는지 볼 수 있습니다. https://github.com/Yelp/pyleus/issues/140

관련 문제