2017-05-14 4 views
1

큰 팬더 데이터 프레임을 함수 인수로 전달하여 dask 분산 작업자에게 전달하려고합니다.팬더 데이터 프레임을 전달 작업자에게 노출시키는 방법은 무엇입니까?

1 기능에 직접 데이터를 전달 :

def test(X): 
    return X 
f=client.submit(test, X) 
f.result() 

2 저장 초기화 기능에 dataframe 내가 노력하는 (X 내 dataframe입니다).

def worker_init(r_X): 
    global X 
    X=r_X 
client.run(worker_init,X,y) 

3 모든 노드에 걸쳐 dataframe 산란 후 변형

def test(X): 
    return X 
f_X = client.scatter(X, broadcast=True) 
f = client.submit(test,f_X) 
f.result() 

없음 내 경우에 작동하지 선물을 통해 그것을 사용. 변형 1과 2는 거의 동일하게 작동합니다. dask-scheduler는 모든 작업을 수행 할 때마다 메모리를 늘리고 메모리가 부족해지면 작업을 중단합니다.

팬더 데이터 프레임을 전달하는 대신 일부 가비지가 발생하므로 변형 3이 작동하지 않습니다.

데이터 프레임을 작업자에게 보내고 스케줄러에 MemoryError가없는 방법은 무엇입니까?

메모리를 효율적으로하도록되어 있지만, 심지어 dataframe 통과하지 못한 변형 3의 전체 코드 : 그래서 당신이 dataframe에서 통과 할 때, 입력의 목록을

import pandas as pd 
import numpy as np 
from distributed import Client 
client = Client('localhost:8786') 
X = np.random.rand(10000,100) 
X=pd.DataFrame(X) 
f_X = client.scatter(X, broadcast=True) 
def test(X): 
    return X 
f = client.submit(test,f_X) 
f.result()[:10] 

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

답변

2

client.scatter 검사를하면 실수로 그것을 일련의 목록으로 푸는 중입니다. 해야 할 일 f_X = client.scatter([X], broadcast=True)

이제 모든 작업자에게 하나의 데이터 프레임이 있습니다. 여기 f_X는 하나의 미래를 포함하는 목록이기도하므로 f = client.submit(test,f_X[0])을 원할 것입니다.

일반적으로 클라이언트에서 함수를 전달하는 대신 작업자의 함수 내에서 데이터를 생성 /로드 할 수 있습니다. 분명히 전체 데이터를 로컬 메모리에 저장하고 해당 데이터를 복사해야하며 직렬화 비용이 필요합니다. 도중에.

+0

솔루션을 점검하면서 데이터 프레임에 중복 된 열 이름이있는 경우 스 캐터가 작동하지 않는다는 것을 알았습니다. 하지만 이것은 내 코드의 버그였습니다. 중복 된 이름을 바꾼 후에도 효과가있었습니다. –

+0

'distributed .__ version__ == 1.16.3'에서 scatter는 적절하게 싱글 톤 인수를받습니다. – MRocklin

관련 문제