2017-10-24 4 views
0

JavaRDD를 사용하여 Spark에서이 코드를 다시 작성했습니다. 나는 그 그룹을 읽었다 .ByKey는 값 비싼 조작이다.JavaPairRDD에서 그룹화를 피하십시오. Apache Spark

groupByKey를 피함으로써이를 다시 작성할 수 있습니까?

키별로 그룹화 한 후 적용 가능한 경우 키 값을 업데이트하려고합니다.

어떤 사람은 내가 비슷하지만 How to update column based on a condition (a value in a group)?

+0

(이다 나는 잘못 될 수있다, isApplicable에 달려있다.). 유사한 질문에 대한 답변은 [here] (https://stackoverflow.com/a/46823497/7579547)를 참조하십시오. – Shaido

답변

1

GroupByKey에서 피해야한다 자바에서 어떤 일을하려고

List<Items> items = getItems(); 
    Map<String, List<ItemId>> itemsByName = items.stream() 
      .collect(Collectors.groupingBy(ItemId::getName, Collectors.toList())); 

    List<ItemId> newItems = itemsByName.entrySet().stream() 
      .collect(Collectors.toMap(e -> e.getKey(), e -> { 
      //update values if applicable 
       List<ItemId> rps = e.getValue().stream().filter(s -> s.isApplicable()).collect(Collectors.toList()); 
       return rps.isEmpty() ? e.getValue() : rps; 
      })) 
      .values().stream() 
      .flatMap(x -> x.stream()).collect(Collectors.toList()); 

JavaRDD

JavaRDD<Items> items = getItemsRDD(); 
    JavaPairRDD<String, ItemId> itemsByName = 
      items.mapToPair(e -> new Tuple2<String, ItemId>(e.getName(), e)); 

    JavaRDD<ItemId> newItems= itemsByName.groupByKey().mapValues(x->{ 
     //update values if applicable 
     List<ItemId> e = new ArrayList<>(); 
     x.iterator().forEachRemaining(e::add); 
     List<ItemId> rps = e.stream().filter(s -> s.isApplicable()).collect(Collectors.toList()); 
     return rps.isEmpty() ? e: rps; 
    }).flatMap(x->x._2); 

를 할 수 있습니다. reduceByKey를 대신 사용해보십시오. 동일한 키를 사용하여 데이터를 셔플 링하기 전에 각 파티션에 함수를 적용합니다.

데이터가 덜 섞이면수록 좋습니다. 여기

는 좋은 예 나는 당신이 대폭 반환 값의 크기를 줄일 수 집계의 일종을 수행 할 것 같지 않기 때문에 당신이`groupByKey`를 사용하지 않음으로써 많은 것을 얻을 것입니다 생각하지 않습니다 https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html