2017-02-22 3 views
2

자바 스파크에서 여러 행에 하나의 행을 분할 :어떻게이처럼 보이는 불꽃의 데이터 프레임이

id var_a var_b 
-- ----- ----- 
01 1  2 
02 3  0 

을하고 나는이

id var_name var_value 
-- -------- --------- 
01 var_a 1 
01 var_b 2 
02 var_a 3 
02 var_b 0 
처럼 여러 행에 값을 분할 할

Java Spark 1.6 API를 사용하여 가장 좋은 방법은 무엇입니까?

+0

유무도 [피벗 (한 번 봐 https://spark.apache.org/docs/1.6.0/api/java/org/apache/ spark/sql/GroupedData.html # pivot (java.lang.String)) 함수를 호출합니다. – pheeleeppoo

답변

1

새로운 FlatMapFunction이 일을했다 :

import java.util.ArrayList; 
import java.util.List; 

import org.apache.commons.lang3.ArrayUtils; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 

/** 
* id var_a var_b 
* -- ----- ----- 
* 01 1  2 
* 02 3  0 
* 
* becomes 
* 
* id var_name var_value 
* -- -------- --------- 
* 01 var_a 1 
* 01 var_b 2 
* 02 var_a 3 
* 02 var_b 0 
* 
*/ 
public class OneToManyMapFunction implements FlatMapFunction<Row, Row> { 

    //indexes of fields that won't change in the new rows (id) 
    private int[] fixedFields = {0}; 
    //indexes of fields that will create new rows (var_a, var_b) 
    private int[] dynamicFields = {1, 2}; 
    //names of the dynamic fields 
    private String[] dynamicFieldsName = {"var_a", "var_b"}; 

    public OneToManyMapFunction() {} 

    @Override 
    public Iterable<Row> call(Row row) throws Exception { 

     List<Row> rows = new ArrayList<Row>(); 
     Object[] fixedValues = ArrayUtils.EMPTY_OBJECT_ARRAY; 

     //add values that won't change in the new rows 
     for (int i = 0; i < fixedFields.length; i++) { 
      fixedValues = ArrayUtils.add(fixedValues, row.get(fixedFields[i])); 
     } 

     //create new rows 
     for (int i = 0; i < dynamicFields.length; i++) { 
      //copy fixed values (id) 
      Object[] values = ArrayUtils.clone(fixedValues); 

      //add dynamic value name (var_a or var_b) 
      values = ArrayUtils.add(values, dynamicFieldsName[i]); 
      //add dynamic value 
      values = ArrayUtils.add(values, row.get(dynamicFields[i])); 

      //create new row for dynamic val 
      Row newRow = RowFactory.create(values); 
      rows.add(newRow); 
     } 

     return rows; 
    } 

} 
1

flatMap은 찾고있는 기능입니다.

하나의 레코드에서 여러 레코드를 생성 할 수 있습니다.

+0

그래도 맵핑을 수행하는 올바른 함수는 무엇입니까? new Function >()? – koopa

+0

'flatMap'의 서명을보세요 : https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/api/java/JavaRDDLike.html#flatMap(org.apache. spark.api.java.function.FlatMapFunction) 'FlatMapFunction'을 원합니다 : https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/api/java/function/ FlatMapFunction.html 그래서 새로운 함수 >이 필요합니다. –

관련 문제