2013-04-25 5 views
4

아래 두 가지 작업의 차이점은 무엇입니까?셀러리 작업 아래 두 가지 작업의 차이점

첫 번째 오류는 두 번째 오류는 발생합니다. 둘 다 동일하며, 그들은 추가 인수를 받아들이고 둘 다 같은 방식으로 호출됩니다.

ProcessRequests.delay(batch) **error object.__new__() takes no parameters** 


SendMessage.delay(message.pk, self.pk) **works!!!!** 

이제 오류의 의미를 알게되었지만 혼란은 왜 하나가 작동하는지 아닌지에 대한 것입니다.

작업 ...

1)

class ProcessRequests(Task): 
    name = "Request to Process" 
    max_retries = 1 
    default_retry_delay = 3 

    def run(self, batch): 
     #do something 

2)

class SendMessage(Task): 
    name = "Sending SMS" 
    max_retries = 10 
    default_retry_delay = 3 

    def run(self, message_id, gateway_id=None, **kwargs): 
     #do something 

전체 작업 코드 ....

from celery.task import Task 
from celery.decorators import task 

import logging 

from sms.models import Message, Gateway, Batch 
from contacts.models import Contact 
from accounts.models import Transaction, Account 


class SendMessage(Task): 
    name = "Sending SMS" 
    max_retries = 10 
    default_retry_delay = 3 

    def run(self, message_id, gateway_id=None, **kwargs): 
     logging.debug("About to send a message.") 

     # Because we don't always have control over transactions 
     # in our calling code, we will retry up to 10 times, every 3 
     # seconds, in order to try to allow for the commit to the database 
     # to finish. That gives the server 30 seconds to write all of 
     # the data to the database, and finish the view. 
     try: 
      message = Message.objects.get(pk=message_id) 
     except Exception as exc: 
      raise SendMessage.retry(exc=exc) 


     if not gateway_id: 
      if hasattr(message.billee, 'sms_gateway'): 
       gateway = message.billee.sms_gateway 
      else: 
       gateway = Gateway.objects.all()[0] 
     else: 
      gateway = Gateway.objects.get(pk=gateway_id) 

     # Check we have a credits to sent me message 
     account = Account.objects.get(user=message.sender) 
     # I'm getting the non-cathed version here, check performance!!!!! 
     if account._balance() >= message.length: 
      response = gateway._send(message) 

      if response.status == 'Sent': 
       # Take credit from users account. 
       transaction = Transaction(
        account=account, 
        amount=- message.charge, 
        description="Debit: SMS Sent", 

        ) 
       transaction.save() 
       message.billed = True 
       message.save() 
     else: 
      pass 


     logging.debug("Done sending message.") 


class ProcessRequests(Task): 
    name = "Request to Process" 
    max_retries = 1 
    default_retry_delay = 3 

    def run(self, message_batch): 
     for e in Contact.objects.filter(contact_owner=message_batch.user, group=message_batch.group): 
      msg = Message.objects.create(
       recipient_number=e.mobile, 
       content=message_batch.content, 
       sender=e.contact_owner, 
       billee=message_batch.user, 
       sender_name=message_batch.sender_name 
      ) 
      gateway = Gateway.objects.get(pk=2) 
      msg.send(gateway) 
      #replace('[FIRSTNAME]', e.first_name) 

시도 :

ProcessRequests.delay(batch) should work gives error error object.__new__() takes no parameters  
ProcessRequests().delay(batch) also gives error error object.__new__() takes no parameters 
+0

함수 서명과는 별도로'run()'을 어떻게 구현합니까? –

+0

'ProcessRequests(). delay (batch) '를 의미 했습니까? 그렇지 않으면 게시물의 코드에 아무런 문제가 없습니다. 제안 된대로, 우리에게'달리는 '시체를 보여주십시오. – gatto

+0

예.이 오류를 제공하는 예입니다. ProcessRequests(). 지연 (일괄 처리) 또는 ProcessRequest.지연 (일괄 처리)도 작동하지 않습니다 – GrantU

답변

6

내가 당신의 문제를 재현 할 수 있었다 :

import celery 
from celery.task import Task 

@celery.task 
class Foo(celery.Task): 
    name = "foo" 
    def run(self, batch): 
     print 'Foo' 

class Bar(celery.Task): 
    name = "bar" 
    def run(self, batch): 
     print 'Bar' 

# subclass deprecated base Task class 
class Bar2(Task): 
    name = "bar2" 
    def run(self, batch): 
     print 'Bar2' 

@celery.task(name='def-foo') 
def foo(batch): 
    print 'foo' 

출력 : 당신이 돈 왜 당신이 celery.task.Task 기본 클래스를 사용되지 않는 사용을 참조

In [2]: foo.delay('x') 
[WARNING/PoolWorker-4] foo 

In [3]: Foo().delay('x') 
[WARNING/PoolWorker-2] Foo 

In [4]: Bar().delay('x') 
[WARNING/PoolWorker-3] Bar 

In [5]: Foo.delay('x') 
TypeError: object.__new__() takes no parameters 

In [6]: Bar.delay('x') 
TypeError: unbound method delay() must be called with Bar instance as first argument (got str instance instead) 

In [7]: Bar2.delay('x') 
[WARNING/PoolWorker-1] Bar2 

, 이것이 ' unbound method 오류 발생 :

Definition: Task(self, *args, **kwargs) 
Docstring: 
Deprecated Task base class. 

Modern applications should use :class:`celery.Task` instead. 

나는 왜 ProcessRequests이 작동하지 않는지 모르겠다. 어쩌면 캐싱 문제 일 수도 있습니다. 이전에 클래스에 데코레이터를 적용하려고 시도했을 수도 있고 캐싱 된 것일 수도 있습니다. 그리고이 클래스는이 데코레이터를 Task 클래스에 적용하려고 할 때 발생하는 오류입니다.

.pyc 파일을 모두 삭제하고 셀러리 작업자를 다시 시작한 다음 다시 시도하십시오.

Bar() 잘못 즉, 때마다 이해가되지 않습니다 그래서 (클라이언트 측에서) 작업 클래스의 객체를 생성, 클래스 직접

  1. Tasks are instantiated only once per (worker) process를 사용하지 마십시오.
  2. Foo.delay() 또는 Foo().delay()은 데코레이터 name 인수와 클래스 name 속성의 조합에 따라 달라질 수 있습니다.

celery.registry.tasks 사전에서 작업 오브젝트를 취득하거나 대신 (내 예 foo) 기능에 대한 @celery.task 장식을 사용합니다.

+0

@ User7 : 작업 기능에 대한 기본 클래스를 지정할 수 있습니다 [여기] (http : //docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes). 귀하가 사용하는 케이스 옵션에서 사용자 정의 작업 클래스가 필요없이 데코레이터에 직접 전달 될 수 있습니다. – gatto

+0

'Foo'처럼 정의 된 클래스가 있는데,'Foo(). apply_async (args = [x])'오류가 발생했습니다.'TypeError : run()은 정확히 2 개의 인수를 취합니다 (1 주어진)', 그리고 그것은 나를 미치게합니다. 새 스타일 API로 작동하지 않을 수 있습니다. 예전 스타일 API로 잘 작동합니다. 너 나 좀 도와 줄 수있어? – elssar

+0

그건 내가하는 일이다. 이 질문을 참조하십시오. http://stackoverflow.com/questions/17608915/problems-with-the-new-style-celery-api – elssar