번 변수 (변수, 타임 스탬프, 값)으로 수천 개의 변수 값 관측치를 포함하는 대형 데이터 세트 (> 40GB 쪽모 파일)를 상상해보십시오.PySpark로 중첩 된 for-each 루프를 수행하는 방법
이제 500 개 변수의 하위 집합에 관심이있는 쿼리를 생각해보십시오. 특정 시점 (관측 창 또는 시간대)에 대한 변수에 대한 관측치 (값 -> 시간 시리즈)를 검색하려고합니다. 시작과 종료 시간이 있습니다. 분산 컴퓨팅 (스파크)없이
,이처럼 코드 수 :
for var_ in variables_of_interest:
for incident in incidents:
var_df = df_all.filter(
(df.Variable == var_)
& (df.Time > incident.startTime)
& (df.Time < incident.endTime))
내 질문은 : 방법 스파크/PySpark으로 그렇게 할 수 있나요? 다음 중 하나를 생각하고있었습니다.
- 어떻게 든 변수와 인시던트를 결합하고 나중에 데이터 프레임을 필터링합니다.
- 입사 데이터 프레임을 브로드 캐스팅하고 변수 관측 (df_all)을 필터링 할 때 맵 기능 내에서 사용합니다.
- 어떻게 든 RDD.cartasian 또는 RDD.mapParitions를 사용합니다 (주석 : 쪽매 파일이 변수에 의해 분할 저장되었습니다).
예상 출력 같아야
dataframe 1 입사광의 기간 내에 입사 1 dataframe 2 이들 값의 시간 프레임 내의 모든 변수와 관측 된 값을 포함incident1 --> dataframe 1
incident2 --> dataframe 2
...
2.
아이디어를 얻길 바랍니다.
UPDATE 나는 zero323에 의해 주어진 대답에서 아이디어 # 1을 기반으로하는 솔루션과 코드를 코딩했습니다. 작품은 꽤 좋지만 마지막 단계에서 사건을 집계/그룹화하는 방법을 궁금해합니다. 각 사건에 일련 번호를 추가하려고했지만 마지막 단계에서 오류가 발생했습니다. 코드를 검토하고 완성 할 수 있다면 멋지겠습니까? 따라서 샘플 데이터와 스크립트를 업로드했습니다. 환경은 1.4 스파크입니다 (PySpark) :
- 사건 : incidents.csv
- 변수 값 관측 데이터 (77메가바이트) : parameters_sample.csv (HDFS에 넣어)
- Jupyter 노트북 : nested_for_loop_optimized.ipynb
- 파이썬 스크립트 : nested_for_loop_optimized.py 스크립트의
- PDF 내보내기 : nested_for_loop_optimized.pdf
대용량 데이터 세트가 아니기 때문에 테라도 아닙니다. 큰 하나! :) 지금까지 뭐 해봤 어..? – gsamaras
나는 그것을 해결할 수있는 아이디어를 얻기 위해 수 많은 게시물과 예제를 읽었습니다. 아직 아무 것도 구현하지 않았습니다. 하지만 첫 번째 시도는 isin-function을 사용하여 변수를 df2로 필터링 한 다음 사건 데이터 프레임을 브로드 캐스팅하고 df2에 대한 맵을 사용합니다. 그러나 각 사건에 대한 데이터 프레임 (관찰)을 얻는 방법을 확신하지 못합니다. 어떻게 든 붙어있다. – Matthias
'join'은 합리적인 시작 pont처럼 보입니다. 당신은 Cartesian 제품을 피하기에 충분하며 500 레코드가 있으면 브로드 캐스트 조인에 쉽게 최적화 될 수 있습니다. – zero323