2011-11-15 2 views
24

다차원 배열 파일에서 PostgreSQL 데이터베이스로 전송할 수천만 개의 행이 있습니다. 제 도구는 파이썬과 psycopg2입니다. 데이터를 일괄 적재하는 가장 효율적인 방법은 copy_from입니다. 그러나 내 데이터는 대부분 32 비트 부동 소수점 숫자 (실수 또는 실수 4)이므로 실제 텍스트 → 실제 텍스트로 변환하지 않을 것입니다.psycopg2로 이진 COPY 테이블 FROM 사용

# Just one row of data 
num_row = [23253, 342, -15.336734, 2494627.949375] 

import psycopg2 
# Python3: 
from io import StringIO 
# Python2, use: from cStringIO import StringIO 

conn = psycopg2.connect("dbname=mydb user=postgres") 
curs = conn.cursor() 

# Convert floating point numbers to text, write to COPY input 
cpy = StringIO() 
cpy.write('\t'.join([repr(x) for x in num_row]) + '\n') 

# Insert data; database converts text back to floating point numbers 
cpy.seek(0) 
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2')) 
conn.commit() 

은 바이너리 모드를 사용하여 작업 할 수있는 상응하는 항목이 : 나는 파이썬 문자열 (텍스트)를 사용하여와 어디에 있어요 여기

CREATE TABLE num_data 
(
    id serial PRIMARY KEY NOT NULL, 
    node integer NOT NULL, 
    ts smallint NOT NULL, 
    val1 real, 
    val2 double precision 
); 

은 예를 들면 다음과 같습니다 데이터베이스 DDL은? 부동 소수점 숫자를 2 진수로 유지 하시겠습니까? 이것은 부동 소수점 정밀도를 보존 할뿐만 아니라 더 빠를 수도 있습니다.

(참고 예와 동일한 정밀도를 보려면 SET extra_float_digits='2' 사용) 여기서 파이썬 3 FROM COPY 이진 상당

+0

글쎄, [COPY로 바이너리 파일 가져 오기] (http://www.postgresql.org/docs/9.1/interactive/sql-copy.html) 할 수 있지만 전체 파일이 있어야합니다. 하나의 값뿐만 아니라 특정 이진 형식 –

+0

@ 에윈, 그렇습니다. 나는 바이너리 모드를 COPY에 대해 읽었지 만, psycopg2가 지원하는지, 아니면 다른 접근법을 사용해야하는지 잘 모르겠습니다. –

+0

내가 사용한 이진 파일 형식의 유일한 응용 프로그램은 * PostgreSQL에서 내 보낸 파일을 가져 오는 것입니다. 특정 형식을 작성할 수있는 다른 프로그램에 대해서는 알지 못합니다. 그래도 어딘가에있을 수는 없습니다. 반복되는 작업이라면 Postgres를 텍스트 형식으로 한 번 복사하고 다음에 이진 파일을 출력하고'COPY FROM .. FORMAT BINARY'를 쓸 수 있습니다. –

답변

29

을 :

from io import BytesIO 
from struct import pack 
import psycopg2 

# Two rows of data; "id" is not in the upstream data source 
# Columns: node, ts, val1, val2 
data = [(23253, 342, -15.336734, 2494627.949375), 
     (23256, 348, 43.23524, 2494827.949375)] 

conn = psycopg2.connect("dbname=mydb user=postgres") 
curs = conn.cursor() 

# Determine starting value for sequence 
curs.execute("SELECT nextval('num_data_id_seq')") 
id_seq = curs.fetchone()[0] 

# Make a binary file object for COPY FROM 
cpy = BytesIO() 
# 11-byte signature, no flags, no header extension 
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0)) 

# Columns: id, node, ts, val1, val2 
# Zip: (column position, format, size) 
row_format = list(zip(range(-1, 4), 
         ('i', 'i', 'h', 'f', 'd'), 
         (4, 4, 2, 4, 8))) 
for row in data: 
    # Number of columns/fields (always 5) 
    cpy.write(pack('!h', 5)) 
    for col, fmt, size in row_format: 
     value = (id_seq if col == -1 else row[col]) 
     cpy.write(pack('!i' + fmt, size, value)) 
    id_seq += 1 # manually increment sequence outside of database 

# File trailer 
cpy.write(pack('!h', -1)) 

# Copy data to database 
cpy.seek(0) 
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy) 

# Update sequence on database 
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,)) 
conn.commit() 

업데이트

은 나는 COPY를 위해 파일을 쓰는 위의 접근법을 다시 썼다. 파이썬의 내 데이터는 NumPy 배열로되어 있으므로이를 사용하는 것이 좋습니다. 여기에 1M 행 7 열 몇 가지 예를 data입니다 :

CREATE TABLE num_data_binary 
(
    id integer PRIMARY KEY, 
    node integer NOT NULL, 
    ts smallint NOT NULL, 
    s0 real, 
    s1 real, 
    s2 real, 
    s3 real, 
    s4 real, 
    s5 real, 
    s6 real 
) WITH (OIDS=FALSE); 

num_data_text라는 이름의 다른 유사한 테이블 : 내 데이터베이스에

import psycopg2 
import numpy as np 
from struct import pack 
from io import BytesIO 
from datetime import datetime 

conn = psycopg2.connect("dbname=mydb user=postgres") 
curs = conn.cursor() 

# NumPy record array 
shape = (7, 2000, 500) 
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0])) 

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] + 
     [('s' + str(x), 'f4') for x in range(shape[0])]) 
data = np.empty(shape[1]*shape[2], dtype) 
data['id'] = np.arange(shape[1]*shape[2]) + 1 
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2]) 
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1]) 
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100 
prv = 's0' 
for nxt in data.dtype.names[4:]: 
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10 
    prv = nxt 

, 나는 두 개의 같은 모양 테이블을 가지고있다. 내가 도우미 기능을 사용하고 어떻게

def prepare_text(dat): 
    cpy = BytesIO() 
    for row in dat: 
     cpy.write('\t'.join([repr(x) for x in row]) + '\n') 
    return(cpy) 

def prepare_binary(dat): 
    pgcopy_dtype = [('num_fields','>i2')] 
    for field, dtype in dat.dtype.descr: 
     pgcopy_dtype += [(field + '_length', '>i4'), 
         (field, dtype.replace('<', '>'))] 
    pgcopy = np.empty(dat.shape, pgcopy_dtype) 
    pgcopy['num_fields'] = len(dat.dtype) 
    for i in range(len(dat.dtype)): 
     field = dat.dtype.names[i] 
     pgcopy[field + '_length'] = dat.dtype[i].alignment 
     pgcopy[field] = dat[field] 
    cpy = BytesIO() 
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0)) 
    cpy.write(pgcopy.tostring()) # all rows 
    cpy.write(pack('!h', -1)) # file trailer 
    return(cpy) 

이를 : 여기에

는 NumPy와 기록 배열의 정보를 사용하여 COPY (텍스트 및 바이너리 형식 모두)에 대한 데이터를 준비하기 위해 몇 가지 간단한 도우미 기능입니다 기준 두 COPY 포맷 방법 :

,617,451
Processing copy object for num_data_text 
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database 
Database copy time: 0:00:37.929166 
     Total time: 0:01:53.217861 
Processing copy object for num_data_binary 
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database 
Database copy time: 0:00:23.325952 
     Total time: 0:00:24.622095 
: 여기
def time_pgcopy(dat, table, binary): 
    print('Processing copy object for ' + table) 
    tstart = datetime.now() 
    if binary: 
     cpy = prepare_binary(dat) 
    else: # text 
     cpy = prepare_text(dat) 
    tendw = datetime.now() 
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' + 
      str(cpy.tell()) + ' bytes; transfering to database') 
    cpy.seek(0) 
    if binary: 
     curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy) 
    else: # text 
     curs.copy_from(cpy, table) 
    conn.commit() 
    tend = datetime.now() 
    print('Database copy time: ' + str(tend - tendw)) 
    print('  Total time: ' + str(tend - tstart)) 
    return 

time_pgcopy(data, 'num_data_text', binary=False) 
time_pgcopy(data, 'num_data_binary', binary=True) 

지난 2 개 time_pgcopy 명령의 출력

따라서 NumPy → 파일 및 파일 → 데이터베이스 단계는 이진법을 사용하면 훨씬 빠릅니다. 분명한 차이점은 Python이 COPY 파일을 준비하는 방법인데, 이는 텍스트가 실제로 느리다. 일반적으로 이진 형식은이 스키마의 텍스트 형식으로 2/3 시간 내에 데이터베이스에로드됩니다.

마지막으로 데이터베이스 내의 두 테이블의 값을 비교하여 숫자가 다른지 확인했습니다. 행의 약 1.46 %가 s0 열에 대해 다른 값을 가지며이 분수는 s6에 대해 6.17 %로 증가합니다 (아마도 내가 사용한 임의의 방법과 관련이 있습니다). 모든 70M 32 비트 부동 소수점 값 사이의 0이 아닌 절대 값 차이는 9.3132257e-010과 7.6293945e-006 사이입니다. 텍스트 및 이진로드 방법 간의 이러한 작은 차이는 텍스트 형식 방법에 필요한 float → text → float 변환의 정밀도 상실로 인한 것입니다.

+0

꽤 멋지다. 어딘가에서 그것을 얻을 자신의 작성 했습니까? 실제로 성능이 향상됩니까? –

+0

작동하면 멋지다! 포맷은 제쳐두고, 해결책은 psycopg의'copy_expert()'를 사용하는 것입니다. – piro

+0

@ 에윈, 네, [copy] (http://www.postgresql.org/docs/current/interactive/sql-copy.html), [struct] (http : // docs. python.org/library/struct.html) 및 [dtype] (http://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html)을 참조하십시오. 벤치 마크가 있으며 잘 보입니다. –

1

Here은 제 버전입니다. Mike의 버전을 기반으로합니다.

그것의 아주 임시을하지만, 두 가지 장점이 있습니다 :

  • hstore 바이너리 형식으로 작성하는 방법을
  • readline를 오버로드하여 스트림으로 발전기 및 행위를 기대는.