2016-08-04 2 views
1

타임 스탬프, 애셋 (문자열), 태그 (문자열) 및 값 (더블) 열이있는 스파크 데이터 프레임 (스칼라 인터페이스 사용)이 있습니다. 여기의 발췌 한 것입니다 :필터 스파크 목록의 모든 점 집합 사이의 데이터 프레임

+--------------------+-----+--------+-------------------+ 
|   timestamp|asset|  tag|    value| 
+--------------------+-----+--------+-------------------+ 
|2013-01-03 23:36:...| G4| BTGJ2_2|  116.985626221| 
|2013-01-15 00:36:...| G4| TTXD1_6|  66.887382507| 
|2013-01-05 13:03:...| G4|TTXD1_22|  40.913497925| 
|2013-01-12 04:43:...| G4|TTXD1_23|  60.834510803| 
|2013-01-08 17:54:...| G4| LTB1D|  106.534744263| 
|2013-01-02 04:15:...| G4| WEXH|  255.981292725| 
|2013-01-07 10:54:...| G4| BTTA1_7|  100.743843079| 
|2013-01-05 11:29:...| G4| CDFH_10|  388.560668945| 
|2013-01-10 09:10:...| G4| LTB1D|  112.226242065| 
|2013-01-13 15:09:...| G4|TTXD1_15|  63.970848083| 
|2013-01-15 01:23:...| G4| TTIB|  67.993904114| 

나는 또한이 각 List의 크기가 두이며 시작하고 관심의 간격에 대한 종료 시간 유지하는 Array[List[Timestamp]]. 예를 들어 : 2013년 1월 12일

6:00 자정 2013년 1월 10일에서 2013년 1월 2일 12:00 자정에서 하나, 또 다른 :

event_times: Array[List[java.sql.Timestamp]] = Array(List(2013-01-02 00:00:00.0, 2013-01-02 12:00:00.0), List(2013-01-10 00:00:00.0, 2013-01-12 06:00:00.0)) 

두 가지 관심의 간격을 유지

여기 내 질문 : 어떻게 타임 스탬프가 간격의 모든에있는 값을 반환하도록 데이터 프레임을 필터링 할 수 있습니까? 나는 Array (나는 얼마나 많은 간격)에 얼마나 많은 요소를 알 수 없기 때문에 하나의 간격을 위해, 내가

df.filter(df("timestamp").between(start, end)) 

을 할 수있는, 그냥 필터의 긴 시리즈를 가질 수 없습니다. 위의 예를 들어

, 나는 행 4, 6을 유지하려는, 그리고 9

내가 지금 가지고있는 것은 Array을 통해 루프이며, 각각에 해당하는 부분 집합을 얻고있다. 그러나 모든 필터를 큰 필터에 설치하는 것보다 속도가 느린 것일 수 있습니다.

+1

미안하지만 원하는 것을 얻지 못했습니다. 명확한 모범을 보여줄 수 있습니까? –

+0

@ThiagoBaldim 방금 예를 들어 업데이트했습니다. – kgully

답변

3

타임 스탬프 목록을 DataFrame으로 변환하고 해당 타임 스탬프로 초기 DataFrame과 연결할 수 있습니다. 이 프로세스를 설명하기위한 간단한 예제를 만들었습니다.

//Dummy data 
val data = List(
    ("2013-01-02 00:30:00.0", "116.985626221"), 
    ("2013-01-03 00:30:00.0", "66.887382507"), 
    ("2013-01-11 00:30:00.0", "12.3456") 
) 

//Convert data to DataFrame 
val dfData = sc.parallelize(data).toDF("timestamp", "value") 

//Timestamp intervals list 
val filterList = Array(
    List("2013-01-02 00:00:00.0", "2013-01-02 12:00:00.0"), 
    List("2013-01-10 00:00:00.0", "2013-01-12 06:00:00.0") 
) 

//Convert the intervals list to a DataFrame 
val dfIntervals = sc.parallelize(
    filterList.map(l => (l(0),l(1))) 
).toDF("start_ts","end_ts") 

//Join both dataframes (inner join, since you only want matching rows) 
val joined = dfData.as("data").join(
    dfIntervals.as("inter"), 
    $"data.timestamp".between($"inter.start_ts", $"inter.end_ts") 
) 
관련 문제