2016-12-19 2 views
1

Spark에서 타임 스탬프, Integer 및 다른 데이터 프레임을 가져와 3 개의 값으로 이루어진 튜플을 반환하는 UDF를 사용해야합니다.Spark Scala에서 UDF 정의

오류가 발생하면 계속 오류가 발생하며 더 이상 올바르게 수정하려고하지 않습니다.

def determine_price (view_date: org.apache.spark.sql.types.TimestampType , product_id: Int, price_df: org.apache.spark.sql.DataFrame) : (Double, java.sql.Timestamp, Double) = { 
    var price_df_filtered = price_df.filter($"mkt_product_id" === product_id && $"created"<= view_date) 
    var price_df_joined = price_df_filtered.groupBy("mkt_product_id").agg("view_price" -> "min", "created" -> "max").withColumn("last_view_price_change", lit(1)) 
    var price_df_final = price_df_joined.join(price_df_filtered, price_df_joined("max(created)") === price_df_filtered("created")).filter($"last_view_price_change" === 1) 
    var result = (price_df_final.select("view_price").head().getDouble(0), price_df_final.select("created").head().getTimestamp(0), price_df_final.select("min(view_price)").head().getDouble(0)) 
    return result 
} 
val det_price_udf = udf(determine_price) 

이 나에게주는 오류는 다음과 같습니다 : 여기

는 기능입니다

error: missing argument list for method determine_price 
Unapplied methods are only converted to functions when a function type is expected. 
You can make this conversion explicit by writing `determine_price _` or `determine_price(_,_,_)` instead of `determine_price`. 

내가 int와는 Int.type을 찾을 것으로 예상 다른 오류가 계속 실행 인수를 추가하기 시작하면 또는 개체 DataFrame이 (가) org.apache.spark.sql 패키지의 구성원이 아닙니다.

일부 컨텍스트를 제공하는 경우 :

아이디어는 가격, 제품 ID 및 생성 날짜의 데이터 프레임과 제품 ID 및보기 날짜가 포함 된 다른 데이터 프레임이 있다는 것입니다.

보기 날짜보다 오래된 마지막 가격 항목을 기준으로 가격을 결정해야합니다.

각 제품 ID는 두 번째 데이터 프레임에 여러 개의보기 날짜가 있기 때문에. UDF가 교차 결합보다 빠르다고 생각했습니다. 누구든지 다른 아이디어가 있다면, 나는 감사 할 것입니다.

답변

0

데이터 프레임을 UDF에 전달할 수 없습니다. 특정 파티션에서 UDF가 작업자에서 실행되기 때문입니다. 또한 작업자 (RDD) (Is it possible to create nested RDDs in Apache Spark?)에서 RDD를 사용할 수 없으므로 마찬가지로 DataFrame을 작업자에게 사용할 수 없습니다.

이 문제를 해결하려면해야합니다.

+0

좋아, 나는 UDF 인수에서 데이터 프레임을 제거했습니다. 데이터 프레임은 캐싱되고 브로드 캐스트되며 함수 내에서 액세스 할 수 있어야합니다. 여전히 오류가 발생합니다. 'error : type mismatch; 발견 : Int.type 필수 : ​​Int val det_price_udf = udf (결정 팩터 (org.apache.spark.sql.types.TimestampType, Int)) ' – UrVal

+0

데이터 프레임이 UDF에 없으면 익숙한. 필자는 파이썬에서 익숙한 것처럼 "전역 변수"가 아닙니다. 이 문제를 해결하는 방법을 모릅니다. – UrVal

+0

유스 케이스는 무엇입니까? –

관련 문제