저는 Twitter의 스트리밍 API에 대한 연결을 만드는 작업 (서브 클래스로 celery.task.Task)을 만들고 있습니다. Twitter API 호출의 경우 tweepy를 사용하고 있습니다. 셀러리 문서에서 읽은 바와 같이 '작업은 모든 요청에 대해 인스턴스화되지 않지만 글로벌 인스턴스로 작업 레지스트리에 등록됩니다.' 작업을 위해 apply_async (또는 지연)를 호출 할 때마다 원래 인스턴스화되었지만 발생하지 않는 작업에 액세스 할 것으로 예상했습니다. 대신 사용자 정의 작업 클래스의 새 인스턴스가 만들어집니다. 이 API는 tweepy API 호출로 생성 된 원래 연결을 종료 할 수있는 유일한 방법이기 때문에 원래 사용자 정의 작업에 액세스 할 수 있어야합니다.셀러리가 여러 개의 작업 인스턴스를 만듭니다.
from celery import registry
from celery.task import Task
class FollowAllTwitterIDs(Task):
def __init__(self):
# requirements for creation of the customstream
# goes here. The CustomStream class is a subclass
# of tweepy.streaming.Stream class
self._customstream = CustomStream(*args, **kwargs)
@property
def customstream(self):
if self._customstream:
# terminate existing connection to Twitter
self._customstream.running = False
self._customstream = CustomStream(*args, **kwargs)
def run(self):
self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()
self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]
을 그리고 장고보기
def connect_to_twitter(request):
if request.method == 'POST':
do_stuff_here()
.
.
.
follow_all_twitterids.apply_async(args=[], kwargs={})
return
을 위해 어떤 도움을 주시면 감사하겠습니다 :
여기이 도움이 될 경우 코드의 일부 조각입니다. : D는
EDIT : 질문에 대한 추가 컨텍스트를 들어
는 CustomStream 객체는 필터() 메소드가 호출 될 때마다 httplib.HTTPSConnection 인스턴스를 생성한다. 이 연결을 만들려는 시도가있을 때마다이 연결을 닫아야합니다. customstream.running을 False로 설정하여 연결을 닫습니다.
감사 :
나는 당신의 작업이 더 나은 다음과 같이 표현 될 수있다 생각합니다. 나는 위의 구현을 시도하고 셀러리가 어디에 있는지 궁금했다 .utils.cached_property가 사용 되었습니까? – Christian실수로 거기에 추가되었습니다. :) 그것을 제거하겠습니다. – asksol