2017-01-15 5 views
0

런타임에 읽을 수있는 스키마 파일을 사용하여 텍스트 파일의 입력을 데이터 프레임으로 변환하려고합니다. 내 입력 텍스트 파일은 다음과 같다 :textFile을 dataFrame으로 동적으로 변환합니다.

이 스키마 파일은 다음과 같습니다
John,23 
Charles,34 

:

object DynamicSchema { 
    def main(args: Array[String]) { 
    val inputFile = args(0) 
    val schemaFile = args(1) 
    val schemaLines = Source.fromFile(schemaFile, "UTF-8").getLines().map(_.split(":")).map(l => l(0) -> l(1)).toMap 
    val spark = SparkSession.builder() 
     .master("local[*]") 
     .appName("Dynamic Schema") 
     .getOrCreate() 
    import spark.implicits._ 
    val input = spark.sparkContext.textFile(args(0)) 
    val schema = spark.sparkContext.broadcast(schemaLines) 
    val nameToType = { 
     Seq(IntegerType,StringType) 
     .map(t => t.typeName -> t).toMap 
    } 
    println(nameToType) 
    val fields = schema.value 
     .map(field => StructField(field._1, nameToType(field._2), nullable = true)).toSeq 
    val schemaStruct = StructType(fields) 
    val rowRDD = input 
     .map(_.split(",")) 
     .map(attributes => Row.fromSeq(attributes)) 
    val peopleDF = spark.createDataFrame(rowRDD, schemaStruct) 
    peopleDF.printSchema() 

    // Creates a temporary view using the DataFrame 
    peopleDF.createOrReplaceTempView("people") 

    // SQL can be run over a temporary view created using DataFrames 
    val results = spark.sql("SELECT name FROM people") 
    results.show() 
    } 
} 

printSchema 원하는 결과를 제공하지만 :

name:string 
age:integer 

이것은 내가 뭘하려 result.show 오류가 발생했습니다. 저는 나이 필드가 실제로 toInt를 사용하여 변환되어야한다고 생각합니다. 스키마가 런타임에만 사용 가능할 때 동일한 방법을 얻을 수 있습니까?

+0

오류 로그를 게시하십시오. –

답변

1

val input = spark.read.schema(schemaStruct).csv(args(0)) 

val input = spark.sparkContext.textFile(args(0)) 

를 교체하고 스키마 정의 후 이동합니다.

+0

감사합니다 !! 매력처럼 작동했습니다. 그냥 관련 질문. StructFields 및 StructType을 올바르게 작성하는 방법은 무엇입니까? 아니면 더 좋은 해결책이 있습니까? – Shasankar

+0

'broadcast'는 의미가 없습니다. 나머지는 합리적으로 보입니다. – user7337271

관련 문제