2017-11-15 2 views
2

dask를 사용하여 큰 고해상도 해양 모델 데이터 세트에서 시간 경과에 따른 선형 추세를 계산하려고합니다.축의 성능이 축을 따라 적용됩니다.

나는이 예제 (Applying a function along an axis of a dask array)를 따라 갔고 apply_along_axis 구문을 쉽게 발견했다.

현재 1 차원 배열에 numpy 함수를 래핑하고 결과 배열을 xray Dataarray에 패키지화하려면 dask.array.apply_along_axis을 사용하고 있습니다. top -u <username>을 사용하면 계산이 병렬로 실행되지 않습니다 (~ 100 % cpu 사용).

map_blocks에서 더 나은 성능을 기대합니까? 또는 apply_along_axis의 실적을 개선하는 방법에 대한 제안 사항이 있습니까? 팁을 주시면 감사하겠습니다. 내가 사용 비슷한 일을하고있다

import numpy as np 
from scipy import optimize 
import xarray as xr 
import dask.array as dsa 

def _lin_trend(y): 
    x = np.arange(len(y)) 
    return np.polyfit(x, y, 1) 



def linear_trend(da, dim, name='parameter'): 
    da = da.copy() 
    axis_num = da.get_axis_num(dim) 

    dims = list(da.dims) 
    dims[axis_num] = name 
    coords = da.rename({dim:name}).coords 
    coords[name] = ['slope', 'intercept'] 

    dsk = da.data 
    dsk_trend = dsa.apply_along_axis(_lin_trend,0,dsk) 
    out = xr.DataArray(dsk_trend, dims=dims, coords=coords) 
    return out 
+0

당신의 dask 청크 구성과 데이터 어레이의 모양을 공유 할 수 있습니까? – jhamman

답변

0

는 xarray의 apply_ufunc (나중에 xarray v0.10 이상이 필요합니다). 이것은 dask에서 apply_along_axis 함수를 사용하는 것보다 관리하기가 약간 쉬울 것입니다.

import xarray as xr 
import numpy as np 
from scipy import stats 

def _calc_slope(x, y): 
    '''wrapper that returns the slop from a linear regression fit of x and y''' 
    slope = stats.linregress(x, y)[0] # extract slope only 
    return slope 


def linear_trend(obj): 
    time_nums = xr.DataArray(obj['time'].values.astype(np.float), 
          dims='time', 
          coords={'time': obj['time']}, 
          name='time_nums') 
    trend = xr.apply_ufunc(_calc_slope, time_nums, obj, 
          vectorize=True, 
          input_core_dims=[['time'], ['time']], 
          output_core_dims=[[]], 
          output_dtypes=[np.float], 
          dask='parallelized') 

    return trend 

성능이 예상과 다른 이유는 무엇입니까? 이것은 여러 가지 이유가있을 수 있습니다. dask 배열은 어떻게 chunked됩니까? 어떤 dask 스케줄러를 사용하고 있습니까? 귀하의 구성이 무엇인지 더 잘 알게 된 후에 제 답변의 두 번째 부분을 업데이트하겠습니다.

1

나는 궁극적으로 성능이 내가 작업하고있는 파일 시스템에 의해 제한된다고 생각한다. 그래도 질문에 대답하려면 내 데이터 집합의 모양은 다음과 같습니다.

<xarray.Dataset> 
Dimensions:   (st_edges_ocean: 51, st_ocean: 50, time: 101, xt_ocean: 3600, yt_ocean: 2700) 
Coordinates: 
    * xt_ocean  (xt_ocean) float64 -279.9 -279.8 -279.7 -279.6 -279.5 ... 
    * yt_ocean  (yt_ocean) float64 -81.11 -81.07 -81.02 -80.98 -80.94 ... 
    * st_ocean  (st_ocean) float64 5.034 15.1 25.22 35.36 45.58 55.85 ... 
    * st_edges_ocean (st_edges_ocean) float64 0.0 10.07 20.16 30.29 40.47 ... 
    * time   (time) float64 3.634e+04 3.671e+04 3.707e+04 3.744e+04 ... 

그래서 다소 커서 디스크에서 읽는 데 시간이 많이 걸립니다. 시간 차원이 아직 완료 기능을 위해 약 20 시간 소요 (성능에 큰 차이를 만들하지 않은 하나의 덩어리

dask.array<concatenate, shape=(101, 50, 2700, 3600), dtype=float64, 
chunksize=(101, 1, 270, 3600)> 

가되도록 내가 rechunked 한 (즉, 독서 등 및 쓰기입니다 디스크). I는 현재 시간에 청크 없습니다입니다 예를 들어

dask.array<concatenate, shape=(101, 50, 2700, 3600), dtype=float64, 
chunksize=(1, 1, 2700, 3600)> 

나는이 두 가지 방법의 상대적 성능에 관심을 내 노트북에 테스트를 실행. 더

import xarray as xr 
import numpy as np 
from scipy import stats 
import dask.array as dsa 

slope = 10 
intercept = 5 
t = np.arange(250) 
x = np.arange(10) 
y = np.arange(500) 
z = np.arange(200) 
chunks = {'x':10, 'y':10} 

noise = np.random.random([len(x), len(y), len(z), len(t)]) 
ones = np.ones_like(noise) 
time = ones*t 
data = (time*slope+intercept)+noise 
da = xr.DataArray(data, dims=['x', 'y', 'z', 't'], 
       coords={'x':('x', x), 
         'y':('y', y), 
         'z':('z', z), 
         't':('t', t)}) 
da = da.chunk(chunks) 
da 

내가 w (timeseries의 기울기를 계산하기 위해 linregress와 polyfit를 모두 사용하는) private 함수 세트와 dask.apply_along 및 xarray.apply_ufunc를 사용하는 다른 구현을 정의했습니다. 계산 타이밍

def _calc_slope_poly(y): 
    """ufunc to be used by linear_trend""" 
    x = np.arange(len(y)) 
    return np.polyfit(x, y, 1)[0] 


def _calc_slope(y): 
    '''returns the slop from a linear regression fit of x and y''' 
    x = np.arange(len(y)) 
    return stats.linregress(x, y)[0] 

def linear_trend_along(da, dim): 
    """computes linear trend over 'dim' from the da. 
     Slope and intercept of the least square fit are added to a new 
     DataArray which has the dimension 'name' instead of 'dim', containing 
     slope and intercept for each gridpoint 
    """ 
    da = da.copy() 
    axis_num = da.get_axis_num(dim) 
    trend = dsa.apply_along_axis(_calc_slope, axis_num, da.data) 
    return trend 

def linear_trend_ufunc(obj, dim): 
    trend = xr.apply_ufunc(_calc_slope, obj, 
          vectorize=True, 
          input_core_dims=[[dim]], 
          output_core_dims=[[]], 
          output_dtypes=[np.float], 
          dask='parallelized') 

    return trend 

def linear_trend_ufunc_poly(obj, dim): 
    trend = xr.apply_ufunc(_calc_slope_poly, obj, 
          vectorize=True, 
          input_core_dims=[[dim]], 
          output_core_dims=[[]], 
          output_dtypes=[np.float], 
          dask='parallelized') 

    return trend 

def linear_trend_along_poly(da, dim): 
    """computes linear trend over 'dim' from the da. 
     Slope and intercept of the least square fit are added to a new 
     DataArray which has the dimension 'name' instead of 'dim', containing 
     slope and intercept for each gridpoint 
    """ 
    da = da.copy() 
    axis_num = da.get_axis_num(dim) 
    trend = dsa.apply_along_axis(_calc_slope_poly, axis_num, da.data) 
    return trend 

trend_ufunc = linear_trend_ufunc(da, 't') 
trend_ufunc_poly = linear_trend_ufunc_poly(da, 't') 
trend_along = linear_trend_along(da, 't') 
trend_along_poly = linear_trend_along_poly(da, 't') 

apply_along 방법은 소폭 빠를 수 있음을 나타낼 것으로 보인다. linregress 대신 polyfit을 사용하는 것은 꽤 큰 영향을 미친 것 같습니다. 왜 이것이 훨씬 더 빠르지는 모르겠지만 아마 이것은 당신에게 흥미로울 것입니다.

%%timeit 
print(trend_ufunc[1,1,1].data.compute()) 

4.89의 루프 당 180 밀리 ± (± 표준. DEV 의미한다. 7 개 실행 1 개 루프 각각의) 루프 당 182 밀리 ±

%%timeit 
trend_ufunc_poly[1,1,1].compute() 

2.74 S (STD. DEV 평균 ±.(7) 런, 1 개 루프마다)를 루프 당 193 밀리 ±

%%timeit 
trend_along[1,1,1].compute() 

4.58 초의 (STD. DEV 평균 ±. 7 개 실행 1 개 루프마다)

%%timeit 
trend_along_poly[1,1,1].compute() 

2.64의 ± 65 밀리 초마다의 루프 (평균 ± 표준 편차 7 회, 각각 1 루프)

관련 문제