2012-09-08 9 views
3

sqlalchemy를 사용하여 네트워크를 통해 데이터베이스를 업데이트하는 작업이 제공됩니다. 나는 파이썬의 스레딩 모듈을 사용하기로 결정했다. 현재 큐를 통해 작업 단위를 소비하도록 다른 스레드에 지시하기 위해 1 스레드, 일명 생산자 스레드를 사용하고 있습니다.다중 스레드 및 Sqlalchemy

생산자 스레드는 다음과 같이 수행합니다

def produce(self, last_id): 
    unit = session.query(Request).order_by(Request.id) \ 
     .filter(Request.item_id == None).yield_per(50) 
    self.queue.put(unit, True, Master.THREAD_TIMEOUT)  

소비자 스레드가이 비슷한 않지만 :

def consume(self): 
    unit = self.queue.get() 
    request = unit 
    item = Item.get_item_by_url(request) 
    request.item = item 
    session.add(request) 
    session.flush() 

을 내가 SQLAlchemy의의 범위 세션을 사용하고 있습니다 :

session = scoped_session(sessionmaker(autocommit=True, autoflush=True, bind=engine)) 

그러나 예외가 발생합니다.

"sqlalchemy.exc.InvalidRequestError: Object FOO is already attached to session '1234' (this is '5678')" 

이 예외는 다른 스레드에 속해 있기 때문에 소비자가 다른 범위의 세션을 사용하는 동안 하나의 세션 (생산자 세션)에서 요청 개체가 생성된다는 사실에서 알 수 있습니다.

내 프로덕션 스레드가 request.id에서 대기열로 내 제작자 스레드를 전달하는 동안 소비자가 요청 코드를 불러 와서 요청 개체를 검색해야합니다.

request = session.query(Request).filter(Request.id == request_id).first() 

이 솔루션은 다른 네트워크 호출과 관련되어있어 분명히 적합하지 않기 때문에이 솔루션이 마음에 들지 않습니다.

  1. 생산자의 db 호출 결과를 낭비하지 않도록 할 수있는 방법이 있습니까?
  2. 하나 이상의 ID가 작업 단위로 대기열로 전달되도록 "생산품"을 작성하는 방법이 있습니까?

의견 환영합니다.

답변

4

대기열에 넣기 전에 Request 인스턴스를 주 스레드 세션에서 분리 한 다음 대기열에서 다시 가져올 때 대기열 처리 스레드 세션에 연결해야합니다.

는 분리 요청을 전달 세션에 .expunge()를 호출하려면 :

session.expunge(unit) 

다음 큐 스레드를 처리 할 때, merging하여 재 부착; load 플래그를 False로 설정하여 데이터베이스 왕복을 다시 방지하십시오.

session.merge(request, load=False)