2016-09-08 3 views
0

나는 한 다음 항목 : 그룹에 내가 원하는그룹 RDD 항목 RDD에서

(111,List(List(1473163148,abc))) 
(111,List(List(1473163143,def))) 
(111,List(List(1473163143,abd))) 
(111,List(List(1473163139,asd))) 
(111,List(List(1473163696,rtf))) 
(111,List(List(1473163700,rgd))) 
(111,List(List(1473163703,dmf)))  

새 항목에 이러한 항목, 각각의 새로운 항목 목록을 포함 할 수 있도록 30 분 이내에 이전 항목을 그것은 간단 해 보이지만 실제로는 코드가 트릭을하지 않습니다.

1473163143 1473163143 1473163148  
1473163139    
1473163696 1473163700 1473163703  
1473168932 

이 타임 스탬프 초이기 때문에, 그들이해야 :

val grouped = processed.reduceByKey((x,y) => x ++ y) 
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) } 

object MyFuncObj { 

    def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = { 
    l.groupBy(_.productElement(0).toString.toLong/30*60).values 
    } 

} 

은 위의 데이터에이 코드를 적용한 후, 나는 다음과 같은 결과 (이것이 핵심이기 때문에 나는 단지 타임 스탬프를 제공)를 얻을

1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703 
1473168932 

이 작업을 해결하는 방법은 무엇입니까?

UPDATE :

더 명확하게하려면 : 나는 첫 번째 레코드의 시간에서 시작 30 분 버킷을 얻을 것으로 기대합니다.

+0

문제는 여전히 불분명하다 : 예상 출력이 입력에 나타나지 않습니다 타임 스탬프를'1473168932'이 포함되어 있습니다. –

답변

0

여기에 두 가지 문제가 있습니다 : 첫 번째 항목의 시간에 시작하는 "버킷"을 원하는 경우

  1. 이 - 당신이 전에 각 타임 스탬프와 첫 타임 스탬프 사이의 델타를 사용해야은

    : 대신 (30*60)로 나누어의 60에 의해 그 결과를 곱한 후 로 나누어있어 - 분할 괄호를 누락

  2. 30*60 주변을

    scala> 5000/30*60 
    res0: Int = 9960 
    
    scala> 5000/(30*60) 
    res1: Int = 2 
    

은 전부 - 이것은 당신이 무엇을해야 할 것 같다

// sample data: 
val processed = sc.parallelize(List(
    (111,List(List(1473163148L, "abc"))), 
    (111,List(List(1473163143L,"def"))), 
    (111,List(List(1473163143L,"abd"))), 
    (111,List(List(1473163139L,"asd"))), 
    (111,List(List(1473163696L,"rtf"))), 
    (111,List(List(1473163700L,"rgd"))), 
    (111,List(List(1473168932L,"dmf")))) 
) 


// first - find the lowest timsestamp: 
// if input isn't ordered: 
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min() 

// if input is sorted by timestamp: 
val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong 

def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = { 
    // divide the DELTA between each timestamp and first one by 30 minutes to find bucket: 
    l.groupBy(t => (firstTimestamp - t.productElement(0).toString.toLong)/(30*60)).values 
} 

// continue as you did: 
val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y) 
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap { 
    case (k, l) => createGroups(l).map(sublist => (k, sublist)) 
} 

separated.foreach(println) 
// prints: 
// (111,List(List(1473168932, dmf))) 
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd))) 
+0

감사합니다. 유일한 문제는'val firstTimestamp : Long = processed.values.map {case List ((l : Long) :: _) => l} .min()'행에 있습니다. 'Scruntinee가 패턴 유형과 호환되지 않습니다. 발견 : Long, required : String' 오류가 발생합니다. – Lobsterrrr

+0

나는 타임 스탬프가 입력 데이터에 'Long' 타입으로 있다고 가정했다. 질문이 타입을 정의하지 않았다. 만약 그들이 문자열이라면, 그 라인의 타입을 String으로 변경하고'l.toLong'을 호출한다. –

+0

'val firstTimestamp : Long = processed.values.map {case List ((l : String) :: _) => l.toLong} .min()'. 이것은'java.lang.UnsupportedOperationException : empty collection' 오류를줍니다. – Lobsterrrr