2014-01-29 2 views
9

배경

나는 거대한 텍스트 파일 (~ 30GB)을 처리하기 위해 셀러리 (3.1.8)를 사용하려고합니다. 이 파일들은 fastq 형식으로 약 118M 시퀀싱 "읽기"를 포함하며, 본질적으로 헤더, DNA 시퀀스 및 품질 문자열의 조합입니다. 또한, 이러한 시퀀스는 paired-end sequencing run에서 발생하므로 itertools.izip을 통해 동시에 두 개의 파일을 반복합니다. 내가 할 수 있기를 원하는 것은 각 읽기 쌍을 가져 와서 대기열로 보내고 클러스터의 컴퓨터 중 하나에서 처리되도록합니다 (어느 쪽이 상관하지 않더라도) 정리 된 버전을 반환합니다. (예를 들어, 품질에 기초한) 청소가 필요할 경우 판독 값을 읽습니다.셀러리를 사용하여 거대한 텍스트 파일 처리

나는 셀러리와 rabbitmq을 설정 한, 내 근로자가 시작되어 다음과 같이

celery worker -A tasks --autoreload -Q transient 

과 같은 구성 :

from kombu import Queue 

BROKER_URL = 'amqp://[email protected]' 
CELERY_RESULT_BACKEND = 'rpc' 
CELERY_TASK_SERIALIZER = 'pickle' 
CELERY_RESULT_SERIALIZER = 'pickle' 
CELERY_ACCEPT_CONTENT=['pickle', 'json'] 
CELERY_TIMEZONE = 'America/New York' 
CELERY_ENABLE_UTC = True 
CELERYD_PREFETCH_MULTIPLIER = 500 

CELERY_QUEUES = (
    Queue('celery', routing_key='celery'), 
    Queue('transient', routing_key='transient',delivery_mode=1), 
) 

나는 RPC 백엔드와 피클을 사용하여 선택한 성능을위한 직렬화뿐만 아니라 은 'transient'대기열 (delivery_mode를 통해)에 디스크에 아무 것도 쓰지 않습니다.

셀러리 시작

셀러리 프레임 워크를 설정하려면, 내가 먼저 고속/tmp를 디스크에 로그 파일을 작성하는 64 방법 상자에서 rabbitmq 서버 (3.2.3, 얼랑 R16B03-1)를 시작합니다. 위와 같은 작업자 프로세스는 클러스터의 각 노드 (약 34 개)에서 총 688 개의 코어에 대해 8-way에서 64-way SMP까지 실행됩니다. 따라서 작업자가 대기열을 처리하는 데 사용할 수있는 많은 CPU가 있습니다. 셀러리가 실행되면

작업 제출/성능

, I는 아래 ipython 노트북을 통해 작업을 제출 :이 읽기의 약 1.5 초 만에 쌍을합니다

files = [foo, bar] 
f1 = open(files[0]) 
f2 = open(files[1]) 
res = [] 
count = 0 
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)): 
    count += 1 
    res.append(tasks.process_read_pair.s(r1, r2)) 
     if count == 10000: 
     break 
t.stop() 
g = group(res) 
for task in g.tasks: 
    task.set(queue="transient") 

. 그럼, 아래와 같이, 20 대 정도 걸립니다 노동자에 제출 그룹에 지연 전화 :

result = g.delay() 

이 rabbitmq 콘솔 모니터링, 내가 아니라 거의 충분히 빨리 확인을하고있어 것을 알 수있다.

rabbitmq graph

질문 그래서

이 속도를 높일 수있는 방법이 있나요? 적어도 500 개가 아닌 500,000 개의 읽기 쌍이 초당 처리되는 것을보고 싶습니다. 셀러리 구성에서 내가 빠진 것이 분명합니까? 내 작업자와 토끼 로그는 본질적으로 비어 있습니다. 내 실적을 높이는 방법에 대한 조언을 많이 듣겠다. 각 개인도 꽤 빨리 쌍의 프로세스를 읽어

[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec 

업이 시점에

이 시점에, 나는 모든 봤 한 최대

그래서 내가 셀러리, 성능, 라우팅, rabbitmq 등으로 생각할 수 셀러리 웹 사이트와 문서를 살펴 봤습니다. 성능을 더 높일 수 없다면이 방법을 포기해야합니다. 기본적으로 작업을 여러 개의 작은 실제 파일로 나눠서 다중 처리 등으로 각 계산 노드에서 직접 처리합니다.하지만이로드를 클러스터에 퍼뜨릴 수없는 것은 수치 스럽습니다. 게다가, 이것은 절묘하게 우아한 해결책처럼 보입니다.

미리 도움을 청하십시오! 당신이

+0

해야한다. 연결을 다시 사용하기 시작하면 게시 속도가 2 배 증가했습니다. 사용중인 라이브러리에 익숙하지 않지만 연결을 다시 사용하는지 확인하십시오. – Basic

+0

셀러리에는 [기본 브로커 풀] (http://docs.celeryproject.org/en/latest/configuration.html#std)이 있습니다. : 설정 -BROKER_POOL_LIMIT). 나는 그것을 증가시키고 어떤 일이 일어나는지 보려고 노력할 것이다. –

+0

'BROKER_POOL_LIMIT = 1000'을 추가하고 직원들을 반송했습니다. 불행히도, 어떤 차이를 만들지 않았다. –

답변

1

하나 개의 솔루션은 높은 압축 그래서

res.append(tasks.process_bytes(zlib.compress(pickle.dumps((r1, r2))), 
             protocol = pickle.HIGHEST_PROTOCOL), 
         level=1)) 

하여 다음

res.append(tasks.process_read_pair.s(r1, r2)) 

를 교체하고 반대편에 pickle.loads(zlib.decompress(obj))를 호출 읽는 것입니다.

길이가 충분하지 않은 경우 DNA 서열이 길어지면 큰 요인 주위에 요소를 얻어야 만 덤프하고 압축하는 배열에서 청크로 그룹화 할 수 있습니다.

아직 수행하지 않은 경우 다른 전송에서는 zeroMQ를 사용하여 전송할 수 있습니다.

나는

+0

그냥 zlib를 사용하면 약 100/s가 들었습니다. 기본적으로 같은 것이 아닌가? 'res.append (tasks.process_read_pair.apply_async (args = (r1, r2), queue = "transient", compression = 'zlib'))' –

+0

또한 작성한 방식대로 작동하는지 확신 할 수 없습니다. 전체 작업을 압축하는 대신 r1, r2를 압축해야합니까? –

+0

당신은 압축 = 'zlib'tryied 모르겠다. 당신은 맞습니다 zlib은 레벨 = 9와 같지만 레벨 = 1로 지정하면 성능에 거의 영향을주지 않습니다. –

2

하지 대답하지만, 주석 너무 오래해야하는지 process_byte 확실하지 않다.

첫째, 모든 일반 논리/메시지 준비를 건너 뛰는 시도하고 당신의 현재 라이브러리와 소감 가능한 출판 루프를 할 ...의 조금 아래로 문제를 좁혀 보자. 당신이 얻는 속도를보십시오. 이렇게하면 대기열과 관련이없는 코드에 문제가 있는지 여부가 식별됩니다.

여전히 느린 경우 새 파이썬 스크립트를 설정하지만 셀러리 대신 amqplib을 사용하십시오. 중역 데스크톱에서 유용한 작업 (및 json 인코딩)을 수행하는 동안 6000/s 이상으로 퍼블리싱 할 수있게되었으므로 성능이 뛰어난 것으로 알고 있습니다. 이렇게하면 문제가 셀러리 라이브러리에 있는지 확인할 수 있습니다. 당신이 위의 두 가지 방법 사이

from amqplib import client_0_8 as amqp 
try: 
    lConnection = amqp.Connection(
     host=###, 
     userid=###, 
     password=###, 
     virtual_host=###, 
     insist=False) 
    lChannel = lConnection.channel() 
    Exchange = ### 

    for i in range(100000): 
     lMessage = amqp.Message("~130 bytes of test data..........................................................................................................") 
     lMessage.properties["delivery_mode"] = 2 
     lChannel.basic_publish(lMessage, exchange=Exchange) 

    lChannel.close() 
    lConnection.close() 

except Exception as e: 
    #Fail 

는에 문제를 추적 할 수 있어야한다 (당신에게 시간을 절약하기 위해, 나는 ... 광산의 프로젝트에서 다음 냈다 단순화 할 때 희망을 깨진 적이 없다) 대기열, 라이브러리 또는 코드 중 하나를 선택합니다.

+1

고마워요! 좋은 테스트. 귀하의 코드를 사용하여, 나는 교환기로 7k/sec 이상을 받고 있으며, 전달 모드가 디스크로 떨어지면 약간 적습니다. [모드 2] (https://dl.dropboxusercontent.com/u/861789/Screen%20Shot%202014-01-30%20at%2012.36.52%20PM.png), [모드 1] (https : // dl .dropboxusercontent.com/u/861789/Screen % 20Shot % 202014-01-30 % 20at % 2012.35.35 % 20 PM.png) –

+0

올바른 방향으로 나아가는 단계입니다. 적어도 대기열이 작동하고 있다는 것을 알고 있습니다. 다음 번에 가장 쉬운 단계는 원래 코드에 amqplib을 놓고 속도가 어떤지 확인하는 것입니다. 그것이 높으면, 우리는 그것을 승리라고 부를 것입니다. 그렇지 않은 경우 문제는 대기열과 관련이없고 코드 대신 사용됩니다. 데이터를 저장하는 데 사용하는 구조가 느리거나 너무 비슷합니다. 그것이 당신의 코드라면, [프로파일 링은 갈 길입니다] (http://stackoverflow.com/a/582337/156755). 훌륭한 프로파일 러는 아니지만 지금까지 파이썬에서 발견 한 최고입니다. – Basic

0

다시 답변이지만 댓글은 너무 길어요.

from amqplib import client_0_8 as amqp 
try: 
    lConnection = amqp.Connection() 
    lChannel = lConnection.channel() 
    Exchange = 'celery' 

    for i in xrange(1000000): 
     lMessage = amqp.Message("~130 bytes of test data..........................................................................................................") 
     lMessage.properties["delivery_mode"] = 1 
     lChannel.basic_publish(lMessage, exchange=Exchange, routing_key='transient') 

    lChannel.close() 
    lConnection.close() 

except Exception as e: 
    print e 

당신은 그것을 잘 따라 흔들 있다고 볼 수 있습니다 Basic's 코멘트를/아래 답변, 내 응용 프로그램과 동일한 교환 및 라우팅을 사용하여 다음 테스트를 설정합니다.

test

나는 지금이 차이를 알아 내기까지의 어떤 나는 내 논리에 AMQP를 추가

0

의 내부에서 일어나고, 그리고 빠른 것 같아요. FML.

from amqplib import client_0_8 as amqp 
try: 
    import stopwatch 
    lConnection = amqp.Connection() 
    lChannel = lConnection.channel() 
    Exchange = 'celery' 

    t = stopwatch.Timer() 
    files = [foo, bar] 
    f1 = open(files[0]) 
    f2 = open(files[1]) 
    res = [] 
    count = 0 
    for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)): 
     count += 1 
     #res.append(tasks.process_read_pair.s(args=(r1, r2))) 
     #lMessage = amqp.Message("~130 bytes of test data..........................................................................................................") 
     lMessage = amqp.Message(" ".join(r1) + " ".join(r2)) 
     res.append(lMessage) 
     lMessage.properties["delivery_mode"] = 1 
     lChannel.basic_publish(lMessage, exchange=Exchange, routing_key='transient') 
     if count == 1000000: 
      break 
    t.stop() 
    print "added %d tasks in %s" % (count, t) 

    lChannel.close() 
    lConnection.close() 

except Exception as e: 
    print e 

img

그래서 저는 아래와 같이 루프에 셀러리에 비동기 작업을 제출하기 위해 변경 한 :

res.append(tasks.speed.apply_async(args=("FML",), queue="transient")) 

속도 방법이 바로 이것이다 :

@app.task() 
def speed(s): 
    return s 

작업 제출 중 느림 ag 아인!

img

그래서, 함께 할 수있는 아무것도 표시되지 않습니다

내가 큐에 내가를 제출하고있어
  • 메시지를 제출 반복하고있어 어떻게

    대신 오히려 함수의 대기열과 관련이 있습니다. 나는 혼란스러워.

  • 0

    다시 대답은 아니지만 더 많은 관찰이 필요합니다.

    with app.producer_or_acquire() as producer: 
        task.apply_async(producer=producer) 
    

    는 또한 작업이 될 수있다 : 당신에게 약간의 성능 향상을 제공한다 제작자 인스턴스를 재사용

    img

    2

    : 단순히 레디 스하는 RPC에서 내 백엔드를 변경하여, 나는 더 이상 내 처리량을 배로 프록시 오브젝트 등 모든 호출에 대해 평가해야하는 경우 :

    task = task._get_current_object() 
    

    자동으로 다시 것 group을 사용하여 생산자를 사용하고 이 같은 루프에서 어떻게 할 것인지 보통 : 가능한 경우

    process_read_pair = tasks.process_read_pair.s 
    g = group(
        process_read_pair(r1, r2) 
        for r1, r2 in islice(
         izip(FastGeneralIterator(f1), FastGeneralIterator(f2)), 0, 1000) 
    ) 
    result = g.delay() 
    
    또한 C. 에 기록 된 librabbitmq 모듈의 설치를 고려할 수

    amqp:// 전송이 자동으로 사용합니다 (또는 librabbitmq://를 사용하여 수동으로 지정할 수 있습니다

    pip install librabbitmq 
    

    출판 메시지를 직접 등등 셀러리 라우팅 도우미를 우회하기 때문에 빨리 될 수있는 기본 라이브러리를 사용하여,하지만 난않을 것그것이 훨씬 느린 것 같아요. 그렇다면 샐러리의 최적화를위한 여지가 확실히 있습니다. 저는 지금까지 소비자 측면을 최적화하는 데 주로 집중했습니다.

    당신이, 등등 CPU/메모리 캐시에 도움이 될 수 있고 를 거친 작업 단위를 사용하는 것과 같은 작업에 여러 DNA 쌍을 처리 할 수 ​​있으며, 그이 때문에 종종 어쨌든 병렬화를 포화 것 또한

    주 유한 자원.

    참고 : 과도 큐 내가 rabbitmq을 사용하고 내 성능 병목 현상 중 하나가 각 메시지 큐에 대한 연결을 재개/폐쇄 행위에서 온 것으로 나타났습니다 durable=False

    +0

    작업자는 솔로 풀을 사용할 때 초당 50.000 개의 작업을 사용할 수 있어야하며 결과는 없습니다. 그러나 벤치마킹을하지 않아도 성능이 다소 저하 될 수 있습니다. 게시 할 때 여러 연결을 사용하는 것도 도움이 될 수 있습니다. – asksol

    +0

    감사합니다. 시도 할 물건이 더 필요합니다. 나는 내 코드 스 니펫에서 명확하지 않은 C 클라이언트를 사용하고있다. rpc 대신 redis backend를 사용하고 50k 읽기 쌍의 그룹으로 그룹을 사용하여 제출하는 현재의 테스트 라운드에서는 5000-ish/sec로 제출할 수 있습니다. 그다지 좋지는 않지만 이전보다 확실히 좋습니다. –