2011-10-25 3 views
1

저는 Twitter의 스트리밍 API에 대한 연결을 만드는 작업 (서브 클래스로 celery.task.Task)을 만들고 있습니다. Twitter API 호출의 경우 tweepy를 사용하고 있습니다. 셀러리 문서에서 읽은 바와 같이 '작업은 모든 요청에 ​​대해 인스턴스화되지 않지만 글로벌 인스턴스로 작업 레지스트리에 등록됩니다.' 작업을 위해 apply_async (또는 지연)를 호출 할 때마다 원래 인스턴스화되었지만 발생하지 않는 작업에 액세스 할 것으로 예상했습니다. 대신 사용자 정의 작업 클래스의 새 인스턴스가 만들어집니다. 이 API는 tweepy API 호출로 생성 된 원래 연결을 종료 할 수있는 유일한 방법이기 때문에 원래 사용자 정의 작업에 액세스 할 수 있어야합니다.셀러리가 여러 개의 작업 인스턴스를 만듭니다.

from celery import registry 
from celery.task import Task 

class FollowAllTwitterIDs(Task): 
    def __init__(self): 
     # requirements for creation of the customstream 
     # goes here. The CustomStream class is a subclass 
     # of tweepy.streaming.Stream class 

     self._customstream = CustomStream(*args, **kwargs) 

    @property 
    def customstream(self): 
     if self._customstream: 
      # terminate existing connection to Twitter 
      self._customstream.running = False 
     self._customstream = CustomStream(*args, **kwargs) 

    def run(self): 
     self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed() 

     self.customstream.filter(follow=self._to_follow_ids, async=False) 
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name] 

을 그리고 장고보기

def connect_to_twitter(request): 
    if request.method == 'POST': 
     do_stuff_here() 
     . 
     . 
     . 

     follow_all_twitterids.apply_async(args=[], kwargs={}) 

    return 

을 위해 어떤 도움을 주시면 감사하겠습니다 :

여기이 도움이 될 경우 코드의 일부 조각입니다. : D는

EDIT : 질문에 대한 추가 컨텍스트를 들어

는 CustomStream 객체는 필터() 메소드가 호출 될 때마다 httplib.HTTPSConnection 인스턴스를 생성한다. 이 연결을 만들려는 시도가있을 때마다이 연결을 닫아야합니다. customstream.running을 False로 설정하여 연결을 닫습니다.

답변

0

당신이 어떤 이유로 없는 생각하면 작업은 당신이

인쇄 ("인스턴스화") 수입 추적 traceback.print_stack()

를 추가 제안, 한 번 인스턴스화한다

에서 Task.__init__까지의 방법을 사용하면 이것이 어디에서 일어날 지 알 수 있습니다. 답장을

from celery.task import Task, task 

class TwitterTask(Task): 
    _stream = None 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     try: 
      return super(TwitterTask, self).__call__(stream, *args, **kwargs) 
     finally: 
      if self._stream: 
       self._stream.running = False 

    @property 
    def stream(self): 
     if self._stream is None: 
      self._stream = CustomStream() 
     return self._stream 

@task(base=TwitterTask) 
def follow_all_ids(): 
    ids = get_list_of_ids_to_follow() 
    follow_all_ids.stream.filter(follow=ids, async=false) 
+0

감사 :

나는 당신의 작업이 더 나은 다음과 같이 표현 될 수있다 생각합니다. 나는 위의 구현을 시도하고 셀러리가 어디에 있는지 궁금했다 .utils.cached_property가 사용 되었습니까? – Christian

+0

실수로 거기에 추가되었습니다. :) 그것을 제거하겠습니다. – asksol

관련 문제