2017-05-15 1 views
1

멀티 테넌트 시스템을 설정했습니다. RabbitMQ가있는 셀러리가 준비되었습니다. 모델의 사전 저장 신호에서 전자 메일을 트리거하는 시스템이 있습니다. 시험은 벌금을 실행하고 심지어 메일 기능을 통해 taversing됩니다셀러리 작업 테스트

class BaseSetup(TenantTestCase): 
    def setup_tenant(self, tenant): 
     """ 
     Add any additional setting to the tenant before it get saved. This is required if you have 
     required fields. 
     """ 
     user = User.objects.create(email="[email protected]", is_active=True) 
     user.set_password('dummy') 
     tenant.owner = user 
     user.save() 

    def setUp(self): 
     self.sync_shared() 
     tenant_model = get_tenant_model() 
     app.conf.update(CELERY_ALWAYS_EAGER=True) 
     app.conf.update(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True) 
     print app.conf.CELERY_ALWAYS_EAGER 
     test_schema_name = self.get_test_schema_name() 
     test_tenant_domain_name = self.get_test_tenant_domain() 
     self.tenant = tenant_model.objects.filter(schema_name=test_schema_name).first() 
     self.c = TenantClient(self.tenant) 

    def test_core_student_put_api(self): 
     response = self.c.post('http://test.localhost:8000/login/',{'email':'[email protected]','password':'dummy'}) 
     response_add_student=self.c.post('http://test.localhost:8000/student_admin/',{'email':'[email protected]','stu_number':'100','role':'STUDENT'},**{'HTTP_AUTHORIZATION':'JWT '+response.data['token']}) 

     self.assertEqual(response_put_employee.status_code, 201) 

: 이것에 대한

@receiver(pre_save, sender=Student) 
def invite_stu(sender, instance, **kwargs): 
    user_email = instance.email 
    subject = 'dummy' 
    message = '' 

    html_message = 'dummy' 
    from_email = my email id 
    to_list = [user_email] 
    send_html_mail(subject, message, html_message, settings.DEFAULT_FROM_EMAIL, to_list) 

    # super(Employee, self).save() 
    return instance 

내가 테스트 케이스를 쓰고 있어요 : 다음은 코드입니다. 그러나 셀러리는 테스트를 통해 메일을 보내면 메일을 보내고 보내지 않지만, API를 통해 학생을 추가하면 이메일을 보냅니다.

어떤 방법 으로든이 작업을 할 수 있습니까? 감사!

답변

1

해결책을 찾았습니다. 내 작업은 정기적 인 작업이었고, 장고 테스트는 테스트 중에 새로운 테스트 DB를 만들고 해당 데이터베이스에 저장된 메일러 큐가 발견되었고 셀러리가 테스트 데이터베이스에서 픽업 할 수 없었습니다. 그래서, 명시 적으로 호출해야했습니다. 내 테스트 케이스의 작업은 다음과 같습니다.

@override_settings(task_eager_propagates=True,task_always_eager=True,broker_url='memory://',backend='memory') 
def test_core_student_put_api(self): 
     response = response = self.c.post('http://test.localhost:8000/login/',{'email':'[email protected]','password':'dummy'}) 
     response_add_student=self.c.post('http://test.localhost:8000/student_admin/',{'email':'[email protected]','stu_number':'100','role':'STUDENT'},**{'HTTP_AUTHORIZATION':'JWT '+response.data['token']}) 

     self.assertTrue(project_tasks.delay()) 
     self.assertEqual(response_put_employee.status_code, 200) 

테스트 구성을 위해 샐러리 (celery) 문서에서 제안한대로 셀로리 설정을 초과해야했습니다. project_tasks.delay()assertTrue으로 명시 적으로 호출했습니다. 내 작업에는 간단한 send_mail() 명령이 포함되어 있습니다. 신호에서 올바른 데이터를 가져 오는 중입니다. invite_stu send_html_mail()은 send_html_mail()을 호출하여 테스트 대기열에있는 메일 대기열에 추가되며 주기적으로 수행되는 작업 (주기적 작업)을 명시 적으로 호출하면 메일을 보냅니다. 셀러리가 지각 할 수없는 정기적 인 작업을 임시 데이터베이스 문제로 인해 테스트 케이스, 그것에 대한 좀 더 연구를해야하지만, 현재 내 문제를 해결합니다.