2017-05-05 4 views
1

사례 클래스의 스키마로 행을 만들어 내지도 함수 중 하나를 테스트하고 싶습니다. 나는이 일을 생각할 수있는 가장 간단한 방법은 다음과 같습니다 Spark-Scala에서 정의 된 스키마로 행을 작성하려면 어떻게해야합니까?

import org.apache.spark.sql.Row 

case class MyCaseClass(foo: String, bar: Option[String]) 

def buildRowWithSchema(record: MyCaseClass): Row = { 
    sparkSession.createDataFrame(Seq(record)).collect.head 
} 

그러나 이것은 단지 하나의 행을 얻을 수있는 오버 헤드가 많은 것 같았다, 그래서 내가 직접 스키마가있는 행을 만들 수있는 방법으로 보았다. 이로 인해 다음과 같은 메시지가 나타납니다.

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
import org.apache.spark.sql.{Encoders, Row} 

def buildRowWithSchemaV2(record: MyCaseClass): Row = { 
    val recordValues: Array[Any] = record.getClass.getDeclaredFields.map((field) => { 
     field.setAccessible(true) 
     field.get(record) 
    }) 
    new GenericRowWithSchema(recordValues, Encoders.product[MyCaseClass].schema) 
} 

불행히도 두 번째 버전에서 반환하는 행은 첫 번째 행과 다릅니다. 첫 번째 버전의 옵션 필드는 기본 값으로 줄어들지 만 두 번째 버전의 옵션 필드는 계속 유지됩니다. 또한 두 번째 버전은 상당히 다루기 힘듭니다.

더 좋은 방법이 있나요?

답변

0

두 번째 버전은 bar 사례 클래스 필드에 대해 Option 자체를 반환하므로 첫 번째 버전으로 기본 값을 가져 오지 못합니다. 당신은 원시 값

def buildRowWithSchemaV2(record: MyCaseClass): Row = { 
    val recordValues: Array[Any] = record.getClass.getDeclaredFields.map((field) => { 
    field.setAccessible(true) 
    val returnValue = field.get(record) 
    if(returnValue.isInstanceOf[Option[String]]){ 
     returnValue.asInstanceOf[Option[String]].get 
    } 
    else 
     returnValue 
    }) 
    new GenericRowWithSchema(recordValues, Encoders.product[MyCaseClass].schema) 
} 

에 대해 다음 코드를 사용할 수 있습니다 그러나 그 사이에 당신이 사용하는 제안 DataFrame 또는 DataSet.

DataFrameDataSet은 각각 Row with schema의 모음입니다. 당신이 case class 정의가있을 때
그래서, 당신은 단지 예를 들어 case class 으로 입력 데이터를 encode해야합니다 은 텍스트 파일은 당신이 그것을 읽을 수있는 경우가

val data = Seq(("test1", "value1"),("test2", "value2"),("test3", "value3"),("test4", null)) 

등의 입력 데이터를 갖고 있다고 할 수 있습니다 sparkContext.textFilesplit을 사용하십시오.
당신은 dataframe 또는 dataset로 변환하는 dataset 은 따라서 당신이 확인을 위해 Rows with schema
의 컬렉션이 생성하는 것 두 줄의 코드

import sqlContext.implicits._ 
val dataFrame = data.map(d => MyCaseClass(d._1, Option(d._2))).toDF 

.toDS이다, RDD에 데이터를 변환 한 지금 때 당신은 다음을 수행 할 수 있습니다

println(dataFrame.schema) //for checking if there is schema 

println(dataFrame.take(1).getClass.getName) //for checking if it is a collection of Rows 

옳은 대답을 희망합니다.

+0

'take (1)'은'collect.head'보다 훨씬 효율적입니다. 나는 영업 사원이 RDD 창설을 완전히 건너 뛰고 싶어한다고 믿는다. –

+0

@HristoIliev, 나는'dataframe' 또는'dataset'이'schema'를 가진'Row' 콜렉션이라는 것을 검증/보여주기 위해'take (1)'을했습니다. –

+1

그러나 OP는 'DataSet'또는 'DataFrame'이 아닌 하나의 'Row'를 원합니다. 이제는 왜 단일 요소가있는 분산 컬렉션에서지도 함수를 쉽게 테스트 할 수 있기 때문에 그 이유가 확실하지 않습니다. –

관련 문제