2016-08-23 6 views
2

나는 아래의 예 항목으로 RDD myRDD에서 특정 요소에 액세스하는 방법을 알아 내려고 노력하고 있어요 여러 목록을 사용하여 만드는 방법 : 나는 레디 스 DB가 3를 사용하여 일부 데이터를 추출 할지도 스파크

(600,List((600,111,7,1), (615,111,3,5)) 
(601,List((622,112,2,1), (615,111,3,5), (456,111,9,12)) 

rd 필드는 하위 목록에서 ID로 사용됩니다. 예를 들어, (600,List((600,111,1,1), (615,111,1,5))의 경우 ID는 73입니다. (601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))의 경우 ID는 2, 39입니다.

문제는 여러 ID를 사용하여 값을 수집하는 방법을 모르는 것입니다. 아래의 주어진 코드에서 line._2(3)을 사용합니다. 그러나이 방법은 하위 목록의 필드 대신 하위 목록에 액세스하기 때문에 올바르지 않습니다. flatMap 또는 그 유사 물을 사용해야합니까?

val newRDD = myRDD.mapPartitions(iter => { 
    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000)) 
    iter.map({line => (line._1, 
     redisPool.withJedisClient { client => 
     val start_date: String = Dress.up(client).hget("id:"+line._2(3),"start_date") 
     val end_date: String = Dress.up(client).hget("id:"+line._2(3),"end_date") 
     val additionalData = List((start_date,end_date)) 
     Map(("base_data", line._2), ("additional_data", additionalData)) 
     }) 
    }) 
    }) 
    newRDD.collect().foreach(println) 

우리가 레디 스 DB는 몇 가지 관련 데이터, 결과 newRDD은 다음이 될 수 포함되어 있다고 가정하면 : (line._2.map(_._3)를 사용

(600,Map("base_data" -> List((600,111,7,1), (615,111,3,5)), "additional_data" -> List((2014,2015),(2015,2016))) 
(601,Map("base_data" -> List((622,112,2,1), (615,111,3,5), (456,111,9,12)), "additional_data" -> List((2010,2015),(2011,2016),(2014,2016))) 

답변

0

line._2 각 튜플의 세 번째 요소의 목록을 얻으려면을 line 유형이 (Int, List[(Int, Int, Int, Int)]) 인 것으로 가정하면 귀하의 예처럼 보이며 Any과 같은 유형은 관련되지 않습니다. 전반적으로 코드가 비슷해야합니다.

iter.map({ case (first, second) => (first, 
    redisPool.withJedisClient { client => 
    val additionalData = second.map { tuple => 
     val start_date: String = Dress.up(client).hget("id:"+tuple._3,"start_date") 
     val end_date: String = Dress.up(client).hget("id:"+tuple._3,"end_date") 
     (start_date, end_date) 
    } 
    Map(("base_data", second), ("additional_data", additionalData)) 
    }) 
})