나는 한 다음 항목 : 그룹에 내가 원하는그룹 RDD 항목 RDD에서
(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))
새 항목에 이러한 항목, 각각의 새로운 항목 목록을 포함 할 수 있도록 30 분 이내에 이전 항목을 그것은 간단 해 보이지만 실제로는 코드가 트릭을하지 않습니다.
1473163143 1473163143 1473163148
1473163139
1473163696 1473163700 1473163703
1473168932
이 타임 스탬프 초이기 때문에, 그들이해야 :
val grouped = processed.reduceByKey((x,y) => x ++ y)
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) }
object MyFuncObj {
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
l.groupBy(_.productElement(0).toString.toLong/30*60).values
}
}
은 위의 데이터에이 코드를 적용한 후, 나는 다음과 같은 결과 (이것이 핵심이기 때문에 나는 단지 타임 스탬프를 제공)를 얻을
1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703
1473168932
이 작업을 해결하는 방법은 무엇입니까?
UPDATE :
더 명확하게하려면 : 나는 첫 번째 레코드의 시간에서 시작 30 분 버킷을 얻을 것으로 기대합니다.
문제는 여전히 불분명하다 : 예상 출력이 입력에 나타나지 않습니다 타임 스탬프를'1473168932'이 포함되어 있습니다. –