2017-01-11 2 views
4

나는 hbase에서 데이터를 얻으려고합니다. 나는 Hbase의 데이터를 가지고 있기 때문에 카프카를 거쳐야한다는 것을 알았습니다. spark streaming과 hbase를 직접 통합하지 않고도 가능합니다. 카프카 체인에 감사합니다.Hbase로 스파크 스트리밍

+0

예는 수를 참조하십시오. 아래 예제를 참조하십시오 –

답변

3

직접

예 .. 그것의 가능한 우리는 카프카 사용하지 않고 동일했던 것처럼 카프카

을 포함하지 않고 스파크 스트리밍 및 HBase를 사이가 가능 통합이다. 우리는 카프카 사용하지 않고 동일한 작업을했던 것처럼 아래의 예를 JavaHBaseStreamingBulkPutExample

package org.apache.hadoop.hbase.spark.example.hbasecontext; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.TableName; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.spark.JavaHBaseContext; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

/** 
* This is a simple example of BulkPut with Spark Streaming 
*/ 
final public class JavaHBaseStreamingBulkPutExample { 

    private JavaHBaseStreamingBulkPutExample() {} 

    public static void main(String[] args) { 
    if (args.length < 4) { 
     System.out.println("JavaHBaseBulkPutExample " + 
       "{host} {port} {tableName}"); 
     return; 
    } 

    String host = args[0]; 
    String port = args[1]; 
    String tableName = args[2]; 

    SparkConf sparkConf = 
      new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " + 
        tableName + ":" + port + ":" + tableName); 

    JavaSparkContext jsc = new JavaSparkContext(sparkConf); 

    try { 
     JavaStreamingContext jssc = 
       new JavaStreamingContext(jsc, new Duration(1000)); 

     JavaReceiverInputDStream<String> javaDstream = 
       jssc.socketTextStream(host, Integer.parseInt(port)); 

     Configuration conf = HBaseConfiguration.create(); 

     JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); 

     hbaseContext.streamBulkPut(javaDstream, 
       TableName.valueOf(tableName), 
       new PutFunction()); 
    } finally { 
     jsc.stop(); 
    } 
    } 

    public static class PutFunction implements Function<String, Put> { 

    private static final long serialVersionUID = 1L; 

    public Put call(String v) throws Exception { 
     String[] part = v.split(","); 
     Put put = new Put(Bytes.toBytes(part[0])); 

     put.addColumn(Bytes.toBytes(part[1]), 
       Bytes.toBytes(part[2]), 
       Bytes.toBytes(part[3])); 
     return put; 
    } 

    } 
} 
+0

감사합니다 램 그것은 나를 위해 모두 그게 내가 필요 :) :) :) –