2014-12-08
5

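SQLAlchemy, Serializable transaction isolation and retries in an idiomatic Python way

PostgreSQL and the SQL standard define a Serializable transaction isolation level. If you isolate transactions at this level, conflicting concurrent transactions are aborted and must be retried.

I am familiar with the concept of transaction retries from Plone/Zope, where the entire HTTP request can be replayed when there is a transaction conflict. How could similar functionality be achieved with SQLAlchemy (and potentially with zope.sqlalchemy)? I tried to read the documentation of zope.sqlalchemy and the Zope transaction manager, but this is not obvious to me.

Specifically, I want something like this: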

# Try to do the stuff, if it fails because of transaction conflict do again until retry count is exceeded
with transaction.manager(retries=3):
    do_stuff()

# If we couldn't get the transaction through even after 3 attempts, fail with a horrible exception
... After writing the question I found this - http://zodb.readthedocs.org/en/latest/transactions.html#retrying-transactions - maybe there is a slightly less verbose way. A retry loop? –

I think that's the best you can get. `with` cannot repeat code, and loops do not allow for cleanup. – Eevee

@Eevee: How about a function decorator? –
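The loop-plus-`with` idea discussed in the comments above can be sketched in plain Python. This is only a standard-library illustration of the pattern, not the `transaction` package's actual implementation; `ConflictError`, `Attempt`, `attempts` and `do_stuff` are hypothetical names made up for this sketch.

```python
class ConflictError(Exception):
    """Stand-in for a database serialization failure (hypothetical)."""


class Attempt:
    """Context manager for one attempt; swallows conflicts unless it is the last one."""

    def __init__(self, is_last):
        self.is_last = is_last
        self.succeeded = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.succeeded = True
            return False
        # Returning True suppresses the exception so the for loop can go on.
        return issubclass(exc_type, ConflictError) and not self.is_last


def attempts(count):
    """Yield up to ``count`` Attempt objects, stopping after the first success."""
    for i in range(count):
        attempt = Attempt(is_last=(i == count - 1))
        yield attempt
        if attempt.succeeded:
            return


calls = []


def do_stuff():
    # Simulate two conflicts followed by a success.
    calls.append(1)
    if len(calls) < 3:
        raise ConflictError("simulated conflict")


for attempt in attempts(3):
    with attempt:
        do_stuff()
```

The `for` loop supplies the repetition that `with` alone cannot, while each `with` block still gets its own cleanup hook in `__exit__`. If the final attempt also conflicts, the exception propagates to the caller.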

Answers

2

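So, after digging around for two weeks without finding any off-the-shelf solution, I came up with my own.

Here is a ConflictResolver class which provides a managed_transaction function decorator. You can use the decorator to mark functions as retryable, i.e. if a database conflict error occurs while the function is running, the function is run again, in the hope that the db transaction which caused the conflict error has finished by then.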

The source code is here: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

The unit tests currently covering it are here: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

Python 3.4+ only.

"""Serialized SQL transaction conflict resolution as a function decorator.""" 

import warnings 
import logging 
from collections import Counter 

from sqlalchemy.orm.exc import ConcurrentModificationError 
from sqlalchemy.exc import OperationalError 


UNSUPPORTED_DATABASE = "Seems like we might not know how to support serializable transactions for this database. We don't know or it is untested. Thus, the reliability of the service may suffer. See transaction documentation for the details."

#: Tuples of (Exception class, test function). Behavior copied from the _retryable_errors definitions in zope.sqlalchemy
DATABASE_COFLICT_ERRORS = [] 

try: 
    import psycopg2.extensions 
except ImportError: 
    pass 
else: 
    DATABASE_COFLICT_ERRORS.append((psycopg2.extensions.TransactionRollbackError, None)) 

# ORA-08177: can't serialize access for this transaction 
try: 
    import cx_Oracle 
except ImportError: 
    pass 
else: 
    DATABASE_COFLICT_ERRORS.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177)) 

if not DATABASE_COFLICT_ERRORS: 
    # TODO: Do this when cryptoassets app engine is configured 
    warnings.warn(UNSUPPORTED_DATABASE, UserWarning, stacklevel=2) 

#: XXX: We need to confirm whether this is the right way for MySQL and SQLite
DATABASE_COFLICT_ERRORS.append((ConcurrentModificationError, None)) 


logger = logging.getLogger(__name__) 


class CannotResolveDatabaseConflict(Exception): 
    """The managed_transaction decorator has given up trying to resolve the conflict. 

    We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in error free manner. Thus, we need to tell our service user that unfortunately this time you cannot do your thing. 
    """ 


class ConflictResolver: 

    def __init__(self, session_factory, retries):
        """

        :param session_factory: `callback()` which will give us a new SQLAlchemy session object for each transaction and retry

        :param retries: The number of times we try to re-run the transaction in the case of a transaction conflict.
        """
        self.retries = retries

        self.session_factory = session_factory

        # Simple beancounting diagnostics on how well we are doing
        self.stats = Counter(success=0, retries=0, errors=0, unresolved=0)

    @classmethod 
    def is_retryable_exception(cls, e):
        """Does the exception look like a database conflict error?

        Check for database driver specific cases.

        :param e: Python Exception instance
        """

        if not isinstance(e, OperationalError):
            # Not a DBAPI error wrapped by SQLAlchemy as OperationalError.
            # Note: ConcurrentModificationError is not an OperationalError,
            # so its entry in DATABASE_COFLICT_ERRORS is never reached here.
            return False

        # The driver exception SQLAlchemy wrapped
        orig = e.orig

        for err, func in DATABASE_COFLICT_ERRORS:
            # Exception type matches, now compare its values
            if isinstance(orig, err):
                if func:
                    # The test function inspects the driver exception, not the wrapper
                    return func(orig)
                else:
                    return True

        return False

    def managed_transaction(self, func):
        """SQL Serializable transaction isolation-level conflict resolution.

        When the SQL transaction isolation level is at its highest level (Serializable), the SQL database itself cannot alone resolve conflicting concurrent transactions. Thus, the SQL driver raises an exception to signal this condition.

        The ``managed_transaction`` decorator will retry running everything inside the function.

        Usage::

            # Create new session for SQLAlchemy engine
            def create_session():
                Session = sessionmaker()
                Session.configure(bind=engine)
                return Session()

            conflict_resolver = ConflictResolver(create_session, retries=3)

            # Create a decorated function which can try to re-run itself in the case of conflict
            @conflict_resolver.managed_transaction
            def myfunc(session):

                # Both threads modify the same wallet simultaneously
                w = session.query(BitcoinWallet).get(1)
                w.balance += 1

            # Execute the conflict sensitive code inside a managed transaction
            myfunc()

        The rules:

        - You must not swallow all exceptions within ``managed_transaction``. Example of how to handle exceptions::

            # Create a decorated function which can try to re-run itself in the case of conflict
            @conflict_resolver.managed_transaction
            def myfunc(session):

                try:
                    my_code()
                except Exception as e:
                    if ConflictResolver.is_retryable_exception(e):
                        # This must be passed to the function decorator, so it can attempt retry
                        raise
                    # Otherwise the exception is all yours

        - Use read-only database sessions if you know you do not need to modify the database and you need weaker transaction guarantees e.g. for displaying the total balance.

        - Never do external actions, like sending emails, inside ``managed_transaction``. If the database transaction is replayed, the code is run twice and you end up sending the same email twice.

        - Managed transaction section should be as small and fast as possible

        - Avoid long-running transactions by splitting up big transaction to smaller worker batches

        This implementation heavily draws inspiration from the following sources

        - http://stackoverflow.com/q/27351433/315168

        - https://gist.github.com/khayrov/6291557
        """

        def decorated_func():

            # Read attempts from app configuration
            attempts = self.retries

            # One initial run plus up to self.retries re-runs
            while attempts >= 0:

                # Each run gets a fresh session
                session = self.session_factory()
                try:
                    result = func(session)
                    session.commit()
                    self.stats["success"] += 1
                    return result

                except Exception as e:
                    if self.is_retryable_exception(e):
                        # Closing the session also rolls back the failed transaction
                        session.close()
                        self.stats["retries"] += 1
                        attempts -= 1
                        if attempts < 0:
                            self.stats["unresolved"] += 1
                            raise CannotResolveDatabaseConflict("Could not replay the transaction {} even after {} retries".format(func, self.retries)) from e
                        continue
                    else:
                        session.rollback()
                        self.stats["errors"] += 1
                        # All other exceptions should fall through
                        raise

        return decorated_func
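One way to follow the "no external actions" rule from the docstring above is to have the retryable function only return what needs to happen, and to perform the side effect once the transaction has gone through. The sketch below is a standard-library-only illustration under that assumption; `ConflictError`, `run_with_retries`, `credit_wallet` and `send_email` are hypothetical stand-ins, not part of the code above.

```python
class ConflictError(Exception):
    """Stand-in for a retryable database conflict (hypothetical)."""


def run_with_retries(func, retries=3):
    """Simplified stand-in for a managed_transaction-style retry wrapper."""
    for attempt in range(retries + 1):
        try:
            return func()
        except ConflictError:
            if attempt == retries:
                raise


attempt_log = []
sent = []


def send_email(address):
    # External action: must run exactly once, so it stays outside the retried code.
    sent.append(address)


def credit_wallet():
    # Database-only work; the first run simulates a conflict and is replayed.
    attempt_log.append("run")
    if len(attempt_log) == 1:
        raise ConflictError("simulated conflict")
    return ["alice@example.com"]


# The retried section only collects who to notify...
recipients = run_with_retries(credit_wallet)

# ...and the emails go out after the transaction has succeeded.
for address in recipients:
    send_email(address)
```

Here the database work runs twice because of the simulated conflict, but the email is sent only once.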
0

Postgres and Oracle conflict errors are marked as retryable by zope.sqlalchemy. Set your isolation level in the engine configuration and the transaction retry logic in pyramid_tm or Zope will work.
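As a rough sketch of what setting the isolation level in the engine configuration can look like, SQLAlchemy's `create_engine` accepts an `isolation_level` argument. The PostgreSQL URL in the comment is a hypothetical placeholder; in-memory SQLite is used here only so the snippet runs without a database server.

```python
from sqlalchemy import create_engine, text

# A real deployment would pass a PostgreSQL URL instead, for example
# "postgresql://user:password@localhost/mydb" (hypothetical credentials).
engine = create_engine("sqlite://", isolation_level="SERIALIZABLE")

with engine.connect() as conn:
    # Ask the connection which isolation level is in effect.
    level = conn.get_isolation_level()
    value = conn.execute(text("SELECT 1")).scalar()

print(level, value)
```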