2014-12-04 3 views
0

Spark에서이 코드를보다 효율적으로 만드는 방법은 무엇입니까?
데이터에서 최소, 최대, 개수, 평균을 계산해야합니다.
여기
A가 Shop001 99.99
Shop001 87.15
B Shop001는 3.99
이 ...스파크에서이 코드를 최적화하는 방법은 무엇입니까?

지금 내 데이터를 구성하려고 내 샘플 데이터,

이름 숍 돈이다 Name + Shop (키)에 의해 평균, 최소, 최대, 개수를 생성합니다.
그런 다음 collect()로 결과를 얻습니다. 여기

def tupleDivide(y): return float(y[0])/y[1] def smin(a, b): return min(a, b) def smax(a, b): return max(a, b) raw = sgRDD.map(lambda x: getVar(parserLine(x),list_C+list_N)).cache() cnt = raw.map(lambda (x,y,z): (x+"_"+y, 1)).countByKey() sum = raw.map(lambda (x,y,z): (x+"_"+y, z)).reduceByKey(add) min = raw.map(lambda (x,y,z): (x+"_"+y, z)).reduceByKey(smin) max = raw.map(lambda (x,y,z): (x+"_"+y, z)).reduceByKey(smax) raw_cntRDD = sc.parallelize(cnt.items(),3) raw_mean = sum.join(raw_cntRDD).map(lambda (x, y): (x, tupleDivide(y)))

사람은 우아한 코딩 스타일에 대한 몇 가지 제안을 제공 할 것이다, 스파크 내 코드?
감사합니다.

답변

2

최적의 처리를 위해서는 aggregateByKey을 사용해야합니다. 아이디어는 count, min, max 및 sum으로 구성된 state 벡터를 저장하고 집계 함수를 사용하여 최종 값을 얻는 것입니다. 또한 튜플을 키로 사용할 수 있으므로 키를 단일 문자열로 연결할 필요가 없습니다.

data = [ 
     ['x', 'shop1', 1], 
     ['x', 'shop1', 2], 
     ['x', 'shop2', 3], 
     ['x', 'shop2', 4], 
     ['x', 'shop3', 5], 
     ['y', 'shop4', 6], 
     ['y', 'shop4', 7], 
     ['y', 'shop4', 8] 
    ] 

def add(state, x): 
    state[0] += 1 
    state[1] = min(state[1], x) 
    state[2] = max(state[2], x) 
    state[3] += x 
    return state 

def merge(state1, state2): 
    state1[0] += state2[0] 
    state1[1] = min(state1[1], state2[1]) 
    state1[2] = max(state1[2], state2[2]) 
    state1[3] += state2[3] 
    return state1 

res = sc.parallelize(data).map(lambda x: ((x[0], x[1]), x[2])).aggregateByKey([0, 10000, 0, 0], add, merge) 

for x in res.collect(): 
    print 'Client "%s" shop "%s" : count %d min %f max %f avg %f' % (
     x[0][0], x[0][1], 
     x[1][0], x[1][1], x[1][2], float(x[1][3])/float(x[1][0]) 
    ) 
+0

알았어! 고마워요! – wlsherica

관련 문제