2012-05-14 3 views
6

REF 무결성 규칙을 사용하여 여러 개의 10k 레코드를 데이터베이스에 삽입하고 있습니다. 일부 데이터 행은 불행히도 중복되어 있습니다 (데이터베이스에 이미 존재한다는 점에서). SQLAlchemy에 의해 던져진 IntegrityError 예외를 처리하고 오류를 로깅 한 다음 계속해서 처리하기 위해 데이터베이스의 모든 행을 확인하기에는 너무 비싸다.SQLAlchemy IntegrityError 및 대량 데이터 가져 오기

내 코드는 다음과 같이 표시됩니다

# establish connection to db etc. 

tbl = obtain_binding_to_sqlalchemy_orm() 
datarows = load_rows_to_import() 

try: 
    conn.execute(tbl.insert(), datarows) 
except IntegrityError as ie: 
    # eat error and keep going 
except Exception as e: 
    # do something else 

제가 위에서 만드는 오전 (암시) 가정 SQLAlchemy의 한 트랜잭션에 여러 삽입 롤링되지 않는 것입니다. 내 가정이 틀린 경우 IntegrityError가 발생하면 나머지 삽입이 중단된다는 것을 의미합니다. 위의 의사 코드 "패턴"이 예상대로 작동하는지 누구든지 확인할 수 있습니까? 아니면 Thr IntegrityError 예외의 결과로 데이터가 손실 될까요?

또한 누구나이 작업을 수행하는 것이 더 좋은 생각이 들면 그 내용을 듣고 싶습니다.

답변

1

이전처럼 트랜잭션을 시작하지 않았다면 sqlalchemy의 autocommit feature이 실행되지만이 링크에서 설명한대로 명시 적으로 설정해야합니다.

0

ASCII 데이터 파일을 구문 분석하여 데이터를 테이블로 가져올 때도이 문제가 발생했습니다. 문제는 필자가 직관적으로 직관적으로 SQLAlchemy가 고유 한 데이터를 허용하면서 중복 된 행을 건너 뛰길 바랬다는 것입니다. 또는 유니 코드 문자열이 허용되지 않는 등 현재 SQL 엔진으로 인해 행에 임의의 오류가 발생하는 경우 일 수 있습니다.

그러나이 동작은 SQL 인터페이스의 정의 범위를 벗어납니다. SQL API 및 따라서 SQLAlchemy는 트랜잭션과 커밋 만 이해하며 이러한 선택적 동작을 설명하지 않습니다. 게다가 예외가 발생하면 삽입이 중단되어 나머지 데이터는 남겨 지므로 자동 커밋 기능에 의존하는 것이 위험합니다.

내 솔루션 (가장 우아한 것인지 확실하지 않음)은 루프의 모든 행을 처리하고 예외를 캐치 및 로그하며 맨 끝에 변경 사항을 커밋하는 것입니다.

당신이 어떻게 든 목록 목록 (즉, 열 값 목록 인 행 목록)에서 데이터를 획득했다고 가정합니다. 그런 다음 루프의 모든 행을 읽습니다.

# Python 3.5 
from sqlalchemy import Table, create_engine 
import logging 

# Create the engine 
# Create the table 
# Parse the data file and save data in `rows` 

conn = engine.connect() 
trans = conn.begin() # Disables autocommit 

exceptions = {} 
totalRows = 0 
importedRows = 0 

ins = table.insert() 

for currentRowIdx, cols in enumerate(rows): 
    try: 
     conn.execute(ins.values(cols)) # try to insert the column values 
     importedRows += 1 

    except Exception as e: 
     exc_name = type(e).__name__ # save the exception name 
     if not exc_name in exceptions: 
      exceptions[exc_name] = [] 
     exceptions[exc_name].append(currentRowIdx) 

    totalRows += 1 

for key, val in exceptions.items(): 
    logging.warning("%d out of %d lines were not imported due to %s."%(len(val), totalRows, key)) 

logging.info("%d rows were imported."%(importedRows)) 

trans.commit() # Commit at the very end 
conn.close() 

이 작업의 속도를 최대화하려면 자동 커밋을 비활성화해야합니다. 나는이 코드를 SQLite와 함께 사용하고 있으며 자동 체크가 비활성화 된 상태에서도 여전히 sqlite3을 사용하는 이전 버전보다 3-5 배 느리다. (SQLAlchemy로 이식 한 이유는 MySQL과 함께 사용할 수 있기 때문입니다.)

SQLite에 대한 직접 인터페이스만큼 빠르지 않은 가장 우아한 솔루션은 아닙니다. 머지 않아 코드를 프로파일 링하고 병목 현상을 발견하면이 대답을 솔루션으로 업데이트 할 것입니다.

관련 문제