0
Scala의 Spark 데이터 프레임에서 열의 모든 요소에 함수를 적용하려고합니다. 그리고 난 단지 지능 부분을 반환하고 싶습니다 -이 예 10에 나는 장난감의 예에서이 작업을 수행 할 수 있습니다 : 입력이처럼 보이는 문자열이다 "{10 카운트}"Spark 데이터 프레임의 열의 모든 요소에 맵 함수 적용
val x = List("{\"count\": 107}", "{\"count\": 9}", "{\"count\": 456}")
val _list = x.map(x => x.substring(10,x.length-1).toInt)
val getCounts: String => Int = _.substring(10,x.length-1).toInt
import org.apache.spark.sql.functions.udf
val myUDF = udf(getCounts)
df.withColumn("post_shares_int", myUDF('post_shares)).show
오류 출력 :이 작업을 수행하는 방법에 대한 도움은 매우 극명하게 될 것이다
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2060)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
....
내 dataframe에 UDF를 적용하려고 때 오류가 발생합니다.
정규식 JSON 문자열 ... JSON을 구문 분석하면 안 되는가? –
@ cricket_007 당신은 절대적으로 옳습니다. 나는 그러한 기능을 잘 몰랐습니다. – cheseaux
@ Feynman27 솔루션 편집 됨 – cheseaux