2017-03-21 5 views
-2

다음과 같은 데이터가 있으며 다음 JSON 문서를 만들고 싶습니다. 스파크에서 어떻게하면됩니까? 스파크에서 가장 효율적인 방법은 무엇입니까? Spark의 열을 그룹화하고 집계하여 중첩 된 Json을 만드는 방법

name|contact   |type 
    jack|123-123-1234  |phone 
    jack|[email protected] |email 
    jack|123 main street |address 
    jack|34545544445  |mobile 

     { 
     "name" : "jack", 
     "contacts":[ 
     { 
      "contact" : "123-123-1234", 
      "type" : "phone" 
     }, 
     { 
      "contact" : "[email protected]", 
      "type" : "email" 
     }, 
     { 
      "contact" : "123 main street", 
      "type" : "address" 
     }, 
     { 
      "contact" : "34545544445", 
      "type" : "mobile" 
     } 
     ] 
    } 

이 내가 제공하는 단지 샘플 사용 사례입니다

. 큰 데이터를 가지고 있는데 다중 열을 일부 그룹으로 묶어 하나의 행으로 접어야합니다. 논리.

현재 접근 방식은 각 행을 읽고, 버퍼에 저장하고 병합하는 UDAF를 작성하는 것입니다. 그래서 코드는 내가 다른 방법이있을 수 있습니다 무엇을 알아 내려고 노력하고 있어요

val mergeUDAF = new ColumnUDAF 

val tempTable = inputTable.withColumn("contacts",struct($"contact",$"type") 
val outputTable = tempTable.groupby($"name").agg(mergeUDAF($"contacts").alias("contacts")) 

될 것이다. 나는 스파크 SQL을 사용하여 이것을 달성하려고 노력하고있다.

+0

질문에 대한 추가 정보를 제공해주십시오. 보여 주려는 것을 보여주십시오. –

+0

@Thiago Baldim 죄송합니다. 질문이 더 자세하게 업데이트되었습니다. 감사. – hp2326

답변

0
case class contact(contact:String,contactType:String) 
case class Person(name:String,contact:Seq[contact]) 
    object SparkTestGrouping { 

     def main(args: Array[String]): Unit = { 

     val conf = new SparkConf().setAppName("LocalTest").setMaster("local") 
     val sc = new SparkContext(conf) 
     val sqlContext = new SQLContext(sc) 
     import sqlContext.implicits._ 


     val inputData=Seq("jack|123-123-1234|phone","jack|[email protected]|email","jack|123 main street|address","jack|34545544445|mobile") 


     val finalData = sc.parallelize(inputData) 

     val convertData = finalData.map(_.split('|')) 
      .map(line => (line(0),Seq(line(1) +"|" +line(2)))) 
      .reduceByKey((x,y) => x ++: y) 

      val output = convertData.map(line => (line._1,line._2.map(_.split('|')).map(obj => contact(obj(0),obj(1))))) 

     val finalOutput = output.map(line => Person(line._1,line._2)) 

     finalOutput.toDF().toJSON.foreach(println) 

     sc.stop() 

     } 

    } 

할 수 있습니다와 데이터에서 상자 튜플 키 필드를 사용하고 데이터를 그룹화하려면 reducebyKey를 사용하십시오. 위의 예에서 튜플 (name, Seq ("contact | contactType"))을 만들고 reducebykey를 사용하여 데이터를 이름순으로 그룹화했습니다. 데이터를 그룹화 한 후 케이스 클래스를 사용하여 추가로 에 참여해야하거나 단순히 json 문서를 만들어야하는 경우 DataFrame 및 DataSets로 변환 할 수 있습니다.

+1

게시 한 코드가 OP 용으로 명확하게 작동하는 동안 사이트의 향후 방문자가 수행 한 작업을 이해할 수 있도록 설명하는 몇 가지 설명을 게시하십시오. –

0

나는 그냥지도보다는 JSON 문자열에 "이름"함으로써, RDD이 CSV 데이터를 형성 그룹을 생성해야한다고 생각 :

val data = sc.parallelize(Seq("jack|123-123-1234|phone", "jack|[email protected] |email", "david|123 main street|address", "david|34545544445|mobile")) // change to load your data as RDD 

val result = data.map(_.split('|')).groupBy(a => a(0)).map(a => { 
    val contact = a._2.map(c => s"""{"contact": "${c(1)}", "type": "${c(2)}" }""").mkString(",") 
    s"""{"name": "${a._1}", "contacts":[ ${contact}] }""" 
    }).collect.mkString(",") 

    val json = s"""[ ${result} ] """ 
관련 문제