scoped_session을 사용하여 세션 개체를 구성한 다음 sqlalchemy 설명서에서 알 수 있듯이 올바르게 열고 데이터베이스 세션을 닫는 방법을 이해하는 데 어려움이 있습니다. 반환 된 세션을 사용하십시오. 스레드를 생성하는 객체는 스레드 세이프이므로 기본적으로 모든 스레드는 자신의 세션을 갖게 될 것이고 문제는 발생하지 않을 것입니다. 이제 아래의 예제가 작동합니다. 세션을 제대로 닫았는지 확인하기 위해 무한 루프에 넣었습니다. 올바르게 ("SHOW PROCESSLIST;"를 실행하여 mysql에서) 모니터하면 연결이 계속 커지고 닫히지 않습니다. , 비록 session.close()를 사용하고 각 실행이 끝날 때 scoped_session 객체를 제거하더라도. 내가 도대체 뭘 잘못하고있는 겁니까? 더 큰 응용 프로그램에서 필자의 목표는 필 요한 최소한의 데이터베이스 연결 수를 사용하는 것입니다. 왜냐하면 현재 작업중인 구현은 필요한 모든 메서드에 새로운 세션을 만들어 반환하기 전에 닫는 것이 비효율적이기 때문입니다.SQLAlchemy 다중 스레드 응용 프로그램에서 적절한 세션 처리
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
이 정보 주셔서 감사 것을 조정해야 할 수도 있습니다. 왕 이여! – andrean