텍스트 파일과 mysql 레코드를 모두 읽고 레코드를 읽는 간단한 파이프 라인을 가지고 있습니다. 즉, DB에 레코드가 없으면 레코드를 삽입하고 DB의 레코드를 파일로 업데이트하고 다른 업데이트를 수행합니다 파일에 존재하지 않는 DB의 레코드빔 작업을 Spark에 균등하게 분배하는 방법은 무엇입니까?
내 직감은 다음과 같은 코드가 불균형 여기
final TupleTag<FileRecord> fileTag = new TupleTag<>();
final TupleTag<MysqlRecord> mysqlTag = new TupleTag<>();
PCollection<KV<Integer, CoGbkResult>> joinedRawCollection =
KeyedPCollectionTuple.of(fileTag, fileRecords)
.and(mysqlTag, mysqlRecords)
.apply(CoGroupByKey.create());
을 생산하고있는 스파크입니다 :
스파크에서 2M 기록으로 실행할 때 발생하는 문제는이 다음이다 집행자 DAG 시각화
결국 한 명의 작업자가 메모리가 부족합니다. Spark에서 기본적으로 Partitioners를 지정하여 작업 부하를 작업자에게 배포 할 수 있습니다. 그러나 빔에서 어떻게 할 수 있습니까?
편집 : 나는 JDBCIo 제대로 하나 개의 쿼리를 배포 할 수 없음을 의심
그래서 난 여러 PCollections로를 분할 한 후 나중에 평평. MySQL에서 훨씬 빨리 읽었지만 같은 문제가 발생했습니다.
그러나 각 단계는 아직 분화하는 내 자신의 실패의 실현과 내 자신의 질문에 대답하기 위해 그 불균형?
실제로 많은 레코드를 가진 MySQL의 읽기 단계이기 때문에 이러한 불균형의 원인처럼 보일 수 있습니다. JDBCIO는 아마 하나의 SELECT 질의를 배포하지 않을 것이므로, 우리는 그 경합을 보았습니다. 나눌거야. – nambrot