0

PySpark 2.0에서 수행하려는 작업이 df.rdd.map으로 수행하기 쉽지만 Dataframe 실행 내부에 머물고 싶습니다. 엔진 때문에 성능상의 이유로 데이터 프레임 작업 만 사용하여이 작업을 수행 할 방법을 찾고 싶습니다. 내 문자열에 대한 정밀도 포맷 작업을해야 무엇을 각 행에 대해, 나에게 알려주는 열을 가지고 있고, 기본적으로Spark Dataframe 열의 데이터를 조건으로 사용하거나 다른 열 표현식으로 입력하십시오.

def precision_formatter(row): 
    formatter = "%.{}f".format(row.precision) 
    return row + [formatter % row.amount_raw/10 ** row.precision] 
df = df.rdd.map(precision_formatter) 

:

RDD 스타일의 작업은,이 같은 것입니다 그 정밀도에 따라 'amount_raw'열을 선택적으로 문자열 형식으로 지정하고 싶습니다.

답변

0

하나 이상의 열의 내용을 다른 열 작업의 입력으로 사용하는 방법을 모르겠습니다. 가능한 가장 가까운 것은 Column.when의 사용을 열 또는 열 내의 가능한 부울 조건/사례 집합에 해당하는 부울 연산 집합으로 외부에서 정의 된 집합으로 제안하는 것입니다.

예를 들어 가능한 모든 값이 row.precision이면 얻을 수있는 경우 해당 집합을 반복하고 집합의 각 값에 Column.when 연산을 적용 할 수 있습니다. 이 세트는 df.select('precision').distinct().collect()으로 얻을 수 있다고 생각합니다. pyspark.sql.functions.whenColumn.when 작업 자체가 Column 개체를 반환

때문에 당신이 세트를 소진 할 때까지, 당신은 프로그래밍 서로 when 작업을 '추가'(그러나 그것은 얻었다) 세트의 항목을 반복하고 유지할 수 있습니다 :

import pyspark.sql.functions as PSF 

def format_amounts_with_precision(df, all_precisions_set): 
    amt_col = PSF.when(df['precision'] == 0, df['amount_raw'].cast(StringType())) 
    for precision in all_precisions_set: 
     if precision != 0: # this is a messy way of having a base case above 
      fmt_str = '%.{}f'.format(precision) 
      amt_col = amt_col.when(df['precision'] == precision, 
          PSF.format_string(fmt_str, df['amount_raw']/10 ** precision) 

    return df.withColumn('amount', amt_col) 
0

파이썬 UDF로 할 수 있습니다. 그들은 많은 입력 값 (행의 열에서 값)을 취해 단일 출력 값을 낼 수 있습니다. 대신 열 정밀 가치는 글로벌 하나를 사용하기를 원한다면 당신은 다음과 같이 호출 할 때 불이 (..) 함수를 사용할 수 있습니다,

from pyspark.sql import types as T, functions as F 
from pyspark.sql.function import udf, col 

# Create example data frame 
schema = T.StructType([ 
    T.StructField('precision', T.IntegerType(), False), 
    T.StructField('value', T.FloatType(), False) 
]) 

data = [ 
    (1, 0.123456), 
    (2, 0.123456), 
    (3, 0.123456) 
] 

rdd = sc.parallelize(data) 
df = sqlContext.createDataFrame(rdd, schema) 

# Define UDF and apply it 
def format_func(precision, value): 
    format_str = "{:." + str(precision) + "f}" 
    return format_str.format(value) 

format_udf = F.udf(format_func, T.StringType()) 

new_df = df.withColumn('formatted', format_udf('precision', 'value')) 
new_df.show() 

또한 : 그것은 다음과 같이 보일 것이다

new_df = df.withColumn('formatted', format_udf(F.lit(2), 'value')) 
관련 문제