2016-06-07 2 views
0

hdfs에서 마루 파일을 만든 다음 하이브를 통해 외부 테이블로 읽어 들이고 싶습니다. 저는 마루 파일을 쓰는 동안 스파크 셸의 무대 실패에 휩싸였습니다.스파크 셸을 사용하여 hdfs에서 마루 파일을 만들 수 없습니다.

스파크 버전 : 1.5.2 스칼라 버전 : 2.10.4 자바 : 1.7

입력 파일 : (employee.txt)

1201

, 사티,
(1202) 25, 크리쉬나,
1,203 28 amith 39
1204 자 베드,
1,205 23 prudvi 23

스파크 쉘에서

:

ERROR

SPARK APPLICATION MANAGER

내가 집행 인조차 메모리를 증가하려고

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
val employee = sc.textFile("employee.txt") 
employee.first() 
val schemaString = "id name age" 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.types.{StructType, StructField, StringType}; 
val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true))) 
val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt)) 
val employeeDF = sqlContext.createDataFrame(rowRDD, schema) 
val finalDF = employeeDF.toDF(); 
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") 
var WriteParquet= finalDF.write.parquet("/user/myname/schemaParquet") 

내가 수있는 마지막 명령을 입력, 그 여전히 실패. 또한 finalDF.show()에서 동일한 오류가 발생합니다. 그래서 논리적 오류가 발생했다고 생각합니다.

답변

1

여기에 문제를 지원하기위한

덕분에 모든 필드/열 유형의 스키마를 만드는된다 StringType에 기본값. 그러나 스키마에서 값을 전달하는 동안 Id나이의 값은 코드에 따라 Integer로 변환됩니다. 실행 중에 Matcherror가 던져집니다.

스키마의 열의 데이터 형식은 전달되는 값의 데이터 형식과 일치해야합니다. 아래 코드를 사용해보십시오.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
val employee = sc.textFile("employee.txt") 
employee.first() 
//val schemaString = "id name age" 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.types._; 
val schema = StructType(StructField("id", IntegerType, true) :: StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil) 
val rowRDD = employee.map(_.split(" ")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt)) 
val employeeDF = sqlContext.createDataFrame(rowRDD, schema) 
val finalDF = employeeDF.toDF(); 
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") 
var WriteParquet= finalDF.write.parquet("/user/myname/schemaParquet") 

이 코드는 정상적으로 실행되어야합니다.

+0

답장을 보내 주셔서 감사합니다 Nitin, – Mahadevan

+0

완벽하게 작동합니다. 여기에 작은 실수가 있습니다. // val rowRDD = employee.map (_. split (",")). map (e ⇒ Row (e (0) .trim.toInt, e (1), e (2) .trim.toInt)) // 쉼표가 없지만 도움을 주셔서 대단히 감사합니다. – Mahadevan

+0

당신은 환영합니다. 사실, 필자는 테스트하는 동안 공백으로 구분 된 텍스트 파일을 작업하고있었습니다. 따라서 쉼표를 놓쳤습니다. 그것을 지적 해 주셔서 감사합니다 :) –

관련 문제