다음은 Python (Spark 1.6 +)의 Dataframe API를 사용한 구현 예입니다.
import pyspark.sql.functions as F
import numpy as np
from pyspark.sql.types import FloatType
의 우리가 "급여"스파크 dataframe 등의 고객을위한 월급이 가정 해 봅시다 :
월 | 고객 ID | 급여
우리는 모든 개월
을 통해 고객 당 평균 급여를 찾고 싶은
1 단계 : 급여에 집계 : 중간
def find_median(values_list):
try:
median = np.median(values_list) #get the median of values in a list in each row
return round(float(median),2)
except Exception:
return None #if there is anything wrong with the given values
median_finder = F.udf(find_median,FloatType())
2 단계를 계산하는 사용자 정의 함수를 작성
salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries"))
3 단계 : 각 행의 급여 목록에 그들을 수집하여 열 급여 COL에 median_finder의 UDF를 호출 새로운 열로 중간 값을 추가합니다.
salaries_list = salaries_list.withColumn("median",median_finder("salaries"))
np.nanmedian (values_list)을 사용하면 NaN이 무시되고 때로는 더 나은 선택입니다. –