2016-12-05 5 views
2

Spark 사용하기 csv를 읽고 csv의 열에 함수를 적용하고 싶습니다. 작동하는 코드가 있지만 매우 해킹 된 코드입니다. 이 작업을 수행하는 적절한 방법은 무엇입니까?Spark에서 csv의 단일 열에 함수 적용

내 코드 그냥 line에 각 행을 매핑하고 line[index]에 함수를 호출하는 대신 열 이름의 함수를 호출 할 수 있도록하고 싶습니다

SparkContext().addPyFile("myfile.py") 
spark = SparkSession\ 
    .builder\ 
    .appName("myApp")\ 
    .getOrCreate() 
from myfile import myFunction 

df = spark.read.csv(sys.argv[1], header=True, 
    mode="DROPMALFORMED",) 
a = df.rdd.map(lambda line: Row(id=line[0], user_id=line[1], message_id=line[2], message=myFunction(line[3]))).toDF() 

. 이 포함 된 dataframe df에 새 열을 추가합니다

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

udf_myFunction = udf(myFunction, IntegerType()) # if the function returns an int 
df.withColumn("message", udf_myFunction("_3")) #"_3" being the column name of the column you want to consider 

: 나는 2.0.1

답변

7

당신은 단순히 withColumn과 함께 사용자 정의 함수 (udf)를 사용할 수 있습니다 스파크 버전을 사용하고

myFunction(line[3])의 결과입니다.

+0

감사합니다.'udf'가 존재했는지 몰랐습니다. 슈퍼 유용한. – Sal

관련 문제