2016-08-25 2 views
2

번 변수 (변수, 타임 스탬프, 값)으로 수천 개의 변수 값 관측치를 포함하는 대형 데이터 세트 (> 40GB 쪽모 파일)를 상상해보십시오.PySpark로 중첩 된 for-each 루프를 수행하는 방법

이제 500 개 변수의 하위 집합에 관심이있는 쿼리를 생각해보십시오. 특정 시점 (관측 창 또는 시간대)에 대한 변수에 대한 관측치 (값 -> 시간 시리즈)를 검색하려고합니다. 시작과 종료 시간이 있습니다. 분산 컴퓨팅 (스파크)없이

,이처럼 코드 수 :

for var_ in variables_of_interest: 
    for incident in incidents: 

     var_df = df_all.filter(
      (df.Variable == var_) 
      & (df.Time > incident.startTime) 
      & (df.Time < incident.endTime)) 

내 질문은 : 방법 스파크/PySpark으로 그렇게 할 수 있나요? 다음 중 하나를 생각하고있었습니다.

  1. 어떻게 든 변수와 인시던트를 결합하고 나중에 데이터 프레임을 필터링합니다.
  2. 입사 데이터 프레임을 브로드 캐스팅하고 변수 관측 (df_all)을 필터링 할 때 맵 기능 내에서 사용합니다.
  3. 어떻게 든 RDD.cartasian 또는 RDD.mapParitions를 사용합니다 (주석 : 쪽매 파일이 변수에 의해 분할 저장되었습니다).

예상 출력 같아야

dataframe 1 입사광의 기간 내에 입사 1 dataframe 2 이들 값의 시간 프레임 내의 모든 변수와 관측 된 값을 포함
incident1 --> dataframe 1 
incident2 --> dataframe 2 
... 

2.

아이디어를 얻길 바랍니다.

UPDATE 나는 zero323에 의해 주어진 대답에서 아이디어 # 1을 기반으로하는 솔루션과 코드를 코딩했습니다. 작품은 꽤 좋지만 마지막 단계에서 사건을 집계/그룹화하는 방법을 궁금해합니다. 각 사건에 일련 번호를 추가하려고했지만 마지막 단계에서 오류가 발생했습니다. 코드를 검토하고 완성 할 수 있다면 멋지겠습니까? 따라서 샘플 데이터와 스크립트를 업로드했습니다. 환경은 1.4 스파크입니다 (PySpark) :

+0

대용량 데이터 세트가 아니기 때문에 테라도 아닙니다. 큰 하나! :) 지금까지 뭐 해봤 어..? – gsamaras

+0

나는 그것을 해결할 수있는 아이디어를 얻기 위해 수 많은 게시물과 예제를 읽었습니다. 아직 아무 것도 구현하지 않았습니다. 하지만 첫 번째 시도는 isin-function을 사용하여 변수를 df2로 필터링 한 다음 사건 데이터 프레임을 브로드 캐스팅하고 df2에 대한 맵을 사용합니다. 그러나 각 사건에 대한 데이터 프레임 (관찰)을 얻는 방법을 확신하지 못합니다. 어떻게 든 붙어있다. – Matthias

+0

'join'은 합리적인 시작 pont처럼 보입니다. 당신은 Cartesian 제품을 피하기에 충분하며 500 레코드가 있으면 브로드 캐스트 조인에 쉽게 최적화 될 수 있습니다. – zero323

답변

1

일반적으로 오직 첫 번째 접근 방식은 나에게 합리적인 보인다.정확한 기록 및 유통의 수에 전략을 결합하지만 당신은 최고 수준의 데이터 프레임을 만들 수 있습니다 :

incidents_ = sc.parallelize([ 
    (incident,) for incident in incidents 
]).toDF(["incident"]) 

for var_ in variables_of_interest: 
    df = spark.read.parquet("/some/path/Variable={0}".format(var_)) 
    df.join(incidents_, same_time) 
:

ref = sc.parallelize([(var_, incident) 
    for var_ in variables_of_interest: 
    for incident in incidents 
]).toDF(["var_", "incident"]) 

단순히 join

same_var = col("Variable") == col("var_") 
same_time = col("Time").between(
    col("incident.startTime"), 
    col("incident.endTime") 
) 

ref.join(df.alias("df"), same_var & same_time) 

또는 수행하는 특정 파티션에 대해 조인

선택적으로 marking one side as small enough to be broadcasted.

+0

흠, 예를 들어 주셔서 감사합니다. 그것을 이해할 시간이 필요합니다. Skype에서 저에게 연락 할 수 있습니까? 그냥 몇 가지 세부 사항을 논의하기 위해? nagilo12345 – Matthias

+0

죄송합니다 @Matthias, 더 이상 계정이 없습니다. – zero323

+0

안녕하세요. 내가 스크립트에서 코드를 사용하여 시도하고 지금까지 잘 작동합니다. 내가 얻지 못하는 유일한 방법은 최종 합류 단계 후에 인시던트 번호별로 결과 프레임에서 데이터를 선택하는 데 사용할 수있는 각 인시던트에 숫자를 추가하는 방법입니다. 위의 업데이트 된 질문에서 스크립트를 찾을 수 있습니다. 고맙습니다. 감사합니다. – Matthias

관련 문제