안녕하세요 저는 종종 StackoverflowError로 인해 실패하는 긴 sparkjob을 실행하려고합니다. 작업은 parquetfile을 읽고 foreach 루프에서 rdd를 만듭니다. 몇 가지 조사를 한 후에 각 rdd에 대한 체크 포인트를 만드는 것이 내 기억 문제를 해결하는 데 도움이 될 것이라고 생각했습니다. (나는 다른 메모리, 오버 헤드 메모리, paralellism, repartition을 시도하고 작업을위한 가장 잘 작동하는 설정을 찾았지만 때로는 여전히 클러스터의 부하에 따라 실패합니다.)rdd.checkpoint가 스파크 작업에서 건너 뛰었습니다.
이제 진짜 문제가 생겼습니다. 체크 포인트를 만들려고합니다. 먼저 마루에서 RDD를 읽은 다음 캐싱하고 체크 포인트 기능을 실행 한 다음 작업을 먼저 호출하여 체크 포인트를 발생시킵니다. 내가 지정한 경로에 검사 점이 만들어지지 않으며 YARN UI에서 해당 단계를 건너 뛰었습니다. 누구나 내가이 문제를 이해하도록 도울 수있다.
ctx.getSparkContext().setCheckpointDir("/tmp/checkpoints");
public static void writeHhidToCouchbase(DataFrameContext ctx, List<String> filePathsStrings) {
filePathsStrings
.forEach(filePath -> {
JavaPairRDD<String, String> rdd =
UidHhidPerDay.getParquetFromPath(ctx, filePath);
rdd.cache();
rdd.checkpoint();
rdd.first();
rdd.foreachPartition(p -> {
CrumbsClient client = getClient();
p.forEachRemaining(uids -> {
Crumbs crumbs = client.getAsync(uids._1)
.timeout(10, TimeUnit.SECONDS)
.toBlocking()
.first();
String hHid = uids._2;
if (hHid != null) {
crumbs.getOrCreateSingletonCrumb(HouseholdCrumb.class).setHouseholdId(hHid);
client.putSync(crumbs);
}
});
client.shutdown();
});
});
}
체크 포인트는 첫 번째 반복에서는 한 번만 생성되지만 다시는 발생하지 않는다. KR
확실한 건지는 모르겠지만'rdd.checkpoint()'에 대한 호출이 중복되는 것처럼 보입니다. rdd에는 잘라내 기위한 부모 계보가 없습니다. 그리고 메모리 사용에 도움이되지 않습니다. – ImDarrenG
@ImDarrenG 감사합니다! 코드로 어떻게 할 수 있는지 보여 주시겠습니까? – Smastik
checkpoint() 호출을 제거하면 도움이되지 않습니다. – ImDarrenG