0
다른 방식으로 집계 된 각 필드 :Pyspark 집계 - 나는 네 가지 기본 필드가 일부 데이터가
- 필드 1은 2가 모든 고유 값의 집합이어야한다
- 필드 데이터에 대한 열쇠이다 이 필드
- 필드 3 최소 값은
원본 코드는 다음과 같습니다 (타임 스탬프)
data = (
dataframe
.rdd
# flatten rows
.flatMap(lambda x: x)
# Parse JSON
.flatMap(lambda x: encode_json(x))
# Capture values
.map(lambda x: [
# Merge 'field1', 'field2' --> 'field1, field2'
','.join(_ for _ in [x.get('metadata_value'), x.get('field2')]),
# Create pairing of low and high timestamps
[x.get('low'), x.get('high')]
])
# Aggregate into a list of low/high timestamps per 'field1, field2'
.aggregateByKey(list(), lambda u, v: u + [v], lambda u1, u2: u1 + u2)
# Flatten keys 'ip,guid' --> 'ip', 'guid'
.map(lambda x: (x[0].split(',')[0], x[0].split(',')[1], x[1], sum(1 for _ in x[1])))
# Reduce timestamps to single values: [s1, e1], [s2, e2], ... --> s_min, e_max
.map(lambda x: (x[0], x[1], min(_[0] for _ in x[2]), max(_[1] for _ in x[2]), x[3]))
)
원본 출력은 다음과 같습니다
a | x| 20160103 | 20160107
a | x013579 | 20160101 | 20160106
새로운 출력은, 다음과 같이
a | {x,x013579} | 20160101 | 20160107