2016-11-14 3 views
0

다른 방식으로 집계 된 각 필드 :Pyspark 집계 - 나는 네 가지 기본 필드가 일부 데이터가

  1. 필드 1은 2가 모든 고유 값의 집합이어야한다
  2. 필드 데이터에 대한 열쇠이다 이 필드
  3. 필드 3 최소 값은

원본 코드는 다음과 같습니다 (타임 스탬프)

  • 필드 4는 최대 값입니다 (타임 스탬프)입니다 :

    data = (
        dataframe 
        .rdd 
        # flatten rows 
        .flatMap(lambda x: x) 
        # Parse JSON 
        .flatMap(lambda x: encode_json(x)) 
        # Capture values 
        .map(lambda x: [ 
         # Merge 'field1', 'field2' --> 'field1, field2' 
         ','.join(_ for _ in [x.get('metadata_value'), x.get('field2')]), 
         # Create pairing of low and high timestamps 
         [x.get('low'), x.get('high')] 
        ]) 
        # Aggregate into a list of low/high timestamps per 'field1, field2' 
        .aggregateByKey(list(), lambda u, v: u + [v], lambda u1, u2: u1 + u2) 
        # Flatten keys 'ip,guid' --> 'ip', 'guid' 
        .map(lambda x: (x[0].split(',')[0], x[0].split(',')[1], x[1], sum(1 for _ in x[1]))) 
        # Reduce timestamps to single values: [s1, e1], [s2, e2], ... --> s_min, e_max 
        .map(lambda x: (x[0], x[1], min(_[0] for _ in x[2]), max(_[1] for _ in x[2]), x[3])) 
    ) 
    

    원본 출력은 다음과 같습니다

    a | x| 20160103 | 20160107 
    a | x013579 | 20160101 | 20160106 
    

    새로운 출력은, 다음과 같이

    a | {x,x013579} | 20160101 | 20160107 
    
  • 답변

    1

    는 한 쌍의 RDD 매핑, 현재 출력이 2 변환을 추가, 해당 필드 (사전, 최소, 최대)로 각 필드를 줄입니다.

    data.map(lambda reg: (reg[0],[reg[1],reg[2],reg[3]])) .reduceByKey(lambda v1,v2: ({v1[0],v2[0]},min(v1[1],v2[1]), max(v1[2],v2[2])))

    관련 문제