IP 때문에 전체 소스 코드를 게시 할 수 없습니다. 그러나 지금 완료되면서 실행되는 Amazon Elastic Map Reduce Job (EMR)을 제출하라는 요청을 받았습니다. 이전에는 파일을 찾을 수 없으므로 오류가 발생했습니다.EMR 작업을 완료하더라도 EMR 결과가 항상 빈 상태 컬렉션을 반환합니다.
RunJobFlowResult result=emr.runJobFlow(request);
성공하면 작업 흐름 ID를 얻을 수 있습니다.
나중에, 상태에 대한 루프 폴링을 먼저 수행합니다. DescribeJobFlowsRequest request = new DescribeJobFlowsRequest (jobFlowIdArray); 을 호출하여 루프의 각 상태를 확인합니다. request.getJobFlowStates()
불행히도이 호출은 작업 실행 중, 실패 또는 성공 여부와 상관없이 항상 빈 콜렉션을 반환합니다. 무슨 일이 일어나고 있는지 적어도 어떻게 표시 할 수 있습니까?
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonElasticMapReduceClient client = new AmazonElasticMapReduceClient(credentials);
client.setEndPoint("elasticmapreduce.us-east-1.amazonaws.com");
StepFactory stepFactory = new StepFactory();
StepConfig enableDebugging = new StepConfig()
.withActionOnFailure("TERMINATE_JOB_FLOW")
.withHadoopjJarStep(stepFactory.newEnableDebuggingStep());
String[] arguments={...} // Custom jar arguments
HadoopJarStepConfig jarConfig = new HadoopJarStepConfig();
jarConfig.setJar(JAR_NAME);
jarConfig.setArgs(Arrays.asList(arguments));
StepConfig runJar = new StepConfig(JAR_NAME.substring(JAR_NAME.indexOf('/')+1),jarConfig);
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("...")
.withSteps(runJar)
.withLogUri("...")
.withInstances(
new JobFlowInstancesCOnfig()
.withHadoopVersion("1.0.3")
.withInstanceCount(5)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType("m1.small")
.withSlaveInstanceType("m1.small");
RunJobFlowResult result = client.runJobFlow(request);
String jobFlowID=result.getJobFlowID();
List<String> describeJobFlowIdList=new ArrayList<String>(1);
describeJobFlowIdList.add(jobFlowID);
String lastState="";
boolean jobMonitoringNotDone=true;
while(jobMonitoringNotDone){
SescribeJobFlowsRequest describeJobFlowsRequest=
new DescribeJobFlowsRequest(describeJobFlowIdList);
// Call to describeJobFlowsRequest.getJobFlowStates() always returns
// empty list even when job succeeds or fails.
for(String state : describeJobFlowsRequest.getJobFlowStates()){
if(DONE_STATES.contains(state)){
jobMonitoringNotDone=false;
} else if(!lastState.equals(state)){
lastState = state;
System.out.println("Job "+state + " at "+ new Date().toString());
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
문제를 재현하는 최소한의 기능적 예도 만들 수 없습니까? –
내가하려고했던 것을 최소한으로 보여주기 위해 그것을 업데이트했다. –