2016-10-10 3 views
2

Scala 2.10은 여기에서 Spark 1.6.2를 사용합니다. 나는 과 비슷한 (그러나 동일하지는 않음)이라는 질문을 this one과 같이 가지고 있지만, 받아 들인 대답은 SSCCE이 아니며, Spark에 대한 "사전 지식"을 일정하다고 가정합니다. 그러므로 나는 그것을 재현하거나 이해할 수 없다. 더 중요한 것은입니다.이 질문은 단지 기존 데이터 프레임에 새로운 열을 추가하는 것으로 제한됩니다. 반면 열과 데이터 프레임의 모든 기존 행에 대한 값을 추가해야합니다.StringType 열을 기존 Spark DataFrame에 추가 한 다음 기본값을 적용하십시오.


은 그래서 기존의 스파크 DataFrame에 열을 추가하고 모든 행에 그 새 열에 대한 초기 ('기본') 값을 적용 할.

val json : String = """{ "x": true, "y": "not true" }""" 
val rdd = sparkContext.parallelize(Seq(json)) 
val jsonDF = sqlContext.read.json(rdd) 

jsonDF.show() 

나는이 (.show()를 통해) 출력으로 다음 얻을 것을 실행하면 :이 생성되어 json 문자열을 수정하지 않고있어 후

+----+--------+ 
| x|  y| 
+----+--------+ 
|true|not true| 
+----+--------+ 

지금 내가 jsonDF에 새 필드를 추가하려면, 결과 DF는 다음과 같이 것 같은 :

+----+--------+----+ 
| x|  y| z| 
+----+--------+----+ 
|true|not true| red| 
+----+--------+----+ 

의미, 나는 새로운 "z"기둥, 트러스, 빔을 추가 할 DF에 mn을 입력하고 을 입력 한 다음 z - 값이 "red" 인 모든 행을 기본값으로 설정합니다.

내가 함께 다음의 의사 코드 재현 한 다른 질문에서

: 또한

org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x, y); 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) 
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652) 

I : 나는이 프로그램을 실행할 때

val json : String = """{ "x": true, "y": "not true" }""" 
val rdd = sparkContext.parallelize(Seq(json)) 
val jsonDF = sqlContext.read.json(rdd) 

//jsonDF.show() 

val newDF = jsonDF.withColumn("z", jsonDF("col") + 1) 

newDF.show() 

는하지만, 내가 그 .withColumn(...) 방법에 컴파일러 오류를 "red"을 기본값으로 설정할 수있는 API 메소드가 보이지 않습니다. 내가 어디에서 잘못 왔는지에 대한 어떤 생각?

답변

8

lit 기능을 사용할 수 있습니다. 첫째로 당신은 자동으로 유추됩니다 컬럼의

jsonDF.withColumn("z", lit("red")) 

유형 아래와 같이 그것을

import org.apache.spark.sql.functions.lit 

를 가져오고 그것을 사용해야합니다.

관련 문제