2012-03-08 4 views
21

scoped_session을 사용하여 세션 개체를 구성한 다음 sqlalchemy 설명서에서 알 수 있듯이 올바르게 열고 데이터베이스 세션을 닫는 방법을 이해하는 데 어려움이 있습니다. 반환 된 세션을 사용하십시오. 스레드를 생성하는 객체는 스레드 세이프이므로 기본적으로 모든 스레드는 자신의 세션을 갖게 될 것이고 문제는 발생하지 않을 것입니다. 이제 아래의 예제가 작동합니다. 세션을 제대로 닫았는지 확인하기 위해 무한 루프에 넣었습니다. 올바르게 ("SHOW PROCESSLIST;"를 실행하여 mysql에서) 모니터하면 연결이 계속 커지고 닫히지 않습니다. , 비록 session.close()를 사용하고 각 실행이 끝날 때 scoped_session 객체를 제거하더라도. 내가 도대체 ​​뭘 잘못하고있는 겁니까? 더 큰 응용 프로그램에서 필자의 목표는 필 요한 최소한의 데이터베이스 연결 수를 사용하는 것입니다. 왜냐하면 현재 작업중인 구현은 필요한 모든 메서드에 새로운 세션을 만들어 반환하기 전에 닫는 것이 비효율적이기 때문입니다.SQLAlchemy 다중 스레드 응용 프로그램에서 적절한 세션 처리

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

답변

36

만 (데이터베이스 당) 프로세스 당 한 번 create_enginescoped_session를 호출해야합니다. 각각은 자체 풀 또는 세션 풀 (각각)을 갖게되므로 하나의 풀만 만들도록하고 싶습니다. 그냥 모듈 수준의 글로벌합니다. 더 preciesly보다 사용자의 세션을 관리해야하는 경우, 당신은 아마 할 scoped_session

또 다른 변화를 이용하지 않아야하는 것은이 세션 인 것처럼 직접 DBSession을 사용하는 것입니다. scoped_session에서 세션 메서드를 호출하면 필요에 따라 스레드 로컬 세션을 만들고 세션으로 메서드 호출을 전달합니다. 는 기본적으로 5 연결 풀의 pool_size 이다의

또 다른 것은

는 알고 있어야합니다. 많은 애플리케이션의 경우 괜찮습니다,하지만 당신이 스레드의 을 많이 만드는 경우, 당신은 참으로 아주 도움이되었다, 매개 변수

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

이 정보 주셔서 감사 것을 조정해야 할 수도 있습니다. 왕 이여! – andrean

관련 문제