2014-01-08 4 views
0

IP 때문에 전체 소스 코드를 게시 할 수 없습니다. 그러나 지금 완료되면서 실행되는 Amazon Elastic Map Reduce Job (EMR)을 제출하라는 요청을 받았습니다. 이전에는 파일을 찾을 수 없으므로 오류가 발생했습니다.EMR 작업을 완료하더라도 EMR 결과가 항상 빈 상태 컬렉션을 반환합니다.

RunJobFlowResult result=emr.runJobFlow(request); 

성공하면 작업 흐름 ID를 얻을 수 있습니다.

나중에, 상태에 대한 루프 폴링을 먼저 수행합니다. DescribeJobFlowsRequest request = new DescribeJobFlowsRequest (jobFlowIdArray); 을 호출하여 루프의 각 상태를 확인합니다. request.getJobFlowStates()

불행히도이 호출은 작업 실행 중, 실패 또는 성공 여부와 상관없이 항상 빈 콜렉션을 반환합니다. 무슨 일이 일어나고 있는지 적어도 어떻게 표시 할 수 있습니까?

AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); 
AmazonElasticMapReduceClient client = new AmazonElasticMapReduceClient(credentials); 
client.setEndPoint("elasticmapreduce.us-east-1.amazonaws.com"); 

StepFactory stepFactory = new StepFactory(); 
StepConfig enableDebugging = new StepConfig() 
.withActionOnFailure("TERMINATE_JOB_FLOW") 
.withHadoopjJarStep(stepFactory.newEnableDebuggingStep()); 


String[] arguments={...} // Custom jar arguments 

HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(); 
jarConfig.setJar(JAR_NAME); 
jarConfig.setArgs(Arrays.asList(arguments)); 

StepConfig runJar = new  StepConfig(JAR_NAME.substring(JAR_NAME.indexOf('/')+1),jarConfig); 

RunJobFlowRequest request = new RunJobFlowRequest() 
.withName("...") 
.withSteps(runJar) 
.withLogUri("...") 
.withInstances(
    new JobFlowInstancesCOnfig() 
     .withHadoopVersion("1.0.3") 
     .withInstanceCount(5) 
     .withKeepJobFlowAliveWhenNoSteps(false) 
     .withMasterInstanceType("m1.small") 
      .withSlaveInstanceType("m1.small"); 

RunJobFlowResult result = client.runJobFlow(request); 
String jobFlowID=result.getJobFlowID(); 
List<String> describeJobFlowIdList=new ArrayList<String>(1); 
describeJobFlowIdList.add(jobFlowID); 

String lastState=""; 
boolean jobMonitoringNotDone=true; 
while(jobMonitoringNotDone){ 
    SescribeJobFlowsRequest describeJobFlowsRequest= 
     new DescribeJobFlowsRequest(describeJobFlowIdList); 
    // Call to describeJobFlowsRequest.getJobFlowStates() always returns 
    // empty list even when job succeeds or fails. 
    for(String state : describeJobFlowsRequest.getJobFlowStates()){ 
     if(DONE_STATES.contains(state)){ 
      jobMonitoringNotDone=false; 
     } else if(!lastState.equals(state)){ 
      lastState = state; 
      System.out.println("Job "+state + " at "+ new Date().toString()); 
     } 
    } 
    try { 
    Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 
+0

문제를 재현하는 최소한의 기능적 예도 만들 수 없습니까? –

+0

내가하려고했던 것을 최소한으로 보여주기 위해 그것을 업데이트했다. –

답변

0

위의 코드는

DescribeJobFlowsResult describeJobFlowsResult =  client.describeJobFlows(describeJobFlowsRequest); 

이 유사한 전화를 누락 나에게 작동하는 솔루션을 가지고,하지만 불행하게도 아마존은 방법을 사용되지하지만 대안을 제공하지 않았다. 이 부분적인 대답 일 뿐이므로 사용하지 않는 솔루션이 있었으면합니다.

관련 문제