2017-03-09 2 views
0

년 및 한달 동안 사전을 생성하려고합니다. 내가 요구할 수있는 매크로의 종류. 년과 달의. 그런 내가, 내가 팬더에서이 작업을 수행 할 수 아래 그림과 같이 하나의 pyspark의 dataframe에 모든 dataframe을 연결하려는pyspark에서 데이터 프레임 사전 생성하기

df = spark.createDataFrame([(1, "foo1",'2016-1-31'),(1, "test",'2016-1-31'), (2, "bar1",'2012-1-3'),(4, "foo2",'2011-1-11')], ("k", "v","date")) 
w = Window().partitionBy().orderBy(col('date').desc())   
df = df.withColumn("next_date",lag('date').over(w).cast(DateType())) 
df = df.withColumn("next_name",lag('v').over(w)) 
df = df.withColumn("next_date",when(col("k") != lag(df.k).over(w),date_add(df.date,605)).otherwise(col('next_date'))) 
df = df.withColumn("next_name",when(col("k") != lag(df.k).over(w),"").otherwise(col('next_name'))) 

import copy 
dict_of_YearMonth = {} 

for yearmonth in [200901,200902,201605 .. etc]: 

    key_name = 'Snapshot_'+str(yearmonth) 
    dict_of_YearMonth[key_name].withColumn("test",yearmonth) 
    dict_of_YearMonth[key_name].withColumn("test_date",to_date(''+yearmonth[:4]+'-'+yearmonth[4:2]+'-1'+'')) 
# now i want to add a condition 
    if(dict_of_YearMonth[key_name].test_date >= dict_of_YearMonth[key_name].date) and (test_date <= next_date) then output snapshot_yearmonth /// i.e dataframe which satisfy this condition i am able to do it in pandas but facing challenge in pyspark 
dict_of_YearMonth[key_name] 
dict_of_YearMonth 

df라고 pyspark 동적 열을 추가하는 동안 나는 도전에 직면하고있다하지만 난 pyspark 수행해야

snapshots=pd.concat([dict_of_YearMonth['Snapshot_201104'],dict_of_YearMonth['Snapshot_201105']]) 

동적 인 데이터 프레임의 사전을 생성하고 컬럼을 동적으로 추가하고 조건을 수행하고 연도 기반 데이터 프레임을 생성하고 단일 데이터 프레임으로 병합하는 다른 방법이 있다면. 어떤 도움을 주시면 감사하겠습니다.

답변

0

내가 코드를 아래의 시도가하여 여러개의 dataframes 코드 아래 사용 추가하려면

// Function to append all the dataframe using union 
def unionAll(*dfs): 
return reduce(DataFrame.unionAll, dfs) 

// convert dates 
def is_date(x): 
    try: 
     x= str(x)+str('01') 
     parse(x) 
     return datetime.datetime.strptime(x, '%Y%m%d').strftime("%Y-%m-%d") 
    except ValueError: 
     pass # if incorrect format, keep trying other format 

dict_of_YearMonth = {} 
for yearmonth in [200901,200910]: 
key_name = 'Snapshot_'+str(yearmonth) 
dict_of_YearMonth[key_name]=df 
func = udf(lambda x: yearmonth, StringType()) 
dict_of_YearMonth[key_name] = df.withColumn("test",func(col('v'))) 
default_date = udf (lambda x : is_date(x)) 
dict_of_YearMonth[key_name] = dict_of_YearMonth[key_name].withColumn("test_date",default_date(col('test')).cast(DateType())) 
dict_of_YearMonth 

을하고있다 :

final_df = unionAll(dict_of_YearMonth['Snapshot_200901'], dict_of_YearMonth['Snapshot_200910']) 
+0

덕분에 일했다! –

관련 문제