나는 spark 스트리밍 작업을하고 있으며 일부는 일부 집계 작업을 수행 중이다. 이제는 HBase에 레코드를 삽입하고 싶다. 전형적인 삽입이 아니다. rowkey에 대한 컬럼 값이 합계보다 많으면 UPSERT를하고 싶다. (newvalue + oldvalue)가 발생해야합니다. 아무도 자바에서 의사 코드를 공유합니까? 어떻게해야합니까?Hbase Upsert with Spark
답변
이런 식으로 뭔가 ...
byte[] rowKey = null; // Provided
Table table = null; // Provided
long newValue = 1000; // Provided
byte[] FAMILY = new byte[]{0}; // Defined
byte[] QUALIFIER = new byte[]{1}; // Defined
try {
Get get = new Get(rowKey);
Result result = table.get(get);
if (!result.isEmpty()) {
Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER);
newValue += Bytes.bytesToLong(cell.getValueArray(),cell.getValueOffset());
}
Put put = new Put(rowKey);
put.addColumn(FAMILY,QUALIFIER,Bytes.toBytes(newValue));
table.put(put);
} catch (Exception e) {
// Handle Exceptions...
}
우리는 (스플 라이스 기계 [오픈 소스]) HBase를에 데이터를 저장하는 스파크 스트리밍을 사용하여 꽤 멋진 튜토리얼이있다.
체크 아웃 it. 흥미로울 수 있습니다.
나는 아래의 방법은 의사 코드입니다 발견 : - UPSERT를 들어
=========== (업데이트 및 삽입) ===========
공공 무효 HbaseUpsert (JavaRDD < 행> javaRDD는) ========== IOException이, ServiceException {구성
JavaPairRDD < ImmutableBytesWritable, Put > hbasePuts1 = javaRDD.mapToPair(
new PairFunction < Row, ImmutableBytesWritable, Put >() {
private static final long serialVersionUID = 1L;
public Tuple2 < ImmutableBytesWritable, Put > call(Row row) throws Exception {
if(HbaseConfigurationReader.getInstance()!=null)
{
HTable table = new HTable(HbaseConfigurationReader.getInstance().initializeHbaseConfiguration(), "TEST");
try {
String Column1 = row.getString(1);
long Column2 = row.getLong(2);
Get get = new Get(Bytes.toBytes(row.getString(0)));
Result result = table.get(get);
if (!result.isEmpty()) {
Cell cell = result.getColumnLatestCell(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"));
Column2 += Bytes.toLong(cell.getValueArray(),cell.getValueOffset());
}
Put put = new Put(Bytes.toBytes(row.getString(0)));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column1"), Bytes.toBytes(Column1));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"), Bytes.toBytes(Column2));
return new Tuple2 < ImmutableBytesWritable, Put > (new ImmutableBytesWritable(), put);
} catch (Exception e) {
e.printStackTrace();
}
finally {
table.close();
}
}
return null;
}
});
hbasePuts1.saveAsNewAPIHadoopDataset(HbaseConfigurationReader.initializeHbaseConfiguration());
}
==============를 던졌습니다 ===== 공용 클래스 HbaseConfigurationReader 구현 가능 Serializable {
static Job newAPIJobConfiguration1 =null;
private static Configuration conf =null;
private static HTable table= null;
private static HbaseConfigurationReader instance= null;
private static Log logger= LogFactory.getLog(HbaseConfigurationReader.class);
HbaseConfigurationReader()는 MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { initializeHbaseConfiguration()을 발생시킵니다. }
공용 static HbaseConfigurationReader의 getInstance()가 MasterNotRunningException, ZooKeeperConnectionException, ServiceException IOException을 {
if (instance == null) {
instance = new HbaseConfigurationReader();
}
return instance;
} 공개 정적 구성 initializeHbaseConfiguration() MasterNotRunningException, ZooKeeperConnectionException, ServiceException을 발생 IOException을 { 을 던지면 (conf의 = = null) { conf = HBaseConfiguration.create(); conf.set ("hbase.zookeeper.quorum", "localhost"); conf.set ("hbase.zookeeper.property.clientPort", "2181"); HBaseAdmin.checkHBaseAvailable (conf); table = new HTable (conf, "TEST"); conf.set (org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "TEST"); 시도 { newAPIJobConfiguration1 = Job.getInstance (conf); newAPIJobConfiguration1.getConfiguration(). set (TableOutputFormat.OUTPUT_TABLE, "TEST"); newAPIJobConfiguration1.setOutputFormatClass (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); } catch (IOException e) { e.printStackTrace(); }
}
else
logger.info("Configuration comes null");
return newAPIJobConfiguration1.getConfiguration();
} } 또한
- 1. Spark Streaming with HBase with filtering logic
- 2. 입력으로 Hbase 테이블이있는 Spark Graphx
- 3. Spark Streaming에서 Hbase 데이터 읽기
- 4. mongoimport - $ addToSet/$ push with upsert?
- 5. MongoDB C# Upsert with Guid
- 6. hbase with jvmti agent
- 7. MeteorJS (MongoDB) with Spark
- 8. Spark with Cython
- 9. KMeans with Spark 1.6.2 VS Spark 2.0.0
- 10. spark-submit 할 hbase-site.xml 지정
- 11. HBase Filter Language with Thrift
- 12. Kickstart @ Apache spark with java
- 13. Kafka Streaming with apache spark
- 14. Play with Spark 2.0! 2.5
- 15. Upsert
- 16. DataStax Enterprise with HDFS 및 Spark with Cassandra
- 17. 하이브 칼럼 레벨 encake with spark java
- 18. HBase 및 Spark - 테이블에 쓰려고하는 동안 NullPointerException이 발생했습니다.
- 19. Spark Hbase Rdd를 필터링하고 결과를 얻는 방법은 무엇입니까?
- 20. spark newAPIHadoopRDD에서 hbase 셀의 모든 버전을 얻는 방법?
- 21. SQLAlchemy upsert
- 22. Meteor upsert equivalent
- 23. 스칼라에서 SBT와의 HBase 종속성
- 24. 스칼라에서 HBase 읽기 - it.nerdammer
- 25. upsert 함수에 오류가 없습니다.
- 26. Hbase 테이블을 Spark에로드 -
- 27. Node.js MongoDB Upsert 업데이트
- 28. SQL 표준 UPSERT 호출
- 29. 관계에 대해 'UPSERT'
- 30. Upsert SQL Server 2005의
, 당신은 HBase를 증가하면 추가 유형 ... –
안녕 존에 따라 고려할 수 있습니다, 귀하의 회신 그 작업 만 500 시간을내어 주셔서 감사합니다 그것의 약 1hr 스파크 복용 mb 당신이 일괄 업데이트 종류에 대해 아는 어떤 방법이 있습니까? – ankitbeohar90
Splice Machine을 탐험 중이고 "Lambda-in-a-box"웹 세미나에 참석하여 "Thomas Ryan"에게 이메일을 보내고 그의 답변을 기다리고 있습니다. Splice Machine의 작품에 감사드립니다. 내가 많은 것을 체크했기 때문에 체크하지만 특정 하나를 찾을 수 없었다. 다시 한번 감사드립니다. – ankitbeohar90