2016-09-12 2 views
6

내가 가진 RDD [행] :스칼라 : String 값에 대한 GroupBy 합계를 계산하는 방법?

|---itemId----|----Country-------|---Type----------| 
    |  11  |  US   |  Movie  | 
    |  11  |  US   |  TV   | 
    |  101  |  France  |  Movie  |  

각 행은 별도의 JSON 객체 (RDD의 각 행)입니다 내가 JSON 목록으로 결과를 저장할 수 있도록 GROUPBY 해당 itemId을 수행하는 방법 :

{"itemId" : 11, 
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} }, 
{"itemId" : 101, 
"Country": {"France" :1 },"Type": {"Movie" :1} } 

RDD :

는 I 시도 :

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../"  
val input = sc.textFile(mappingPath) 

입력의 목록은

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match { 
     case Some(s) => s 
    } 


val events = sqlContext.sql("select itemId EventList") 

val itemList = events.map(row => { 
    val itemId = row.getAs[String](1); 
    val çountryInfo = showTitleInfo(MappingsList.get(itemId)); 
    val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry() 
    val type = countryInfo.getType() 

    Row(itemId, country, type) 
     }) 

어떤 사람은 제가 이것을 달성 할 수있는 방법을 알려 수 : 내가 JSON 구문 분석 및 변환을 담당 MappingUtils를 사용 CountryInfo POJO와 클래스에 매핑하고 각 줄은 JSON은 jsons?

감사합니다!

+0

RDD [행]이 DataFrame/DataSet에서 왔습니까? RDD로 작업 [행]은 여전히 ​​실행 가능하지만 일반적으로 이상적이지 않습니다. –

+0

데이터 세트에서 RDD를 만들었습니다. –

+0

@ASpotySpot이 내 RDD로 업데이트되었습니다. –

답변

3

나는 이것을 완료하는 데 여분의 시간을 가질 여유가 없지만 시작을 줄 수 있습니다.

RDD[Row]을 JSON 구조를 나타내는 단일 맵으로 집계하는 것이 좋습니다.

  1. seqOp

    방법 대상 유형 대상 종류의 두 병합 방법
  2. combOp으로 요소의 컬렉션 폴드 : 집계 두 함수의 매개 변수를 요구하는 배이다. 당신이 seqOp에서 볼 수있는 값의 수를 축적해야하므로, 병합 중에

까다로운 부분은 combOp 제공됩니다. 나는 이것을 잡을 비행기가있는 것처럼 운동으로 남겨 두었습니다! 문제가 생기면 다른 사람들이 그 간격을 메울 수 있기를 바랍니다.

case class Row(id: Int, country: String, tpe: String) 

    def foo: Unit = { 

    val rows: RDD[Row] = ??? 

    def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = { 
     acc.get(r.id) match { 
     case None => acc.updated(r.id, (Map(r.country, 1), Map(r.tpe, 1))) 
     case Some((countries, types)) => 
      val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1) 
      val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1) 
      acc.updated(r.id, (countries_, types_)) 
     } 
    } 

    val z = Map.empty[Int, (Map[String, Int], Map[String, Int])] 

    def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = { 
     l.foldLeft(z) { case (acc, (id, (countries, types))) => 
      r.get(id) match { 
      case None => acc.updated(id, (countries, types)) 
      case Some(otherCountries, otherTypes) => 
       // todo - continue by merging countries with otherCountries 
       // and types with otherTypes, then update acc 
      } 
     } 
    } 

    val summaryMap = rows.aggregate(z) { seqOp, combOp } 
관련 문제