2014-11-06 3 views
4

하이브/임팔라에 두 개의 테이블이 있습니다. 테이블에서 rdd로 스파크로 데이터를 가져오고 조인 작업을 수행하려고합니다.하이브 테이블의 데이터를 스파크로 가져 와서 RDD에서 조인을 수행합니다.

하이브 컨텍스트에서 조인 쿼리를 직접 전달하고 싶지 않습니다. 이것은 단지 예일뿐입니다. 표준 HiveQL로는 사용할 수없는 사례가 더 많습니다. 모든 행을 가져와 열에 액세스하고 변환을 수행하는 방법은 무엇입니까? 나는이 "ACCOUNT_ID"

가 이상적으로 나는 불꽃을 사용하여 rdds를 사용하여이 같은 것을하고 싶지라는 컬럼에 rdds에 가입 수행 할

val table1 = hiveContext.hql("select * from tem1") 

val table2 = hiveContext.hql("select * from tem2") 

:

은 가정하자 나는 두 rdds이 껍질.

select * from tem1 join tem2 on tem1.account_id=tem2.account_id; 

답변

1

따라서 임시 테이블로 table1과 table2를 등록한 다음이 임시 테이블에서 조인을 수행 할 수 있습니다.

table1.registerTempTable("t1") 
table2.registerTempTable("t2") 
table3 = hiveContext.hql("select * from t1 join t2 on t1.account_id=t2.account_id") 
+0

안녕하세요, 고마워요.하지만 이미 이런 식으로하고 싶지 않다고 말씀 드렸습니다. 이것은 단지 간단한 예일뿐입니다. 더 복잡한 쿼리가있는 유스 케이스가 있습니다. 결과 세트에서 rdd를 생성하고 조인 및 기타 작업을 수행 할 수 있기를 원합니다. – user1189851

+1

아, 미안합니다. user1189851, 원래 하이브 테이블에 대한 조인을 피하고 싶다고 생각했습니다. 위의 코드에서 내가 table1을 게시하고 table2는 SchemaRDD가 될 수 있습니다 (그리고 우리가 작성한 쿼리 중 어떤 것도 SchemaRDD를 돌려줍니다). table1과 table2는 무엇을 원했습니까? 스파크가 아닌 SQL 소스를 원하십니까? – Holden

+0

그래서 첫 번째 rdd가 쿼리의 결과 집합 인 경우가 있습니다. 두 번째 쿼리는 다른 쿼리의 결과 집합입니다. val rdd1 = hiveContext.hql ("select * from table1") 및 val rdd2. = hiveContext.hql ("select * from table2"). account_id라는 공통 속성에서이 두 가지 rdd에 대해 조인을 수행하려고합니다. 아이디어는 하이브 컨텍스트 내부에서 조인을 원하지 않는다. 변형을 사용하여이를 수행 할 수 있어야한다. – user1189851

0

table1과 table2는 DataFrame 유형입니다. 다음을 사용하여이를 rdd로 변환 할 수 있습니다.

lazy val table1_rdd = table1.rdd 
lazy val table2_rdd = table2.rdd 

트릭을 사용해야합니다. 이러한 rdd에 당신은 어떤 rdd 작업을 사용할 수 있습니다.

은 참조 : https://issues.apache.org/jira/browse/SPARK-6608

1

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame 내가 질문을 이해하지만, 대신 당신이 DataFrames에 가입하기 위해 API를 사용할 수 있습니다, 그래서 당신은 (여러 가지 프로그래밍 방식 결정 예를 들어 join 기능을 할 수 있습니다 모르겠어요 사용자 정의 변환을 적용하는 메소드에 매개 변수로 전달 될 수 있음). 귀하의 예를 들어

,이 같은 것 :

val table1 = hiveContext.sql("select * from tem1") 
val table2 = hiveContext.sql("select * from tem2") 
val common_attributes = Seq("account_id") 
val joined = table1.join(table2, common_attributes) 

많은 공통 DataFrame의 API에서 사용할 수있는 변환이 있습니다 당신은 직접 해당 열을 선택할 수 있습니다

관련 문제