나는 (py) Spak로 시작한 이래로 나는 너와 함께 느낄 수있다. - jars 옵션을 사용하여 pySpark 명령에 항아리를 추가해야합니다. 여기 는 (우리가 단지 당신이 도달 할 수있는 장소에 있는지 확인하십시오, 클라우 데라를 사용하는)는 예입니다
pyspark --jars /opt/cloudera/parcels/CDH/jars/spark-examples-1.6.0-cdh5.10.0-hadoop2.6.0-cdh5.10.0.jar,/opt/cloudera/parcels/CDH/jars/hbase-examples-1.2.0-cdh5.10.0.jar
귀하의 예외 그 항아리가 없다는 것을 의미 있도록 "의 ClassNotFoundException은"이었다 너의 classpath.
추가 병이 클래스 패스에 올바르게 추가되는지 확인하려면 pySpark가 시작될 때 표시되는 정보 메시지를 살펴보십시오.
정보 ui.SparkUI : 시작 SparkUI http://xx.xx.xx.xx:4040 에서 x는 클러스터의 IP입니다 같은 메시지가 있어야합니다.
브라우저를 열고이 주소로 이동하십시오. 클러스터 상태가 표시됩니다. 환경을 클릭하고 페이지의 맨 끝으로 이동하십시오. pyspark 가져 오기 SparkContext, HiveContext 에서 수입 StreamingContext에게
def main():
# The SparkContext might be initialized by the spark Shell
sc = SparkContext("local[2]", appName='SparkHBaseWriter')
# Config to write to a hBaseFile
conf = {"hbase.zookeeper.qourum": "quickstart.cloudera:2181",\
"zookeeper.znode.parent": "/hbase",\
"hbase.mapred.outputtable": "test",\
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
rdd = sc.parallelize((("row1", ["row1", "cf1", "cel1", "value from PySpark"]),))
rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
# read data from hBase
conf = {"hbase.zookeeper.qourum": "sbglboclomd0002.santanderde.dev.corp:2181",\
"zookeeper.znode.parent": "/hbase",\
"hbase.mapred.outputtable": "test",\
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
host = 'localhost'
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
testdata_conf = {
"hbase.zookeeper.quorum": "quickstart.cloudera:2181",
"hbase.mapreduce.inputtable": "test",
"hbase.mapreduce.scan.columns": "cf1:cel1"
}
testdata_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=testdata_conf)
output = testdata_rdd.collect()
print(output[0])
# Output in console:
# >>> testdata_rdd.collect()[0]
# (u'row1', u'{"qualifier" : "cel1", "timestamp" : "1499151230221", "columnFamily" : "cf1", "row" : "row1", "type" : "Put", "value" : "value from PySpark"}')
if __name__ == '__main__':
main()
I를 pyspark.streaming에서 : 당신은 당신의 항아리
이 아무 문제없이 나를 위해 일한 내 스크립트입니다 "사용자가 추가 한"다음에 볼 수 있습니다 귀하의 질문이 매우 오래된 것을 알고 있지만, 나는 이것이 다른 사람들을 도울 수 있기를 바랍니다.
spark-submit을 호출 할 때 병을 추가 했습니까? –
나는 pyspark를 사용하고, pyspark는 쉘이다. 따라서 spark-submit 명령을 사용하지 마십시오. – penlight
자, pySpark를 호출 할 때 항아리를 추가 했습니까? –