2016-07-12 4 views
0

pyspark를 사용하여 Spark에서 알고리즘을 학습하는 기계에 대한 데이터를 준비하려고합니다. 나는 날짜와 시간, ID와 클래스 (0 또는 1)를 포함하는 RDD를 가지고 있습니다. 간단히 아래와 같습니다 :Pyspark의 RDD에서 시퀀스 추출하기

rdd = sc.parallelize([Row(my_date=1465488788,id=4,my_class=1), Row(my_date=1465488790,id=5,my_class=0), Row(my_date=1465488801,id=23,my_class=1), Row(my_date=1465488805,id=23,my_class=1), Row(my_date=1465488809,id=5,my_class=0), Row(my_date=1465488810,id=32,my_class=0),Row(my_date=1465488826,id=38,my_class=1)]) 

이 데이터는 날짜순으로 정렬됩니다. 내가 원하는 것은이 데이터에서 시퀀스를 추출하는 것입니다.이 RDD에서 같은 시간 창 내의 연속적인 항목은 동일한 순서로 이동해야합니다. 예를 들어, 20 초 시간 간격 내의 모든 데이터는 동일한 순서로 있어야합니다. 시퀀스 시간 창은 위의 데이터 세트에 대한 20 초입니다 경우에, 나는 3 시퀀스 다음 만들어야합니다 :

[Row(my_seq=[4,5,23,23], my_class=1),Row(my_seq=[5,23,23,5,32], my_class=0),Row(my_seq=[5,32,38], my_class=1)] 

클래스 번호는 순서의 최신 요소의 클래스이어야한다. 데이터 세트가 상당히 커질 것이므로이 작업을 병렬로 수행해야합니다. 나는 날짜 필드에 데이터를 그룹화하여이 작업을 수행하기 위해 시도했지만 작동하지 않았다 같은 순서로

def createSequences(in_arr): 
    in_arr = list(in_arr) 
    seq_arr = [] 
    sorted_arr = [] 
    for e in in_arr: 
     seq_arr.append(e["id"]) 
     sorted_arr.append(int(str(e["my_date"]) + str(e["my_class"]))) 
    sorted_arr.sort() 
    if str(sorted_arr[-1])[-1] == "1": 
     return Row(is_class = 1, seq = seq_arr, sorted_arr = sorted_arr) 
    else: 
     return Row(is_class = 0, seq = seq_arr, sorted_arr = sorted_arr) 
offset = 1465488788 
time_window = 20 
grouped = clustered_logs.map(lambda row: (int((row["my_date"] - offset)/time_window), row)) \ 
         .groupByKey() \ 
         .map(lambda l: createSequences(l[1])) 

그것은 단지 그룹에 동일한 창 크기 내의 모든 항목을 나는 최신 사용할 수 없습니다 시퀀스가 다음 첫 번째 시퀀스의 시간 창 내에있는 경우 시퀀스의 항목 스파크에서 이것을 달성 할 수있는 방법이 있다면, 제발 도와주세요. 감사합니다 ...

답변

0
time_window = 20 
clustered_logs.groupBy(lambda x: (int(x /time_window), x.my_class))\ 
    .map(lambda x: Row(my_class=x[0][1], my_seq=[ t.id for t in x[1]])) 
  • 는 창 클래스 int(x /time_window)
  • 그룹 창을 식별합니다. 그룹화 키는 튜플 인 (window, class)
  • ((73274439, 1) , itrable<Row>)과 같은 튜플입니다. 튜플의 두 번째 값은 for의 이해력을 사용하여 반복되어 목록을 만듭니다.

참조 : 대답에 대한 Scala doc RDD

+0

감사합니다. 하지만이 방법으로 두 시간 창 사이에 겹치면 일부 (실제로는 많은) 시퀀스가 ​​손실됩니다. 예를 들어 내 데이터가 ABBCDB이고 ABBC가 첫 번째 시간 창에 속하면 두 번째 DB 부분부터 시작하여 다음 시퀀스를 그룹화합니다. 하지만 ABBC, BBCD, BCDB 등으로하고 싶습니다. 가능하지 않다고 생각하니? 감사... –

관련 문제