2016-10-03 2 views
0

왜 이런 일이 발생하는지 잘 모르겠습니다. PySpark에서 두 개의 데이터 프레임을 읽고 열 이름을 출력했는데 예상대로 였지만 SQL 결합시에는 입력이있는 경우 열 이름을 확인할 수없는 오류가 발생합니다. 병합 작업을 단순화했습니다. 그러나 더 많은 조인 조건을 추가해야합니다. SQL을 사용하는 이유입니다 ("and b.mnvr_bgn < a.idx_trip_id 및 b.mnvr_end> a.idx_trip_data "). AnalysisException : 그것은 열 'DEVICE_ID'이 DF mnvr_temp_idx_prev_tempSparkSQL에서 열 이름을 확인할 수 없습니다. join

mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end') 
print mnvr_temp_idx_prev.columns 
['device_id', 'mnvr_bgn', 'mnvr_end'] 

raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end')) 
print raw_data_filtered.columns 
['device_id', 'trip_id', 'idx_trip_end'] 

raw_data_filtered.registerTempTable('raw_data_filtered_temp') 
mnvr_temp_idx_prev.registerTempTable('mnvr_temp_idx_prev_temp') 
test = sqlContext.sql('SELECT a.device_id, a.idx_trip_end, b.mnvr_bgn, b.mnvr_end \ 
          FROM raw_data_filtered_temp as a \ 
          INNER JOIN mnvr_temp_idx_prev_temp as b \ 
           ON a.device_id = b.device_id') 

역 추적 (가장 최근 통화 마지막)에서 '_col7'로 이름이 변경되고 있음을 표시 u는 "해결할 수없는 주어진 입력 열을 'b.device_id' : [_col7, trip_id, device_id, mnvr_end, mnvr_bgn, idx_trip_end]; line 1 pos 237 "

도움을 주시면 감사하겠습니다.

+0

전체 코드를 게시하십시오 –

+0

내 전체 코드가 약 1000 줄이므로 실제로 옵션이 아닙니다 – Amber

+0

SQL 문 대신 Join 용 DataFrames를 사용해 보셨습니까? 차이가별로 없지만 같은 문제가 데이터 프레임에서도 발생하는지 알고 싶습니다. – dheee

답변

1

적어도 하나의 데이터 프레임에서 'device_id'필드의 이름을 바꾸는 것이 좋습니다. 귀하의 쿼리를 조금 수정하고 그것을 (스칼라로) 테스트했습니다. 아래의 쿼리 작동

위의 문에서 '선택 *'을 수행하면 작동합니다. 그러나 'device_id'를 선택하려고하면 "reference 'device_id'가 모호합니다."라는 오류 메시지가 나타납니다. 위의 '테스트'데이터 프레임 정의에서 볼 수 있듯이 두 개의 필드 (device_id)는 같은 이름을 갖습니다. 따라서이를 방지하려면 데이터 프레임 중 하나에서 필드 이름을 변경하는 것이 좋습니다.

mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end') 
          .withColumnRenamned("device_id","device") 

raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end')) 

지금 당신의 문제에 대한 작동 쿼리 위 SQL 컨텍스트

test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device and a. idx_trip_id < b.mnvr_bgn") 

에 dataframes 또는는 SqlContext

//using dataframes with multiple conditions 
    val test = mnvr_temp_idx_prev.join(raw_data_filtered,$"device" === $"device_id" 
                && $"mnvr_bgn" < $"idx_trip_id","inner") 

//를 사용합니다. 그리고 데이터 세트가 너무 큰 경우 조인 조건에 '>'또는 '<'연산자를 사용하지 않는 것이 좋습니다. 교차 결합은 데이터 세트가 큰 경우 값 비싼 연산입니다. 대신 WHERE 조건에서 사용하십시오.

+0

첫 코멘트에 대해서는 dataframes join을 사용해 보았는데 같은 오류가있었습니다. 데이터 프레임 중 하나에서 열의 이름을 변경하면 문제가 해결되었습니다! 이제는 모두 예상대로 실행 중입니다. 감사! '>'및 '<'을 (를) 사용하는 대신 조인이 아닌 where 문을 사용해 주셔서 감사합니다. 내 데이터가 실제로 크기가 커서 조인과 비교할 때 비용 차이가 있다는 것을 인식하지 못했습니다. 어디에. 그래서이 제안은 매우 높이 평가됩니다! – Amber

+0

@Amber 기쁜 소식입니다. – dheee

관련 문제