2016-07-27 3 views
1

나는 PySpark에서 HBase 테이블을 읽으려고합니다.HBase · PySpark 테이블로드 오류

이것은 내 코드입니다.

from pyspark.sql.types import *  
host = 'localhost' 

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 

valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 

testdata_conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "test", "hbase.mapreduce.scan.columns": "cf:a"} 

testdata_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=testdata_conf) 

output = cmdata_rdd.collect() 
output 

그러나 오류가 발생합니다.

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable 

테이블로드에 대해서는이 링크 RDD is having only first column value : Hbase, PySpark을 참조하십시오.
자바와 스칼라를 사용하지 않습니다. 따라서 오류가 발생한 이유를 이해할 수 없습니다.

누구든지 조언을 해주면 알려주세요.
감사합니다.

+0

spark-submit을 호출 할 때 병을 추가 했습니까? –

+0

나는 pyspark를 사용하고, pyspark는 쉘이다. 따라서 spark-submit 명령을 사용하지 마십시오. – penlight

+0

자, pySpark를 호출 할 때 항아리를 추가 했습니까? –

답변

0

나는 (py) Spak로 시작한 이래로 나는 너와 함께 느낄 수있다. - jars 옵션을 사용하여 pySpark 명령에 항아리를 추가해야합니다. 여기 는 (우리가 단지 당신이 도달 할 수있는 장소에 있는지 확인하십시오, 클라우 데라를 사용하는)는 예입니다

pyspark --jars /opt/cloudera/parcels/CDH/jars/spark-examples-1.6.0-cdh5.10.0-hadoop2.6.0-cdh5.10.0.jar,/opt/cloudera/parcels/CDH/jars/hbase-examples-1.2.0-cdh5.10.0.jar 

귀하의 예외 그 항아리가 없다는 것을 의미 있도록 "의 ClassNotFoundException은"이었다 너의 classpath.

추가 병이 클래스 패스에 올바르게 추가되는지 확인하려면 pySpark가 시작될 때 표시되는 정보 메시지를 살펴보십시오.

정보 ui.SparkUI : 시작 SparkUI http://xx.xx.xx.xx:4040 에서 x는 클러스터의 IP입니다 같은 메시지가 있어야합니다.

브라우저를 열고이 주소로 이동하십시오. 클러스터 상태가 표시됩니다. 환경을 클릭하고 페이지의 맨 끝으로 이동하십시오. pyspark 가져 오기 SparkContext, HiveContext 에서 수입 StreamingContext에게

def main(): 
    # The SparkContext might be initialized by the spark Shell 
    sc = SparkContext("local[2]", appName='SparkHBaseWriter') 
    # Config to write to a hBaseFile 
    conf = {"hbase.zookeeper.qourum": "quickstart.cloudera:2181",\ 
       "zookeeper.znode.parent": "/hbase",\ 
       "hbase.mapred.outputtable": "test",\ 
       "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\ 
       "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\ 
       "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 
    rdd = sc.parallelize((("row1", ["row1", "cf1", "cel1", "value from PySpark"]),)) 
    rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) 

    # read data from hBase 
    conf = {"hbase.zookeeper.qourum": "sbglboclomd0002.santanderde.dev.corp:2181",\ 
       "zookeeper.znode.parent": "/hbase",\ 
       "hbase.mapred.outputtable": "test",\ 
       "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\ 
       "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\ 
       "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 

    host = 'localhost' 

    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 

    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 

    testdata_conf = { 
     "hbase.zookeeper.quorum": "quickstart.cloudera:2181", 
     "hbase.mapreduce.inputtable": "test", 
     "hbase.mapreduce.scan.columns": "cf1:cel1" 
     } 

    testdata_rdd = sc.newAPIHadoopRDD(
     "org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
     "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
     "org.apache.hadoop.hbase.client.Result", 
     keyConverter=keyConv, 
     valueConverter=valueConv, 
     conf=testdata_conf) 

    output = testdata_rdd.collect() 
    print(output[0]) 
    # Output in console: 
    # >>> testdata_rdd.collect()[0] 
    # (u'row1', u'{"qualifier" : "cel1", "timestamp" : "1499151230221", "columnFamily" : "cf1", "row" : "row1", "type" : "Put", "value" : "value from PySpark"}') 


if __name__ == '__main__': 
    main() 

I를 pyspark.streaming에서 : 당신은 당신의 항아리

이 아무 문제없이 나를 위해 일한 내 스크립트입니다 "사용자가 추가 한"다음에 볼 수 있습니다 귀하의 질문이 매우 오래된 것을 알고 있지만, 나는 이것이 다른 사람들을 도울 수 있기를 바랍니다.