2014-10-09 4 views
1

셀프를 사용하여 mongodb에 큰 데이터를 삽입하려고하지만 mongodb의 동시성 문제가 있습니다. 한 번에 하나 이상의 작업을 셀러리에 보내면 데이터의 일부가 mongodb에 삽입되고 다른 데이터는 삽입되지 않습니다. 나는 mongodb가 삽입 작업에 데이터베이스를 잠그기 때문에 그런 것 같지만 데이터베이스에 데이터를 삽입하기 위해 동일한 유형의 여러 태스크를 보낼 수있는 솔루션이 필요합니다. 데이터베이스가 잠금 해제 될 때까지 기다리는 경우 데이터베이스가 잠겨 있는지 확인하십시오. 여기에 내 코드의 일부는 다음과 같습니다 기본 다중으로mongoengine을 사용하여 mongodb에 데이터를 삽입하는 방법

@celery.task(name='celery_tasks.add_book_product') 
def add_book_product(product_dict, store_id): 

    connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST) 

    store_obj = Store.objects.get(pk=store_id) 

    try: 
     book = Books.objects.get(pk=product_dict['RawBook']) 

     try: 
      product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book') 
      print("Product {} found for store {}".format(product_obj.id, store_obj.id)) 
      product_obj.count = int(product_dict['count']) 
      product_obj.buy_price = int(product_dict['buy_book']) 
      product_obj.sell_price = int(product_dict['sell_book']) 

      product_obj.save() 

     except (DoesNotExist, ValidationError): 
      product_obj = Product(store=store_obj, 
            related_book=book, 
            kind='book', 
            count=int(product_dict['count']), 
            buy_price=int(product_dict['buy_book']), 
            sell_price=int(product_dict['sell_book']), 
            name=book.name_fa) 

      product_obj.save() 

      print("Appending books to store obj...") 
      store_obj.products.append(product_obj) 
      store_obj.save() 
      print("Appending books to store obj done") 

     return "Product {} saved for store {}".format(product_obj.id, store_obj.id) 
    except (DoesNotExist, ValidationError): 
     traceback.print_exc() 
     return "Product with raw book {} does not exist.".format(product_dict['RawBook']) 

답변

2

는 셀러리의 작업의 동시 실행을 수행하는 데 사용됩니다. 그러나 한 번에 하나의 작업 만 실행되도록하는 두 가지 방법이 있습니다.

해결 방법 1 : 당신이

celery -A your_app worker -l info 

와 셀러리 노동자를 시작할 때

기본 동시성은 컴퓨터가 가지고있는 코어의 수와 같다. 그래서 이런 일을 시작하면

celery -A your_app worker -l info -c 1 

주어진 시간에 오직 하나의 작업 만 실행합니다. 실행해야 할 다른 작업이 있으면 새 대기열을 시작하고 작업자에게 할당 할 수 있습니다.

해결 방법 2 :

이 조금 복잡하다. 당신은 작업에서 자물쇠를 사용해야합니다.

if acquire_lock(): 
    try: 
     #do something 
    finally: 
     release_lock() 
    return 

자세한 내용은 Celery documentation에서 확인할 수 있습니다.

관련 문제