2016-08-29 3 views
0

같은 알고리즘으로 처리해야하는 수백만 개의 행이있는 PostgreSQL 테이블이 있습니다. 이 작업을 위해 Python과 SQLAlchemy.Core를 사용하고 있습니다.PostgreSQL 테이블의 분산 처리

이 알고리즘은 하나 또는 여러 개의 행을 입력으로 허용하고 업데이트 된 값이있는 동일한 양의 행을 반환합니다.

id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3 
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6 
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9 
... 
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz 

이 작업을 수행하기 위해 PC 클러스터를 사용하고 있습니다. 이 클러스터는 dask.distributed 스케줄러와 작업자를 실행합니다.

이 작업은 map 기능을 사용하여 효과적으로 구현할 수 있다고 생각합니다. 내 생각에 각 작업자는 데이터베이스를 쿼리하고 NULL 값을 가진 일부 행을 처리하도록 선택하고 그 결과로 결과를 업데이트합니다.

내 질문은 : SQL 쿼리를 작성하는 방법, 그것은 근로자 중 테이블 조각을 배포 할 수 있습니까?

나는 각 노동자가 방출하는 것으로, SQL 쿼리에 offsetlimit 각 작업자 행의 하위 집합을 정의하기 위해 시도했다 :

SQL :

select * from table where value1 is NULL offset N limit 100; 
... 
update table where id1 = ... and id2 = ... 
     set value1 = value...; 

파이썬 :

from sqlalchemy import create_engine, bindparam, select, func 
from distributed import Executor, progress 

def process(offset, limit): 
    engine = create_engine(...) 

    # get next piece of work 
    query = select(...).where(...).limit(limit).offset(offset) 

    rows = engine.execute([select]).fetchall() 
    # process rows 

    # submit values to table 
    update_stmt = table.update().where(...).where(...).values(...) 
    up_values = ... 
    engine.execute(update_stmt, up_values) 

if __name__ == '__main__': 
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'), 
             port=config('SERVER_PORT'))) 
    n_rows = count_rows_to_process() 
    chunk_size = 100 
    progress(e.map(process, range(0, n_rows, chunk_size))) 

그러나 이것은 작동하지 않았습니다.

함수는 계산이 시작되기 전에 오프셋 목록을 반환하고 map 함수는 작업 시작 전에 process 함수를 배포했습니다.

일부 작업자는 작업 청크 처리를 성공적으로 마치고 결과를 테이블에 제출하고 값을 업데이트했습니다.

새 반복이 시작되고 새 SELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ... 쿼리가 데이터베이스로 보내지지만 이전 작업자가 테이블을 업데이트하기 전에 계산 되었기 때문에 오프셋이 유효하지 않습니다. 이제 NULL 값의 양이 줄어들고 작업자는 데이터베이스에서 빈 세트를받을 수 있습니다.

계산을 시작하기 전에 하나의 SELECT 쿼리를 사용할 수 없습니다. RAM에 맞지 않는 거대한 테이블을 반환하기 때문에 쿼리를 사용할 수 없습니다.

또한 SQLAlchemy 매뉴얼에서는 분산 처리를 위해 엔진 인스턴스를 각 파이썬 프로세스에 대해 로컬로 만들어야한다고 말합니다. 따라서 데이터베이스를 한 번 쿼리하고 process 함수에 반환 된 커서를 보낼 수 없습니다.

따라서 솔루션은 SQL 쿼리의 올바른 구성입니다. 당신이 병렬로 같은 일을 계산하는 여러 직원이있을 것이다

SELECT * 
FROM table 
WHERE value1 IS NULL 
ORDER BY random() 
LIMIT 100; 

최악의 경우, : 고려

+1

알고리즘에 분산 처리의 이점이 있습니까? 상당히 CPU 바인딩되어 있습니까? 매우 자주 데이터 세트의 상당 부분 (수백만 행의 순서조차도)을 메모리에 넣을 수 있습니다. 오버 헤드 때문에 단일 프로세스를 사용하는 경우보다 빠릅니다. 'LIMIT'과'OFFSET' ('OFFSET' 행을 건너 뛰기 때문에 속도가 느리다) 대신에 기본 키를'(id1, id2)'로 가정하고'(id1 , id2) BETWEEN (1, 2) AND (3, 4)'. – univerio

+0

제 경우에 대한 분산 처리에 대한 대안이 없습니다. 이 알고리즘은 C로 구현되고 Python에서 호출되는 PC 및 ARM 용 바이너리 실행 파일로 컴파일됩니다. – wl2776

답변

1

하나의 옵션은 무작위입니다. 그것이 당신을 괴롭히지 않으면 이것은 가장 간단한 방법 중 하나입니다.

다른 옵션

은 특정 근로자에 ​​개별 행을 전용 :

UPDATE table 
SET value1 = -9999 
WHERE id IN (
    SELECT id 
    FROM table 
    WHERE value1 IS NULL 
    ORDER BY random() 
    LIMIT 100 
) RETURNING * ; 

이 방법은 "표시"행 특정 직원이 -9999으로 "촬영"했다. 다른 모든 작업자는 value1이 더 이상 NULL이 아니므로이 행을 건너 뜁니다. 작업자가 실패 할 경우이 행으로 돌아갈 수있는 간단한 방법이 없을 것입니다. 수동으로 다시 NULL로 업데이트해야합니다.

+0

두 번째 옵션이 오늘 아침에 내 마음에 왔습니다. – wl2776

관련 문제