2015-01-20 4 views
0

는 여기 "fN" & 일반적인 열 "c1""c2"이 c는 파일을하지만, 그들 중 일부는 다른 열이있을 수도스파크에서 쪽매 파일을 어떻게 투영합니까?

val sqc = new org.apache.spark.sql.SQLContext(sc) 
val data = sqc.parquetFile("f1,f2,f3,f4,f5") 

Parquet files에서 데이터 세트를로드합니다.

따라서, 나는

data.registerAsTable("MyTable") 

을 할 때 나는 오류를 얻을 :

java.lang.RuntimeException: could not merge metadata: key pig.schema has conflicting values 

질문은 : 는 어떻게 두 열이있는 단일 테이블 에 그 마루 파일을받을 수 있나요?

즉, 방법 그 I 프로젝트를합니까?

"fN"을 하나씩로드하고 프로젝트 한 다음 을 병합하여 unionAll을 병합하는 것이 합리적 인 것처럼 보입니다.

답변

3

SchemaRDD에서 프로젝트의 거친 상당 ALL 기타 사항 서보 -OFF이다() 필터링 된 필드. 선택을 한 후에는 제안 된대로 unionAll을 사용할 수 있습니다. 예 :

val sqc = new org.apache.spark.sql.SQLContext(sc) 
import sqc._ 
val file1 = sqc.parquetFile("file1").select('field1, 'field2) 
val file2 = sqc.parquetFile("file2").select('field1, 'field2) 
val all_files = file1.unionAll(file2) 

기호에서 Expression 인스턴스를 작성하는 데 암시 적 함수를로드하려면 import sqc._가 필요합니다.

1

이러한 파일이 어떻게 생성되는지 알고 계십니까?

알고 계시다면 이미 스키마와 카테고리를 알고 계셔야합니다.

그렇지 않으면 나는 다른 방법이 있다고 생각하지 않습니다. 하나씩로드해야합니다. schemaRDD에서 데이터를 추출한 후에도 동일한 스키마에 속하면 unionAll을 caltl 할 수 있습니다.

쪽매 파일이 처리되는 곳의github 프로젝트의 샘플 코드를 확인하십시오.

var path ="/home/infoshore/java/Trends/urls" 
var files =new java.io.File(path).listFiles() 
var parquetFiles =   files.filter(file=>file.isDirectory).map(file=>file.getName) 
var tweetsRDD= parquetFiles.map(pfile=>sqlContext.parquetFile(path+"/"+pfile)) 
var allTweets =tweetsRDD.reduce((s1,s2)=>s1.unionAll(s2)) 
allTweets.registerAsTable("tweets") 
sqlContext.cacheTable("tweets") 
import sqlContext._ 
val popularHashTags = sqlContext.sql("SELECT hashtags,usersMentioned,Url FROMtweets") 

내가 UnionAll을 어떻게 호출했는지 확인하십시오. 다른 스키마를 나타내는 schemaRDD에서 unionAll을 호출 할 수 없습니다.

특정 도움이 필요하면 알려주세요

안부와 함께 새로운 SchemaRDD을 표현식 객체 인스턴스를 받아 반환 판 카즈

+0

필자는'tweetsRDD.reduce + unionAll' 전에 한 걸음 더 나아가'parquetFiles'에서 불필요한 열을 모두 삭제할 것입니다. 어떻게해야합니까? – sds

+0

나는 havent 같은 시도했지만 예외를 얻을 것이라고 생각합니다. 나는 여전히 대체 방법을 찾고 너에게 돌아 가려고 노력할 것이다. –

관련 문제