2016-08-09 3 views
1

나는 spark 스트리밍 작업을하고 있으며 일부는 일부 집계 작업을 수행 중이다. 이제는 HBase에 레코드를 삽입하고 싶다. 전형적인 삽입이 아니다. rowkey에 대한 컬럼 값이 합계보다 많으면 UPSERT를하고 싶다. (newvalue + oldvalue)가 발생해야합니다. 아무도 자바에서 의사 코드를 공유합니까? 어떻게해야합니까?Hbase Upsert with Spark

답변

1

이런 식으로 뭔가 ...

byte[] rowKey = null; // Provided 
Table table = null; // Provided 
long newValue = 1000; // Provided 
byte[] FAMILY = new byte[]{0}; // Defined 
byte[] QUALIFIER = new byte[]{1}; // Defined 

try { 
    Get get = new Get(rowKey); 
    Result result = table.get(get); 
    if (!result.isEmpty()) { 
     Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER); 
     newValue += Bytes.bytesToLong(cell.getValueArray(),cell.getValueOffset()); 
    } 
    Put put = new Put(rowKey); 
    put.addColumn(FAMILY,QUALIFIER,Bytes.toBytes(newValue)); 
    table.put(put); 
} catch (Exception e) { 
    // Handle Exceptions... 
} 

우리는 (스플 라이스 기계 [오픈 소스]) HBase를에 데이터를 저장하는 스파크 스트리밍을 사용하여 꽤 멋진 튜토리얼이있다.

체크 아웃 it. 흥미로울 수 있습니다.

+0

, 당신은 HBase를 증가하면 추가 유형 ... –

+0

안녕 존에 따라 고려할 수 있습니다, 귀하의 회신 그 작업 만 500 시간을내어 주셔서 감사합니다 그것의 약 1hr 스파크 복용 mb 당신이 일괄 업데이트 종류에 대해 아는 어떤 방법이 있습니까? – ankitbeohar90

+0

Splice Machine을 탐험 중이고 "Lambda-in-a-box"웹 세미나에 참석하여 "Thomas Ryan"에게 이메일을 보내고 그의 답변을 기다리고 있습니다. Splice Machine의 작품에 감사드립니다. 내가 많은 것을 체크했기 때문에 체크하지만 특정 하나를 찾을 수 없었다. 다시 한번 감사드립니다. – ankitbeohar90

0

나는 아래의 방법은 의사 코드입니다 발견 : - UPSERT를 들어

=========== (업데이트 및 삽입) ===========

공공 무효 HbaseUpsert (JavaRDD < 행> javaRDD는) ========== IOException이, ServiceException {구성

 JavaPairRDD < ImmutableBytesWritable, Put > hbasePuts1 = javaRDD.mapToPair(

     new PairFunction < Row, ImmutableBytesWritable, Put >() { 

     private static final long serialVersionUID = 1L; 
    public Tuple2 < ImmutableBytesWritable, Put > call(Row row) throws Exception { 

      if(HbaseConfigurationReader.getInstance()!=null) 
      { 
      HTable table = new HTable(HbaseConfigurationReader.getInstance().initializeHbaseConfiguration(), "TEST"); 

     try { 

      String Column1 = row.getString(1); 
      long Column2 = row.getLong(2); 
      Get get = new Get(Bytes.toBytes(row.getString(0))); 
       Result result = table.get(get); 
       if (!result.isEmpty()) { 
        Cell cell = result.getColumnLatestCell(Bytes.toBytes("cf1"), Bytes.toBytes("Column2")); 
        Column2 += Bytes.toLong(cell.getValueArray(),cell.getValueOffset()); 
       } 
      Put put = new Put(Bytes.toBytes(row.getString(0))); 
      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column1"), Bytes.toBytes(Column1)); 
      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"), Bytes.toBytes(Column2)); 
      return new Tuple2 < ImmutableBytesWritable, Put > (new ImmutableBytesWritable(), put); 

     } catch (Exception e) { 

      e.printStackTrace(); 
     } 
     finally { 
      table.close(); 
     } 
      } 
     return null; 
     } 
     }); 

    hbasePuts1.saveAsNewAPIHadoopDataset(HbaseConfigurationReader.initializeHbaseConfiguration()); 

    } 

==============를 던졌습니다 ===== 공용 클래스 HbaseConfigurationReader 구현 가능 Serializable {

static Job newAPIJobConfiguration1 =null; 
private static Configuration conf =null; 
private static HTable table= null; 
private static HbaseConfigurationReader instance= null; 

private static Log logger= LogFactory.getLog(HbaseConfigurationReader.class); 

HbaseConfigurationReader()는 MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { initializeHbaseConfiguration()을 발생시킵니다. }

공용 static HbaseConfigurationReader의 getInstance()가 MasterNotRunningException, ZooKeeperConnectionException, ServiceException IOException을 {

if (instance == null) { 
    instance = new HbaseConfigurationReader(); 
} 

return instance; 

} 공개 정적 구성 initializeHbaseConfiguration() MasterNotRunningException, ZooKeeperConnectionException, ServiceException을 발생 IOException을 { 을 던지면 (conf의 = = null) { conf = HBaseConfiguration.create(); conf.set ("hbase.zookeeper.quorum", "localhost"); conf.set ("hbase.zookeeper.property.clientPort", "2181"); HBaseAdmin.checkHBaseAvailable (conf); table = new HTable (conf, "TEST"); conf.set (org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "TEST"); 시도 { newAPIJobConfiguration1 = Job.getInstance (conf); newAPIJobConfiguration1.getConfiguration(). set (TableOutputFormat.OUTPUT_TABLE, "TEST"); newAPIJobConfiguration1.setOutputFormatClass (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); } catch (IOException e) { e.printStackTrace(); }

} 

else 
    logger.info("Configuration comes null"); 

return newAPIJobConfiguration1.getConfiguration(); 

} } 또한