2017-04-12 3 views
3

안녕하세요 저는 종종 StackoverflowError로 인해 실패하는 긴 sparkjob을 실행하려고합니다. 작업은 parquetfile을 읽고 foreach 루프에서 rdd를 만듭니다. 몇 가지 조사를 한 후에 각 rdd에 대한 체크 포인트를 만드는 것이 내 기억 문제를 해결하는 데 도움이 될 것이라고 생각했습니다. (나는 다른 메모리, 오버 헤드 메모리, paralellism, repartition을 시도하고 작업을위한 가장 잘 작동하는 설정을 찾았지만 때로는 여전히 클러스터의 부하에 따라 실패합니다.)rdd.checkpoint가 스파크 작업에서 건너 뛰었습니다.

이제 진짜 문제가 생겼습니다. 체크 포인트를 만들려고합니다. 먼저 마루에서 RDD를 읽은 다음 캐싱하고 체크 포인트 기능을 실행 한 다음 작업을 먼저 호출하여 체크 포인트를 발생시킵니다. 내가 지정한 경로에 검사 점이 만들어지지 않으며 YARN UI에서 해당 단계를 건너 뛰었습니다. 누구나 내가이 문제를 이해하도록 도울 수있다.

ctx.getSparkContext().setCheckpointDir("/tmp/checkpoints"); 
    public static void writeHhidToCouchbase(DataFrameContext ctx, List<String> filePathsStrings) { 
    filePathsStrings 
     .forEach(filePath -> { 
      JavaPairRDD<String, String> rdd = 
       UidHhidPerDay.getParquetFromPath(ctx, filePath); 
      rdd.cache(); 
      rdd.checkpoint(); 
      rdd.first(); 
      rdd.foreachPartition(p -> { 
      CrumbsClient client = getClient(); 
      p.forEachRemaining(uids -> { 
       Crumbs crumbs = client.getAsync(uids._1) 
        .timeout(10, TimeUnit.SECONDS) 
        .toBlocking() 
        .first(); 
       String hHid = uids._2; 
       if (hHid != null) { 
       crumbs.getOrCreateSingletonCrumb(HouseholdCrumb.class).setHouseholdId(hHid); 
       client.putSync(crumbs); 
       } 
      }); 
      client.shutdown(); 
      }); 
     }); 
} 

체크 포인트는 첫 번째 반복에서는 한 번만 생성되지만 다시는 발생하지 않는다. KR

+1

확실한 건지는 모르겠지만'rdd.checkpoint()'에 대한 호출이 중복되는 것처럼 보입니다. rdd에는 잘라내 기위한 부모 계보가 없습니다. 그리고 메모리 사용에 도움이되지 않습니다. – ImDarrenG

+0

@ImDarrenG 감사합니다! 코드로 어떻게 할 수 있는지 보여 주시겠습니까? – Smastik

+0

checkpoint() 호출을 제거하면 도움이되지 않습니다. – ImDarrenG

답변

1

내 실수로 파티션이 실제로 만들어졌습니다. 위에서 언급 한 "첫 번째"파티션은 파티션이있는 디렉토리입니다. 8f987639-d5c8-46b8-a1e0-37081f9f8e00과 같은 디렉토리 이름 때문에 혼란스러워졌습니다. 그러나 @ImDarrenG의 계보 의견을 살펴보면 더 많은 통찰력을 얻었습니다. 필자는 캐시와 체크 포인트가있는 첫 번째 파티션에서 다시 파티션 된 RDD를 새로 만들었습니다. 이로 인해 응용 프로그램이 실패없이 안정적이되었습니다.

JavaPairRDD<String, String> rdd = 
      UidHhidPerDay.getParquetFromPath(ctx, filePath); 
     rdd.cache(); 
     rdd.checkpoint(); 
     rdd.first(); 
     JavaPairRDD<String, String> rddToCompute = rdd.repartition(72); 
     rddToCompute.foreachPartition... 
+0

문제가 발견되어 기쁩니다. 나는 그것이 rdd.cache()없이 안정적인지 알기를 원할 것이다; rdd.checkpoint(); rdd.first(); 하지만 파산하지 않았다면 고치지 마라. – ImDarrenG

+1

고마워요, 안정적이지 못했습니다. xnr : 실패한 작업 foreachpartition :)하지만 지금은 아무 것도 얻지 못했습니다. :) – Smastik

관련 문제