2016-07-05 2 views
3

줄리아에 내 R 코드 중 일부를 이식하려고합니다. 기본적으로 내가 줄리아에서 다음 R 코드를 다시 한 : 지금은 줄리아에서 동일한 작업을 수행하려고 11 하나 개의 코어에서 갈 때Julia pmap 성능

library(parallel) 

eps_1<-rnorm(1000000) 
eps_2<-rnorm(1000000) 

large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0) 
matrix_to_compare = expand.grid(c(0,1),c(0,1)) 
indices<-seq(1,1000000,4) 
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),])) 

function_compare<-function(x){ 
    which((rowSums(x==matrix_to_compare)==2) %in% TRUE) 
} 

> system.time(lapply(large_matrix,function_compare)) 
    user system elapsed 
38.812 0.024 38.828 
> system.time(mclapply(large_matrix,function_compare,mc.cores=11)) 
    user system elapsed 
63.128 1.648 6.108 

하나는 내가 상당한 속도 향상을 얻고있다 알 수 있듯이 :

#Define cluster: 

addprocs(11); 

using Distributions; 
@everywhere using Iterators; 
d = Normal(); 

eps_1 = rand(d,1000000); 
eps_2 = rand(d,1000000); 


#Create a large matrix: 
large_matrix = hcat(eps_1,eps_2).>=0; 
indices = collect(1:4:1000000) 

#Split large matrix: 
large_matrix = [large_matrix[i:(i+3),:] for i in indices]; 

#Define the function to apply: 
@everywhere function function_split(x) 
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))); 
    matrix_to_compare = matrix_to_compare.>0; 
    find(sum(x.==matrix_to_compare,2).==2) 
end 

@time map(function_split,large_matrix) 
@time pmap(function_split,large_matrix) 

    5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time) 
    18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time) 

필자는 pmap을 사용하여 속도가 향상되는 것을 알 수 있습니다. 어쩌면 누군가가 대안을 제안 할 수 있습니다.

+1

'large_matrix'는 250000 요소 배열 {Any, 1} : '이것이 문제일까요? – daycaster

+0

나는 Julia에게 매우 익숙하지 않다는 것을 알고있다 – Vitalijs

+0

Julia에서 0.4.6 나는'addprocs (3)'으로 다음과 같은 결과를 얻는다 : 4.173674 초 (22.97 M 할당 : 2.943 GB, 14.57 % gc 시간)'및 '0.795733 초 (292.07 k 할당 : 12.377 MB, gc 시간 0.83 %)'. 'large_matrix'의 타입은'Array {BitArray {2}, 1}'입니다. – tim

답변

1

여기에있는 문제 중 일부는 @parallel@pmap이 항상 작업자와주고받는 데이터를 잘 처리하지 못한다고 생각합니다. 따라서 실행중인 항목이 데이터 이동을 전혀 필요로하지 않는 상황에서 가장 잘 작동하는 경향이 있습니다. 또한 실적을 향상시키기 위해 할 수있는 일이 있을지도 모른다고 의심하지만 세부적인 사항은 확실하지 않습니다.

더 많은 데이터가 필요하다면 작업자의 기능을 직접 호출하는 옵션을 사용하고 그 기능을 사용하여 해당 작업자의 메모리 공간에있는 개체에 액세스하는 것이 가장 좋습니다. 아래에 하나의 예를 들어 보겠습니다.이 예는 여러 명의 근로자를 사용하여 기능을 향상시킵니다. 아마도 가장 간단한 옵션 인 @everywhere을 사용하지만 상황에 따라 @spawn, remotecall() 등도 고려해 볼 가치가 있습니다.

addprocs(11); 

using Distributions; 
@everywhere using Iterators; 
d = Normal(); 

eps_1 = rand(d,1000000); 
eps_2 = rand(d,1000000); 

#Create a large matrix: 
large_matrix = hcat(eps_1,eps_2).>=0; 
indices = collect(1:4:1000000); 

#Split large matrix: 
large_matrix = [large_matrix[i:(i+3),:] for i in indices]; 

large_matrix = convert(Array{BitArray}, large_matrix); 

function sendto(p::Int; args...) 
    for (nm, val) in args 
     @spawnat(p, eval(Main, Expr(:(=), nm, val))) 
    end 
end 

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm))) 

@everywhere function function_split(x::BitArray) 
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))); 
    matrix_to_compare = matrix_to_compare.>0; 
    find(sum(x.==matrix_to_compare,2).==2) 
end 


function distribute_data(X::Array, WorkerName::Symbol) 
    size_per_worker = floor(Int,size(X,1)/nworkers()) 
    StartIdx = 1 
    EndIdx = size_per_worker 
    for (idx, pid) in enumerate(workers()) 
     if idx == nworkers() 
      EndIdx = size(X,1) 
     end 
     @spawnat(pid, eval(Main, Expr(:(=), WorkerName, X[StartIdx:EndIdx]))) 
     StartIdx = EndIdx + 1 
     EndIdx = EndIdx + size_per_worker - 1 
    end 
end 

distribute_data(large_matrix, :large_matrix) 


function parallel_split() 
    @everywhere begin 
     if myid() != 1 
      result = map(function_split,large_matrix); 
     end 
    end 
    results = cell(nworkers()) 
    for (idx, pid) in enumerate(workers()) 
     results[idx] = getfrom(pid, :result) 
    end 
    vcat(results...) 
end 

## results given after running once to compile 
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time) 
@time b = parallel_split(); ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time) 

julia> a == b 
true 

참고 :이 경우에도 속도 향상은 여러 프로세스에서 완벽하지 않습니다. 하지만 이것은 예상되는 것입니다. 함수의 결과로 반환 할 적당한 양의 데이터가 있고 그 데이터를 이동해야하고 시간이 필요하기 때문입니다.

P. 여기에 사용 된 sendtogetfrom 기능에 대한 자세한 내용은이 게시물 (Julia: How to copy data to another processor in Julia) 또는이 패키지 (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl)를 참조하십시오.