다음 코드가 있습니다.Spark 1.6 scala 데이터 행을 만듭니다.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val baseDF = sqlContext.read.json(fileFullPath)
My json에는 ProductId 및 Quantity라는 2 개의 관심 분야가 있습니다. 나는 2 열, 수량에 따라 제품 ID 및 수량하지만 여러 행이있는 스파크 RDD 또는 DF로 변경하려면 내가
{
"sales": {
"saledate": "17Mar2008",
"sale": [{
"productid": 1,
"quantity": 10
}, {
"productid": 2,
"quantity": 1
}, {
"productid": 3,
"quantity": 3
}, {
"productid": 4,
"quantity": 5
}]
}
}
찾고 있습니다. 각 수량마다 1을 원합니다. 상기 예에서, 제품 1은 10 개의 행을 가지며, 제품 2는 1이고, 제품 3은 3이고, 제품 4는 총 19 개의 행, 즉 5 개의 행 = 합 (양)에 대해 5 개의 행을 갖는다.
도움을 주시면 감사하겠습니다. spark 1.6.2와 scala를 사용하고 있습니다.
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val baseDF = sqlContext.read.json(fileFullPath)
val listFromQuantity = udf { quantity: Int => List.fill(quantity)(quantity) }
baseDF.select(explode($"sales.sale")).select($"col.productId", explode(listFromQuantity($"col.quantity"))).show()
반환 :이 일해야
으로 바꿔야합니다. 현재 질문을 다시 작성하십시오. –
죄송합니다 ... 첫 번째 게시물 스택에 .. 감사합니다 : @ 가스파스 – SSC
아무 문제가 - 다른 사람들이 downvote 수있는 형식 때문에, downvote 수 있습니다.) –