2017-09-28 3 views
0

사용자 정의 파티션에 두 개의 다른 스키마를 가입하는 방법 안녕하세요 두 텍스트 파일을 가지고 고유 한 하나를 만들려면이 두 텍스트 파일에 가입해야합니다. 데이터 프레임을 사용하여이를 달성했습니다.withColumnRenamed 모든 열을 작성하고 스파크 데이터 프레임

텍스트 파일은 일부 필드를 제외하고는 모두 동일한 구조를 가지고 있습니다.

이제 데이터 프레임을 만들고 두 데이터 프레임을 조인해야합니다.

질문 1 : 몇 가지 추가 입력란이있는 데이터 프레임에 모두 어떻게 참여합니까? 예를 들어 내 스키마가 처음으로 TimeStamp로 저장되었지만 첫 번째 DataFrame에 TimeStamp 필드가 없습니다.

질문 2 : 조인 후 열을 선택하기 위해 모든 열의 이름을 바꾸어야하고 열 이름이 29 열이므로 29 열을 써야합니다. 많은 방법없이 이름을 바꿀 수있는 방법이 있습니다. 시간.

질문 3 : 가입 후 일부 파일을 기반으로 출력을 저장해야합니다. 예를 들어 StatementTypeCode가 BAL이면 BAL에 속한 모든 레코드는 map reduce의 사용자 정의 파티션과 마찬가지로 하나의 파일로 이동합니다. 이것이 내가이 정확해야 희망 latestForEachKey.write.partitionBy("StatementTypeCode")을 시도한 것입니다

..

나는 내가 .I 그래서 모든 구문과 모든 개념에 문제를 직면 스파크 스칼라을 배우고 하나 개의 게시물에 너무 많은 질문을 한 것을 알고있다. 제 질문이 분명하기를 바랍니다.

여기에 제가 지금하고있는 일에 대한 제 코드가 있습니다.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     import sqlContext.implicits._ 

     import org.apache.spark.{ SparkConf, SparkContext } 
     import java.sql.{Date, Timestamp} 
     import org.apache.spark.sql.Row 
     import org.apache.spark.sql.types.{ StructType, StructField, StringType, DoubleType, IntegerType,TimestampType } 
     import org.apache.spark.sql.functions.udf 

     val schema = StructType(Array(

    StructField("TimeStamp", StringType), 
    StructField("LineItem_organizationId", StringType), 
    StructField("LineItem_lineItemId", StringType), 
    StructField("StatementTypeCode", StringType), 
    StructField("LineItemName", StringType), 
    StructField("LocalLanguageLabel", StringType), 
    StructField("FinancialConceptLocal", StringType), 
    StructField("FinancialConceptGlobal", StringType), 
    StructField("IsDimensional", StringType), 
    StructField("InstrumentId", StringType), 
    StructField("LineItemLineItemName", StringType), 
    StructField("PhysicalMeasureId", StringType), 
    StructField("FinancialConceptCodeGlobalSecondary", StringType), 
    StructField("IsRangeAllowed", StringType), 
    StructField("IsSegmentedByOrigin", StringType), 
    StructField("SegmentGroupDescription", StringType), 
    StructField("SegmentChildDescription", StringType), 
    StructField("SegmentChildLocalLanguageLabel", StringType), 
    StructField("LocalLanguageLabel_languageId", StringType), 
    StructField("LineItemName_languageId", StringType), 
    StructField("SegmentChildDescription_languageId", StringType), 
    StructField("SegmentChildLocalLanguageLabel_languageId", StringType), 
    StructField("SegmentGroupDescription_languageId", StringType), 
    StructField("SegmentMultipleFundbDescription", StringType), 
    StructField("SegmentMultipleFundbDescription_languageId", StringType), 
    StructField("IsCredit", StringType), 
    StructField("FinancialConceptLocalId", StringType), 
    StructField("FinancialConceptGlobalId", StringType), 
    StructField("FinancialConceptCodeGlobalSecondaryId", StringType), 
    StructField("FFFFAction", StringType))) 


     val textRdd1 = sc.textFile("s3://trfsdisu/SPARK/Main.txt") 
     val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
     var df1 = sqlContext.createDataFrame(rowRdd1, schema).drop("index") 

     val textRdd2 = sc.textFile("s3://trfsdisu/SPARK/Incr.txt") 
     val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
     var df2 = sqlContext.createDataFrame(rowRdd2, schema) 

     // df2.show(false) 

     import org.apache.spark.sql.expressions._ 
     val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(TimestampType).desc) 

     val latestForEachKey = df2.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 
     .withColumnRenamed("StatementTypeCode", "StatementTypeCode_1").withColumnRenamed("LineItemName", "LineItemName_1").withColumnRenamed("FFAction", "FFAction_1") 

    //This is where i need help withColumnRenamed part 


    val df3 = df1.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 
      .select($"LineItem_organizationId", $"LineItem_lineItemId", 
      when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
      when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), 
      when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction")).filter(!$"FFAction".contains("D")) 

     df3.show() 

답변

0

스키마 부분이

val df1 = sqlContext.createDataFrame(rowRdd1, new StructType(schema.tail.toArray)) 
같이 해결할 수
관련 문제