2017-03-01 1 views
0

bulkprocessor를 사용하여 Bulk를 ElasticSearch에 삽입/업데이트합니다. 나는bulkprocessor를 사용할 때 탄성 검색 벌크 오류가 발생했습니다.

  • EsRejectedExecutionException
  • VersionConflictEngineException
  • DocumentAlreadyExistsException

하지만 포기하지 않는 것을 잡으려고 싶습니다. 응답 항목에만 메시지를 설정합니다. 어떻게 제대로 처리 할 수 ​​있습니까? 예 : 성공 핸들러를 얻을 것이다, 네트워크 오류가 발생하는 경우에만

public BulkResponse response bulkUpdate(.....) { 
    BulkResponse bulkWriteResult = null; 
    long startTime = System.currentTimeMillis(); 
    AtomicInteger amountOfRequests = new AtomicInteger(); 
    long esTime; 


    ElasticBulkProcessorListener listener = new ElasticBulkProcessorListener(updateOperations); 
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener) 
     .setBulkActions(MAX_BULK_ACTIONS) 
     .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB)) 
     .setConcurrentRequests(5) 
     .build(); 


    updateOperations.forEach(updateRequest -> { 
     bulkProcessor.add(updateRequest); 
     amountOfRequests.getAndIncrement(); 
    }); 

try { 
    boolean isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS); 
    if (isFinished) { 
     if (listener.getBulkWriteResult() != null) { 
      bulkWriteResult = listener.getBulkWriteResult(); 
     } else { 
      throw new Exception("Bulk updating failed, results are empty"); 
     } 
    } else { 
     throw new Exception("Bulk updating failed, received timeout"); 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

return bulkWriteResult; 
} 


public class ElasticBulkProcessorListener implements BulkProcessor.Listener { 
private long esTime = 0; 
private List<Throwable> errors; 
private BulkResponse response; 

public long getEsTime() { 
    return esTime; 
} 

@Override 
public void beforeBulk(long executionId, BulkRequest request) { 
    String description = ""; 
    if (!request.requests().isEmpty()) { 
     ActionRequest request1 = request.requests().get(0); 
     description = ((UpdateRequest) request1).type(); 
    } 

    log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}", 
      executionId, (request.estimatedSizeInBytes()/1000000), request.numberOfActions(), description); 
} 

@Override 
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
    log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length); 
    esTime = response.getTookInMillis(); 
    response = createBulkUpdateResult(response); 
} 

@Override 
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
    log.error("Bulk , failed! error: ", executionId, failure); 
    throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure); 
} 

}

답변

0

실패 핸들러가 호출됩니다 ... 거부하면 다시 시도 다른 경우를 실용적.

위에서 언급 한 예외를 처리하는 유일한 방법은 각 응답 항목을 구문 분석하고 어떤 일이 있었는지 파악하는 것입니다.

관련 문제