2017-03-29 1 views
1

텍스트 파일과 mysql 레코드를 모두 읽고 레코드를 읽는 간단한 파이프 라인을 가지고 있습니다. 즉, DB에 레코드가 없으면 레코드를 삽입하고 DB의 레코드를 파일로 업데이트하고 다른 업데이트를 수행합니다 파일에 존재하지 않는 DB의 레코드빔 작업을 Spark에 균등하게 분배하는 방법은 무엇입니까?

enter image description here

내 직감은 다음과 같은 코드가 불균형 여기

 final TupleTag<FileRecord> fileTag = new TupleTag<>(); 
     final TupleTag<MysqlRecord> mysqlTag = new TupleTag<>(); 
     PCollection<KV<Integer, CoGbkResult>> joinedRawCollection = 
       KeyedPCollectionTuple.of(fileTag, fileRecords) 
         .and(mysqlTag, mysqlRecords) 
         .apply(CoGroupByKey.create()); 

을 생산하고있는 스파크입니다 :

스파크에서 2M 기록으로 실행할 때 발생하는 문제는이 다음이다 집행자 DAG 시각화

enter image description here

결국 한 명의 작업자가 메모리가 부족합니다. Spark에서 기본적으로 Partitioners를 지정하여 작업 부하를 작업자에게 배포 할 수 있습니다. 그러나 빔에서 어떻게 할 수 있습니까?

편집 : 나는 JDBCIo 제대로 하나 개의 쿼리를 배포 할 수 없음을 의심

그래서 난 여러 PCollections로를 분할 한 후 나중에 평평. MySQL에서 훨씬 빨리 읽었지만 같은 문제가 발생했습니다. enter image description here

그러나 각 단계는 아직 분화하는 내 자신의 실패의 실현과 내 자신의 질문에 대답하기 위해 그 불균형? enter image description here

+0

실제로 많은 레코드를 가진 MySQL의 읽기 단계이기 때문에 이러한 불균형의 원인처럼 보일 수 있습니다. JDBCIO는 아마 하나의 SELECT 질의를 배포하지 않을 것이므로, 우리는 그 경합을 보았습니다. 나눌거야. – nambrot

답변

0

을 앓고 : 여기

는, 작업중인 단계입니다 스파크 단계와 과제 사이. 작업이 실제로 퍼져 나갔고, 실제로 드라이버 프로그램에 충분한 메모리를 할당하지 않았습니다.

관련 문제