2015-01-22 2 views
0

스파크를 처음 사용 해보니 기간 계산에 대한 질문이있었습니다. IP 주소와 서버 연결 시간 (날짜 시간 필드)이있는 서버 로그가 있습니다. 로그의 각 레코드에 대한 연결 시간 간격을 계산하려고합니다. 필자는 필요한 모든 데이터를 설정하고 서식을 지정할 수 있지만 주어진 IP 주소에 대해 서로 다른 두 행 사이의 값을 비교하는 방법을 모르겠습니다.서버 로그 - 지속 시간 계산 (Spark)

내 설정은 다음과 같습니다.

IP Activity 
235.325.23.22, 2014-09-01 03:31 
235.325.23.22, 2014-09-01 03:39 
235.325.23.22, 2014-09-01 03:43 
235.325.23.22, 2014-09-01 03:46 
235.325.23.22, 2014-09-01 03:55 
235.423.25.44, 2014-09-01 17:21 
235.423.25.44, 2014-09-01 17:30 
235.423.25.44, 2014-09-01 17:34 
235.423.25.44, 2014-09-01 17:42 
235.423.25.44, 2014-09-01 17:51 

나는 다음과 같은 결과를 얻을 싶습니다 어떤 도움을 주시면 더 좋구요

235.325.23.22, 2014-09-01 03:31am,0 base--start of the 235.325.23.22 IP set 
235.325.23.22, 2014-09-01 03:39am,8 minutes 
235.325.23.22, 2014-09-01 03:43am,4 minutes 
235.325.23.22, 2014-09-01 03:46am,3 minutes 
235.325.23.22, 2014-09-01 03:55am,9 minutes 
235.423.25.44, 2014-09-01 17:21pm,0 base-- start of the new 235.423.25.44 IP set 
235.423.25.44, 2014-09-01 17:30pm,9 minutes 
235.423.25.44, 2014-09-01 17:34pm,4 minutes 
235.423.25.44, 2014-09-01 17:42pm,8 minutes 
235.423.25.44, 2014-09-01 17:51pm,9 minutes 

합니다.

답변

1

이 전체 대답하지만 방향되지 않습니다 :

는이 값으로 날짜 문자열 IP에 키가 RDD가되도록,의 당신의 초기 설정 RDD를 호출 할 수 있습니다.

키별로 정렬 된 파티션을 만들려면 val sortedRDD = rdd.repartitionAndSortWithinPartitions (새 HashPartitioner (numPartitions))를 사용하십시오. numPartitions를 키보다 훨씬 더 많이 선택하십시오. 그런 다음 sortedRDD에서 mapPartitions를 사용하십시오. 정렬 된 반복자를 얻을 수 있습니다. 따라서 실제로는 이전 값을 추적하고 이동하면서 감할 필요가 있습니다.

sortedRDD.mapPartitions(iter=>{ var prev=""; iter.map{i=>val t =(i._1,i._2,if (prev=="") 0 else prev);prev=i._2;t}}) 

같은 매우 게으른 시도 뭔가에 (I 차이를하지 않았고 날짜를 구문 분석하지 않았지만 희망이 아이디어로 당신을 도움)