2017-12-19 2 views

HBase를 백엔드 영구 저장소로 사용하여 Ignite 캐시 저장소를 구현했습니다. 캐시 저장소의 코드는 다음과 같습니다.

Ignite Cache Store - 리소스를 해제하는 방법

/**
 * Ignite {@code CacheStore} for {@code Long} keys and {@code byte[]} values whose
 * methods read from and write to an HBase table
 * ({@code TagDuplicateConstants.BIT_DATA_TABLE_NAME}).
 *
 * NOTE(review): this listing is an excerpt; several method bodies below are
 * missing lines and closing braces, so it does not compile as shown.
 */
public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> { 

// Ignite instance; loadAll uses it to look up the cache named by cacheName.
// NOTE(review): how this field gets assigned is not visible in this excerpt.
Ignite gridReference; 

// Cache name; passed to the row-key helper and included in log/error messages.
// NOTE(review): no assignment to this field is visible in this excerpt.
private String cacheName; 

/**
 * Fetches the row for a single cache key from the HBase table.
 *
 * @param key cache key; its string form and {@code cacheName} are passed to
 *            {@code TagDuplicateUtil.getPersistentStoreKeyForBitDataCache} to build the row key
 * @return {@code null} when the fetched row is null or empty; otherwise a value read from
 *         the row (the column qualifier argument is cut off in this excerpt)
 * @throws ROCException with {@code CACHE_ENTRY_READ_ERROR}, wrapping the {@code IOException},
 *         when the read fails
 */
public byte[] load(Long key) { 

    String hbaseKey; 

    // A connection is obtained for this call only; try-with-resources closes it
    // (and the table below) when the block exits.
    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()); 

      Get rowToBeFetched = new Get(Bytes.toBytes(hbaseKey)); 

      Result rowFetched = bitDataPersistentTable.get(rowToBeFetched); 

      if (rowFetched == null || rowFetched.isEmpty()) { 
       return null; // Can't return an empty array as Ignite will 
           // load the entry 

      // NOTE(review): the closing brace of the if above, the second argument of
      // getValue(...) and the closing braces of both try blocks are missing from
      // this excerpt.
      return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES, 


    } catch (IOException e) { 
     // Wrap the I/O failure, keeping the original exception as the cause.
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e, 
       "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName 
         + " ] "); 


/**
 * Issues one HBase {@code Get} per requested key and prints the elapsed time.
 *
 * NOTE(review): the rest of the loop body is missing from this excerpt, so what is
 * done with {@code rowFetched}, {@code cacheToBeLoaded} and {@code numberOfKeysLoaded}
 * cannot be seen here.
 *
 * @param keys cache keys to read
 * @return {@code null} in the visible code
 * @throws ROCException with {@code CACHE_ENTRY_READ_ERROR}, wrapping the {@code IOException},
 *         when a read fails
 */
public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) { 

    String hbaseKey; 

    // Start timestamp for the elapsed-time message printed below.
    long startTime = System.currentTimeMillis(); 

    // No update to this counter is visible in this excerpt; it is only read by the message below.
    long numberOfKeysLoaded = 0l; 

    // A connection is obtained for this call only; try-with-resources closes it
    // (and the table below) when the block exits.
    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      // Look up the Ignite cache by name; its use is not visible in this excerpt.
      IgniteCache<Long, byte[]> cacheToBeLoaded = gridReference.cache(cacheName); 

      Get rowToBeFetched; 

      Result rowFetched; 

      // One Get round-trip per key.
      for (Long key : keys) { 

       hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()); 

       rowToBeFetched = new Get(Bytes.toBytes(hbaseKey)); 

       rowFetched = bitDataPersistentTable.get(rowToBeFetched); 

       // NOTE(review): remaining loop statements and the loop's closing brace are
       // missing from this excerpt.



      System.out.println("LoadAll for [ " + numberOfKeysLoaded + " ] keys of the cache [ " + cacheName 
        + " ] took [ " + ((System.currentTimeMillis() - startTime)/1000.0) + " seconds ] "); 

      // NOTE(review): returns null rather than a Map — presumably the entries are
      // put into cacheToBeLoaded in the missing lines; confirm against the full source.
      return null; 


    } catch (IOException e) { 
     // Wrap the I/O failure, keeping the original exception as the cause.
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e, 
       "Error while reading multiple keys for the cache [ " + cacheName + " ] "); 


/**
 * Builds an HBase {@code Put} for a single cache entry.
 *
 * NOTE(review): the statements that add the column to the {@code Put} and send it
 * to the table are incomplete or missing in this excerpt.
 *
 * @param entry cache entry; its key (with {@code cacheName}) determines the row key and
 *              its value appears as the last argument of the truncated call below
 * @throws ROCException with {@code CACHE_ENTRY_WRITE_ERROR}, wrapping the {@code IOException},
 *         when the write fails
 */
public void write(Entry<? extends Long, ? extends byte[]> entry) { 

    String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, entry.getKey().toString()); 

    // A connection is obtained for this call only; try-with-resources closes it
    // (and the table below) when the block exits.
    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      Put rowToBeWritten = new Put(Bytes.toBytes(hbaseKey)); 

        // NOTE(review): the next line is the tail of a call whose beginning is
        // missing from this excerpt; it passes the column qualifier and entry value.
        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue()); 



    } catch (IOException e) { 
     // Wrap the I/O failure, keeping the original exception as the cause.
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e, 
       "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName 
         + " ] "); 


/**
 * Builds one HBase {@code Put} per entry and prints the elapsed time.
 *
 * NOTE(review): the lines that fill each {@code Put}, add it to
 * {@code rowsToBeWritten} and send the batch to the table are missing from this excerpt.
 *
 * @param entries cache entries to persist
 * @throws ROCException with {@code CACHE_ENTRY_WRITE_ERROR}, wrapping the {@code IOException},
 *         when the write fails
 */
public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) { 

    // Start timestamp for the elapsed-time message printed below.
    long startTime = System.currentTimeMillis(); 

    String hbaseKey; 

    // Accumulator for the Puts; no use of it is visible in this excerpt.
    List<Put> rowsToBeWritten = new ArrayList<>(); 

    Put currentRowToBeWritten; 

    // A connection is obtained for this call only; try-with-resources closes it
    // (and the table below) when the block exits.
    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) { 

       // NOTE(review): the second argument of this call is cut off in this excerpt.
       hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, 

       // NOTE(review): uses String.getBytes() here, while load/write/delete use
       // Bytes.toBytes(hbaseKey) — confirm both yield the same row-key bytes.
       currentRowToBeWritten = new Put(hbaseKey.getBytes()); 






     // NOTE(review): the message says "load" although this is the write path.
     System.out.println("Time taken to load [ " + entries.size() + " entries ] for the cache [ " + cacheName 
       + " ] is " + ((System.currentTimeMillis() - startTime)/1000.0) + " seconds"); 

    } catch (IOException e) { 
     // Wrap the I/O failure, keeping the original exception as the cause.
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e, 
       "Error while writing multiple keys for the cache [ " + cacheName + " ] "); 


/**
 * Builds an HBase {@code Delete} for the row that corresponds to one cache key.
 *
 * NOTE(review): the call that submits the {@code Delete} to the table is missing
 * from this excerpt.
 *
 * @param key cache key; its string form and {@code cacheName} determine the row key
 * @throws ROCException with {@code CACHE_ENTRY_REMOVAL_ERROR}, wrapping the {@code IOException},
 *         when the removal fails
 */
public void delete(Object key) { 

    String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()); 

    // A connection is obtained for this call only; try-with-resources closes it
    // (and the table below) when the block exits.
    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      Delete rowToBeDeleted = new Delete(Bytes.toBytes(hbaseKey)); 



    } catch (IOException e) { 
     // Wrap the I/O failure; unlike load/write, the message reports the derived
     // HBase row key rather than the original cache key.
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e, 
       "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName 
         + " ] "); 


/**
 * Collects one HBase {@code Delete} per key into a list.
 *
 * NOTE(review): the call that submits {@code rowsToBeDeleted} to the table is
 * missing from this excerpt.
 *
 * @param keys cache keys whose rows are to be removed
 * @throws ROCException with {@code CACHE_ENTRY_REMOVAL_ERROR}, wrapping the {@code IOException},
 *         when the removal fails
 */
public void deleteAll(Collection<?> keys) { 

    String hbaseKey; 

    List<Delete> rowsToBeDeleted = new ArrayList<>(); 

    // A connection is obtained for this call only; try-with-resources closes it
    // (and the table below) when the block exits.
    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      for (Object keyToBeDeleted : keys) { 

       // NOTE(review): the second argument of this call is cut off in this excerpt.
       hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, 

       // NOTE(review): uses String.getBytes() here, while delete() uses
       // Bytes.toBytes(hbaseKey) — confirm both yield the same row-key bytes.
       rowsToBeDeleted.add(new Delete(hbaseKey.getBytes())); 




    } catch (IOException e) { 
     // Wrap the I/O failure, keeping the original exception as the cause.
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e, 
       "Error while deleting entries for the cache [ " + cacheName + " ] "); 


/**
 * Intentionally empty: this store does nothing when asked to bulk-load the cache.
 *
 * @param clo  closure supplied by the caller; unused
 * @param args optional arguments; unused
 */
public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) { 
    // No implementation provided 

/**
 * Intentionally empty: nothing is done at session end.
 *
 * @param commit commit flag supplied by the caller; unused
 */
public void sessionEnd(boolean commit) { 
    // No implementation provided 


캐시 모드는 PARTITIONED입니다.

캐시 원자성 모드는 ATOMIC입니다.

저장소 구현에서 알 수 있듯이, 구현된 각 메서드는 호출될 때마다 HBase에 대한 새 연결을 생성합니다.

데이터 소스별 리소스(이 경우 HBase 연결)를 메서드 호출마다 열고 닫는 대신, 더 큰 단위(매크로 수준)에서 열고 닫도록 제어할 수 있는 방법이 있는지 알고 싶습니다.


연결 풀링을 살펴보시는 것이 좋겠습니다. – GurV



캐시 저장소(store)에서 연결 풀을 사용해야 합니다. c3p0을 확인해 보십시오.

관련 문제