
I am running a merge query from PySpark, but the "merge" keyword is not recognized by Spark and the merge query fails.

17/11/27 14:39:34 ERROR JobScheduler: Error running job streaming job 1511793570000 ms.1 
org.apache.spark.SparkException: An exception was raised by Python: 
Traceback (most recent call last): 
    File "/usr/hdp/2.6.1.0- 
129/spark2/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call 
r = self.func(t, *rdds) 
    File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda> 
func = lambda t, rdd: old_func(rdd) 
    File "/usr/repos/dataconnect/connect/spark/stream_kafka_consumer.py", line 66, in sendRecord 
COLUMNS='sub.id, sub.name, sub.age')) 
    File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 384, in sql 
return self.sparkSession.sql(sqlQuery) 
    File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 545, in sql 
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) 
    File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
answer, self.gateway_client, self.target_id, self.name) 
    File "/usr/hdp/2.6.1.0-129/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco 
raise ParseException(s.split(': ', 1)[1], stackTrace) 

ParseException: u"\nmismatched input 'merge' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\nmerge into customer_partitioned using (select case when all_updates.age <> customer_partitioned.age then 1  else 0  end as delete_flag,  all_updates.id as match_key,  all_updates.* from all_updates left join customer_partitioned on all_updates.id = customer_partitioned.id  union all  select 0, null, all_updates.*  from all_updates, customer_partitioned where  all_updates.id = customer_partitioned.id) sub on customer_partitioned.id = sub.match_key when matched and delete_flag=1 then delete when matched and delete_flag=0 then update set name=sub.name when not matched then insert values(sub.id, sub.name, sub.age);\n^^^\n" 

I can copy the same query straight into the Hive view and it runs without any problem:

merge into customer_partitioned using (select case when all_updates.age <> customer_partitioned.age then 1  else 0  end as delete_flag,  all_updates.id as match_key,  all_updates.* from all_updates left join customer_partitioned on all_updates.id = customer_partitioned.id  union all  select 0, null, all_updates.*  from all_updates, customer_partitioned where  all_updates.id = customer_partitioned.id) sub on customer_partitioned.id = sub.match_key when matched and delete_flag=1 then delete when matched and delete_flag=0 then update set name=sub.name when not matched then insert values(sub.id, sub.name, sub.age); 

My code is as follows:

from pyspark.sql import HiveContext 
sqlcontext = HiveContext(sc) 
sql = 'merge into customer_partitioned using (select case when all_updates.age <> customer_partitioned.age then 1  else 0  end as delete_flag,  all_updates.id as match_key,  all_updates.* from all_updates left join customer_partitioned on all_updates.id = customer_partitioned.id  union all  select 0, null, all_updates.*  from all_updates, customer_partitioned where  all_updates.id = customer_partitioned.id) sub on customer_partitioned.id = sub.match_key when matched and delete_flag=1 then delete when matched and delete_flag=0 then update set name=sub.name when not matched then insert values(sub.id, sub.name, sub.age);' 
sqlcontext.sql(sql) 

Answer


I can copy the same query straight into the Hive view and it runs without any problem.

Spark is not Hive (even when Hive support is enabled). Its query language is designed to implement a subset of the SQL:2003 standard and is only partially compatible with HQL.

As a consequence, many Hive features are not supported, including MERGE, UPDATE, and fine-grained inserts.
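
One way around this is to reproduce the merge logic with DataFrame operations and rewrite the target. The sketch below is only an illustration, assuming the table and column names from the question (customer_partitioned, all_updates, id, name, age); it writes the merged result to a new table (customer_partitioned_merged, a hypothetical name) instead of updating rows in place:

from pyspark.sql import SparkSession 
from pyspark.sql import functions as F 

spark = SparkSession.builder.enableHiveSupport().getOrCreate() 

target = spark.table("customer_partitioned") 
updates = spark.table("all_updates") 

# Target rows with no matching update are kept as-is. 
unchanged = target.join(updates, "id", "left_anti").select("id", "name", "age") 

# Join the updates against the target: rows with no match are inserts, 
# matched rows with the same age get the new name, and matched rows 
# whose age differs are dropped (the MERGE deletes them). 
joined = updates.alias("u").join(target.alias("t"), F.col("u.id") == F.col("t.id"), "left") 
kept = (joined 
        .where(F.col("t.id").isNull() | (F.col("u.age") == F.col("t.age"))) 
        .select(F.col("u.id").alias("id"), F.col("u.name").alias("name"), F.col("u.age").alias("age"))) 

# Overwrite a separate table with the merged result (Spark cannot 
# overwrite a table it is reading from in the same job). 
unchanged.union(kept).write.mode("overwrite").saveAsTable("customer_partitioned_merged") 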

TL;DR: just because something can be done in Hive does not mean the same thing can be done in Spark SQL.
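
Alternatively, since the statement does run in Hive, it can be handed to Hive itself rather than to Spark SQL, for example over a HiveServer2 connection. A minimal sketch, assuming the PyHive package is available and HiveServer2 is reachable (the host name below is a placeholder):

from pyhive import hive 

# The MERGE statement from the question, reformatted for readability. 
merge_sql = """ 
merge into customer_partitioned 
using ( 
  select case when all_updates.age <> customer_partitioned.age then 1 else 0 end as delete_flag, 
         all_updates.id as match_key, 
         all_updates.* 
  from all_updates 
  left join customer_partitioned on all_updates.id = customer_partitioned.id 
  union all 
  select 0, null, all_updates.* 
  from all_updates, customer_partitioned 
  where all_updates.id = customer_partitioned.id 
) sub 
on customer_partitioned.id = sub.match_key 
when matched and delete_flag=1 then delete 
when matched and delete_flag=0 then update set name=sub.name 
when not matched then insert values(sub.id, sub.name, sub.age) 
""" 

conn = hive.connect(host="hiveserver2.example.com", port=10000)  # placeholder host/port 
cursor = conn.cursor() 
cursor.execute(merge_sql) 
cursor.close() 
conn.close() 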
