2017-01-07 3 views
2

나는 불꽃에 새로운 오전 나는 그런 데이터를 CSV 파일을 가지고 :pyspark의 문자열 및 집계

date,   accidents, injured 
2015/20/03 18:00 15,   5 
2015/20/03 18:30 25,   4 
2015/20/03 21:10 14,   7 
2015/20/02 21:00 15,   6 

내가이 일어난 때의 특정 시간에 의해이 데이터를 집계하고 싶습니다. 내 생각은 '년/월/일 hh'에 날짜를 문자열로 배열하여 분을 만들지 않고 키를 만들 수 있도록하는 것입니다. 나는 사고의 평균을주고 싶었고 매 시간마다 다쳤다. 아마도 pyspark와는 다른 똑똑한 방법이 있을까요?

고마워요!

답변

4

글쎄, 그것은 당신이 나중에 할 일에 달려 있다고 생각합니다. 당신이 제안으로

가장 간단한 방법은 이렇게하는 것입니다 : 당신은, 그러나, 나중에 좀 더 계산을 수행 할 경우

data = [('2015/20/03 18:00', 15, 5), 
    ('2015/20/03 18:30', 25, 4), 
    ('2015/20/03 21:10', 14, 7), 
    ('2015/20/02 21:00', 15, 6)] 
df = spark.createDataFrame(data, ['date', 'accidents', 'injured']) 

df.withColumn('date_hr', 
       df['date'].substr(1, 13) 
    ).groupby('date_hr')\ 
     .agg({'accidents': 'avg', 'injured': 'avg'})\ 
     .show() 

, 당신은에 데이터를 구문 분석 할 수 있습니다 : 날짜 문자열 다음 집계 하위 문자열 a TimestampType() 그런 다음 그 날짜와 시간을 추출하십시오.

import pyspark.sql.types as typ 
from pyspark.sql.functions import col, udf 
from datetime import datetime 

parseString = udf(lambda x: datetime.strptime(x, '%Y/%d/%m %H:%M'), typ.TimestampType()) 
getDate = udf(lambda x: x.date(), typ.DateType()) 
getHour = udf(lambda x: int(x.hour), typ.IntegerType()) 

df.withColumn('date_parsed', parseString(col('date'))) \ 
    .withColumn('date_only', getDate(col('date_parsed'))) \ 
    .withColumn('hour', getHour(col('date_parsed'))) \ 
    .groupby('date_only', 'hour') \ 
    .agg({'accidents': 'avg', 'injured': 'avg'})\ 
    .show() 
+0

은 y [0] [: 13]을 사용하여 쉽게 매핑 할 때 부분 문자열로 관리됩니다. 귀하의 솔루션이 더 우아 해 보입니다. 고맙습니다! 질문이 하나 더 있습니다. 다른 데이터가있는 다른 파일을 다른 연도에서 말하면, 그 사고와 부상의 평균은 어떻게 될까요? 하나의 파일에 모든 것을 넣은 다음 계산을 실행 하시겠습니까? – sampak

+0

나는 그 파일을 읽고 그 데이터에 대해서만 집계를하거나, 필요하다면 한번에 결과를 얻고 (그리고 Spark 2.0을 사용한다고 가정 할 때), .union (...) 2 개 (또는 그 이상)의 'DataFrames'를 함께 사용합니다. http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.union – TDrabas