2017-09-15 7 views
0

에 기초한 평균 계산 I 두 dataframes 있습니다분할 스파크 dataframe 한 열 값

Class, Calculation 
first, Average 
Second, Sum 
Third, Average 

둘째 dataframe studentRecord는 다음과 같이 약 50K 엔트리 갖는다 :

우선 dataframe classRecord는 다음과 같이 10 개 항목이
Name, height, Camp, Class 
Shae, 152, yellow, first 
Joe, 140, yellow, first 
Mike, 149, white, first 
Anne, 142, red, first 
Tim, 154, red, Second 
Jake, 153, white, Second 
Sherley, 153, white, Second 

클래스 유형에 따라 두 번째 데이터 프레임부터 캠프를 따로 따로 높이 (클래스 우선 : 평균, 클래스 두 번째 : 합계 등)에 대한 계산을 수행하고 싶습니다 (클래스가 전나무 평균, 흰색, 노란색, 흰색 등). 여기

//function to calculate average 
def averageOnName(splitFrame : org.apache.spark.sql.DataFrame) : Array[(String, Double)] = { 
    val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name",$"height".cast("double")).as[(String, Double)].rdd 
    var avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1/y._2).collect 
    return avg_by_key 
} 

//required schema for further modifications 
val schema = StructType(
StructField("name", StringType, false) :: 
StructField("avg", DoubleType, false) :: Nil) 

// for each loop on each class type 
classRecord.rdd.foreach{ 
    //filter students based on camps 
    var campYellow =studentRecord.filter($"Camp" === "yellow") 
    var campWhite =studentRecord.filter($"Camp" === "white") 
    var campRed =studentRecord.filter($"Camp" === "red") 

    // since I know that calculation for first class is average, so representing calculation only for class first 
    val avgcampYellow = averageOnName(campYellow) 
    val avgcampWhite = averageOnName(campWhite) 
    val avgcampRed = averageOnName(campRed) 

    // union of all 
    val rddYellow = sc.parallelize (avgcampYellow).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue())) 
    //conversion of rdd to frame 
    var dfYellow = sqlContext.createDataFrame(rddYellow, schema) 
    //union with yellow camp data 
    val rddWhite = sc.parallelize (avgcampWhite).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue())) 
    //conversion of rdd to frame 
    var dfWhite = sqlContext.createDataFrame(rddWhite, schema) 
    var dfYellWhite = dfYellow.union(dfWhite) 
    //union with yellow,white camp data 
    val rddRed = sc.parallelize (avgcampRed).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue())) 
    //conversion of rdd to frame 
    var dfRed = sqlContext.createDataFrame(rddRed, schema) 
    var dfYellWhiteRed = dfYellWhite .union(dfRed) 
    // other modifications and final result to hive 
} 

내가 고민하고 있습니다 :

1.hardcoding Yellow, red and white, there may be other camp type also. 
2. Filtering same dataframe many times 
3. Not able to figure out how to calculate differently according to class calculation type. 

도움말 감사 나는 다음 시도했다. 감사.

+0

정확하게 이해한다면 캠프와 수업 모두에 따라 평균 높이 또는 합계를 원하십니까? 캠프/클래스의 모든 조합에 대해 두 가지를 모두 계산하고,이를 데이터 프레임에 넣은 다음 'classRecord' df를 별도로 읽는 것은 어떻습니까? – Shaido

답변

0

클래스/캠프의 모든 조합에 대한 평균 및 합계 계산을 수행 한 다음 classRecord 데이터 프레임을 따로 구문 분석하고 필요한 것을 추출 할 수 있습니다. groupBy() 메서드를 사용하여 스파크에서이를 쉽게 수행하고 값을 집계 할 수 있습니다.

귀하의 예를 들어 dataframe 사용 :

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

studentRecord.show() 

+-------+------+------+------+ 
| Name|height| Camp| Class| 
+-------+------+------+------+ 
| Shae| 152|yellow| first| 
| Joe| 140|yellow| first| 
| Mike| 149| white| first| 
| Anne| 142| red| first| 
| Tim| 154| red|Second| 
| Jake| 153| white|Second| 
|Sherley| 153| white|Second| 
+-------+------+------+------+ 

val df = studentRecord.groupBy("Class", "Camp").agg(
    sum($"height").as("Sum"), 
    avg($"height").as("Average"), 
    collect_list($"Name").as("Names")) 
df.show() 

+------+------+---+-------+---------------+ 
| Class| Camp|Sum|Average|   Names| 
+------+------+---+-------+---------------+ 
| first| white|149| 149.0|   [Mike]| 
| first| red|142| 142.0|   [Anne]| 
|Second| red|154| 154.0|   [Tim]| 
|Second| white|306| 153.0|[Jake, Sherley]| 
| first|yellow|292| 146.0| [Shae, Joe]| 
+------+------+---+-------+---------------+ 

이 작업을 수행 한 후, 당신은 단순히 당신이 필요로하는 행을 한 후에 고객의 첫 번째 classRecord dataframe을 확인 할 수 있습니다. 실제 모습이 바뀌면 예를 들면 다음과 같이 바뀔 수 있습니다.

// Collects the dataframe as an Array[(String, String)] 
val classRecs = classRecord.collect().map{case Row(clas: String, calc: String) => (clas, calc)} 

for (classRec <- classRecs){ 
    val clas = classRec._1 
    val calc = classRec._2 

    // Matches which calculation you want to do 
    val df2 = calc match { 
    case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average") 
    case "Sum" => df.filter($"Class" === clas).select("Class", "Camp", "Sum") 
    } 

// Do something with df2 
} 

희망이 있습니다.

+0

부분적으로 이것과 함께, 나는 모든 이름이 "클래스, 캠프, 이름, 평균"과 같은 경우에도 속해야합니다. 최종 DF를 받더라도. 클래스에 대해 먼저 평균 (버림 합계)을 선택해야하며, 두 번째에는 합계 (버림 평균)가 필요하다는 것을 결정할 것입니다. – Swati

+0

위의 솔루션을 시도해 본 결과, 오류가 표시됩니다. 값 groupby는 org.apache.spark.rdd.RDD [String]의 멤버가 아닙니다. 감사. – Swati

+0

@ 스와 티 죄송 합니다만, 'groupBy()'는 대문자 'B'여야합니다. 솔루션 목록에 이름 목록을 추가했습니다. – Shaido