2016-11-10 6 views
-2

두 가지 조건에서 집계 (집계)하고 두 개의 다른 열에 할당하려고합니다. 아무도 나에게 쉬운 방법을 제안 할 수 있습니까? 이 있었어야Pyspark 데이터 프레임 집계 - 다른 조건으로 계산하십시오.

num_ins_rec_cnt = F.count(col("ins_upd_flag") == "I").alias("ins_rec_cnt") 
num_upd_rec_cnt = F.count(col("ins_upd_flag") == "U").alias("upd_rec_cnt") 
delta_process_max_ld_df = cdc_all_record_sk_ld_df.agg(F.max('delta_account_sk_id').alias("max_account_sk_id"),(num_ins_rec_cnt),(num_upd_rec_cnt)).withColumn("lkp_process_name",lit(process_name)).withColumn("history_tbl_cnt",lit(base_rec_count)).withColumn("delta_tbl_cnt",lit(delta_rec_count)) 

출력은,

+-----------------+-----------+-----------+--------------------+---------------+-------------+ 
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt| lkp_process_name|history_tbl_cnt|delta_tbl_cnt| 
+-----------------+-----------+-----------+--------------------+---------------+-------------+ 
|   25099|  5100|  5100|amc_account_delta_ld|   19999|  20099| 
+-----------------+-----------+-----------+--------------------+---------------+-------------+ 

하지만 필터 조건이 다르지만 카운트가 동일오고있다 시도하는 아래의 솔루션에서

,

+-------+---------------+--+ 
| _c0 | ins_upd_flag | 
+-------+---------------+--+ 
| 5100 | I    | 
| 5000 | U    | 

Sample Data: 
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+ 
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date|  delta_load_tm|  delta_hash_key|delta_eff_flag|ins_upd_flag| 
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+ 
| ID330020000|    20000|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y|   I| 
| ID330020001|    20001|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y|   I| 
| ID330020002|    20002|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y| 
+0

예 .. 올바른 .. 내가 별도의 컬럼으로 테이블에 작성해야한다. 필자는 필터 출력을 두 개의 별도 컬럼으로 필요로한다. – user3858193

답변

-1

아래의 방법으로 문제를 해결했습니다.

delta_process_max_ld_df cdc_all_record_sk_ld_df.withColumn = ('ins_upd_flag_cnt'**** F.when (cdc_all_record_sk_ld_df.ins_upd_flag == 'I'는, 1) .when (cdc_all_record_sk_ld_df.ins_upd_flag == 'U', 0) .otherwise (0)) .agg ('del_account_sk_id') 별칭 ("max_surrogate_id"), F.sum ('ins_upd_flag_cnt') 별칭 ("insert_record_cnt"), F.count ('*'). 별칭 ("process_run_date", lit (load_dt))) .Column ("base_tbl_cnt", lit (base_rec_count)). withColumn ("delta_tbl_cnt")) .withColumn ("process_name", lit (process_name) (* status_tbl_columns) delta_process_max_ld_df1 = (선택 항목) * 선택 (* status_tbl_columns) * 선택 (* status_tbl_columns) delta_process_max_ld_df.withColu 미네소타 ("upd_record_cnt"조명 (delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt)) ALL 기타 사항 서보 -OFF (* status_tbl_columns)

관련 문제