큰 팬더 데이터 프레임을 함수 인수로 전달하여 dask 분산 작업자에게 전달하려고합니다.팬더 데이터 프레임을 전달 작업자에게 노출시키는 방법은 무엇입니까?
1 기능에 직접 데이터를 전달 :
def test(X):
return X
f=client.submit(test, X)
f.result()
2 저장 초기화 기능에 dataframe 내가 노력하는 (X 내 dataframe입니다).
def worker_init(r_X):
global X
X=r_X
client.run(worker_init,X,y)
3 모든 노드에 걸쳐 dataframe 산란 후 변형
def test(X):
return X
f_X = client.scatter(X, broadcast=True)
f = client.submit(test,f_X)
f.result()
없음 내 경우에 작동하지 선물을 통해 그것을 사용. 변형 1과 2는 거의 동일하게 작동합니다. dask-scheduler는 모든 작업을 수행 할 때마다 메모리를 늘리고 메모리가 부족해지면 작업을 중단합니다.
팬더 데이터 프레임을 전달하는 대신 일부 가비지가 발생하므로 변형 3이 작동하지 않습니다.
데이터 프레임을 작업자에게 보내고 스케줄러에 MemoryError가없는 방법은 무엇입니까?
메모리를 효율적으로하도록되어 있지만, 심지어 dataframe 통과하지 못한 변형 3의 전체 코드 : 그래서 당신이 dataframe에서 통과 할 때, 입력의 목록을
import pandas as pd
import numpy as np
from distributed import Client
client = Client('localhost:8786')
X = np.random.rand(10000,100)
X=pd.DataFrame(X)
f_X = client.scatter(X, broadcast=True)
def test(X):
return X
f = client.submit(test,f_X)
f.result()[:10]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
솔루션을 점검하면서 데이터 프레임에 중복 된 열 이름이있는 경우 스 캐터가 작동하지 않는다는 것을 알았습니다. 하지만 이것은 내 코드의 버그였습니다. 중복 된 이름을 바꾼 후에도 효과가있었습니다. –
'distributed .__ version__ == 1.16.3'에서 scatter는 적절하게 싱글 톤 인수를받습니다. – MRocklin