2016-08-03 3 views

답변

4

다음은 Python (Spark 1.6 +)의 Dataframe API를 사용한 구현 예입니다.

import pyspark.sql.functions as F 
import numpy as np 
from pyspark.sql.types import FloatType 

의 우리가 "급여"스파크 dataframe 등의 고객을위한 월급이 가정 해 봅시다 :

월 | 고객 ID | 급여

우리는 모든 개월

을 통해 고객 당 평균 급여를 찾고 싶은

1 단계 : 급여에 집계 : 중간

def find_median(values_list): 
    try: 
     median = np.median(values_list) #get the median of values in a list in each row 
     return round(float(median),2) 
    except Exception: 
     return None #if there is anything wrong with the given values 

median_finder = F.udf(find_median,FloatType()) 

2 단계를 계산하는 사용자 정의 함수를 작성

salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries")) 

3 단계 : 각 행의 급여 목록에 그들을 수집하여 열 급여 COL에 median_finder의 UDF를 호출 새로운 열로 중간 값을 추가합니다.

salaries_list = salaries_list.withColumn("median",median_finder("salaries")) 
+1

np.nanmedian (values_list)을 사용하면 NaN이 무시되고 때로는 더 나은 선택입니다. –

관련 문제