2017-10-24 2 views
1

시간 기간 (나노초로 표현 : 두 개의 Long, 시작에 하나, 끝으로 하나)과 측정 값을 보유한 레코드 목록을 포함하는 하나의 데이터 세트가 있습니다. 값이 변경되는 기간 만 보유하는 새로운 집계 데이터 집합을 만들어야합니다. 예를 들어 :Apache Spark에서 timeseries 데이터를 집계하는 방법

input dataset: 
    +-----+-----+-----+ 
    |start|end |value| 
    +-----+-----+-----+ 
    |123 |124 |1 | 
    |124 |128 |1 | 
    |128 |300 |2 | 
    |300 |400 |2 | 
    |400 |500 |3 | 

    result dataset: 
    +-----+-----+-----+ 
    |start|end |value| 
    +-----+-----+-----+ 
    |123 |128 |1 | 
    |128 |400 |2 | 
    |400 |500 |3 | 

나는 작은 데이터 세트에이 작업을 수행하는 방법을 알고 있지만, 맵리 듀스 패러다임, 그리고 아파치 스파크를 사용하는 방법을 모른다.

Apache Spark에서 자바를 구현하는 방법을 알려주세요.

+0

예를 들어, 지연을 사용하여 마지막 값을 얻은 다음 현재 및 최종 값이 다른지 여부에 따라 데이터 세트를 필터링하십시오. –

+0

만약 다른 행이 130 200 1이라면 출력은 무엇입니까 –

+0

우리는 창 함수를 사용할 수 있지만 스파크는 시계열 데이터를 처리하는데 최적이 아닙니다. 선택의 여지가없는 경우 https : // github .com/sryza/spark-timeseries – SanthoshPrasad

답변

1

매우 간단합니다. groupBy를 사용하여 min과 max를 찾은 다음 데이터 세트를 결합하십시오.

// df is original dataset 
Dataset<Row> df_start = df.groupBy("value").min("start").withColumnRenamed("min(start)", "start").withColumnRenamed("value", "value_start"); 
Dataset<Row> df_end = df.groupBy("value").max("end").withColumnRenamed("max(end)", "end").withColumnRenamed("value", "value_end"); 

Dataset<Row> df_combined = df_start.join(df_end, df_start.col("value_start").equalTo(df_end.col("value_end"))).drop("value_end").withColumnRenamed("value_start", "value").orderBy("value"); 

df_combined.show(false); 
+-----+-----+---+ 
|value|start|end| 
+-----+-----+---+ 
|1 |123 |128| 
|2 |128 |400| 
|3 |400 |700| 
+-----+-----+---+ 
0

하나의 접근법은 "각 고유 값에 대해 인접한 모든 시간 범위를 찾아 값을 합친 것"으로 문제를 표현하는 것입니다. 이러한 이해를 통해 groupBy 값을 사용하여 각 값에 startend의 목록을 만들 수 있습니다. 그런 다음 사용자 정의 함수를 사용하여 이러한 함수를 연속 시간 범위로 축소 할 수 있습니다.

극단적으로 데이터 세트에서 디스크 전용 지속성 레벨을 사용하는 경우 유일한 요구 사항은 메모리에 start_end 초의 단일 행을 넣을 수 있어야한다는 것입니다. 따라서 대부분의 클러스터에 대해이 접근법의 상한선을 start_end 쌍의 값으로 설정합니다. 여기

가 구현 한 예이다 (요청에 따라 자바 API를 사용하여 - 스칼라는 상당히 덜 장황 것) :

public class JavaSparkTest { 

    public static void main(String[] args){ 
     SparkSession session = SparkSession.builder() 
       .appName("test-changes-in-time") 
       .master("local[*]") 
       .getOrCreate(); 
     StructField start = createStructField("start", DataTypes.IntegerType, false); 
     StructField end = createStructField("end", DataTypes.IntegerType, false); 
     StructField value = createStructField("value", DataTypes.IntegerType, false); 
     StructType inputSchema = createStructType(asList(start,end,value)); 
     StructType startEndSchema = createStructType(asList(start, end)); 
     session.udf().register("collapse_timespans",(WrappedArray<Row> startEnds) -> 
       JavaConversions.asJavaCollection(startEnds).stream() 
        .sorted((a,b)->((Comparable)a.getAs("start")).compareTo(b.getAs("start"))) 
        .collect(new StartEndRowCollapsingCollector()), 
       DataTypes.createArrayType(startEndSchema) 
     ); 
     Dataset<Row> input = session.createDataFrame(asList(
       RowFactory.create(123, 124, 1), 
       RowFactory.create(124, 128, 1), 
       RowFactory.create(128, 300, 2), 
       RowFactory.create(300, 400, 2), 
       RowFactory.create(400, 500, 3), 
       RowFactory.create(500, 600, 3), 
       RowFactory.create(600, 700, 3) 
     ), inputSchema); 
     Dataset<Row> startEndByValue = input.selectExpr("(start start, end end) start_end", "value"); 
     Dataset<Row> startEndsByValue = startEndByValue.groupBy("value").agg(collect_list("start_end").as("start_ends")); 
     Dataset<Row> startEndsCollapsed = startEndsByValue.selectExpr("value", "explode(collapse_timespans(start_ends)) as start_end"); 
     Dataset<Row> startEndsInColumns = startEndsCollapsed.select("value", "start_end.start", "start_end.end"); 
     startEndsInColumns.show(); 
    } 

    public static class StartEndRowCollapsingCollector implements Collector<Row, List<Row>, List<Row>>{ 

     @Override 
     public Supplier<List<Row>> supplier() { 
      return()-> new ArrayList<Row>(); 
     } 

     @Override 
     public BiConsumer<List<Row>, Row> accumulator() { 
      return (rowList, row) -> { 
       // if there's no rows in the list or the start doesn't match the current end 
       if(rowList.size()==0 || 
         !rowList.get(rowList.size()-1).getAs(1).equals(row.getAs(0))){ 
        rowList.add(row); 
       } else { 
        Row lastRow = rowList.remove(rowList.size()-1); 
        rowList.add(RowFactory.create(lastRow.getAs(0), row.getAs(1))); 
       } 
      }; 
     } 

     @Override 
     public BinaryOperator<List<Row>> combiner() { 
      return (a,b)->{ throw new UnsupportedOperationException();}; 
     } 

     @Override 
     public Function<List<Row>, List<Row>> finisher() { 
      return i->i; 
     } 

     @Override 
     public Set<Characteristics> characteristics() { 
      return Collections.EMPTY_SET; 
     } 
    } 
} 

그리고 프로그램 출력 :

+-----+-----+---+ 
|value|start|end| 
+-----+-----+---+ 
| 1| 123|128| 
| 3| 400|700| 
| 2| 128|400| 
+-----+-----+---+ 

공지 값이 없습니다 순서대로. 이는 스파크가 데이터 세트를 분할하고 값 행을 처리했기 때문이며 행 순서에 어떤 중요성도 지정하지 않도록했기 때문입니다. 당신은 시간 또는 값을 정렬 된 출력을 요구해야합니다. 물론 평소와 같이 정렬 할 수 있습니다.

관련 문제