2017-12-01 2 views
0

Nifi/HDF를 사용하여 MS SQL에서 델타 기록을 읽는 방법,이 테이블은 매 초마다 업데이트 및 선택 내부를 가정하자 다소 모양이내가 MS SQL에 몇 테이블이

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID} 

같은 쿼리됩니다 아래와 같이 쿼리 결과 5 개 레코드를 조인합니다.

쿼리가 처음 실행되는 경우 ${lastUpdateTime}${lastG_ID}이 0으로 설정되고 5 개의 레코드 아래로 반환됩니다. 레코드를 처리 한 후 쿼리는 max(G_ID) 즉 5와 max(UpdateTime)을 저장합니다. 즉, etl_stat 테이블에 1512010479가 저장됩니다. 아래 그림과 같이 테이블이 다른 5 개 개의 새 레코드를 추가 할 경우

G_ID  UpdateTime ID   Name    T_NAME 
------------------------------------------------------------------- 
1   1512010470 12591225  DUMMY_DATA  DUMMY_ID  
2   1512096873 12591538  DUMMY_DATA  DUMMY_ID  
3   1512096875 12591539  DUMMY_DATA  DUMMY_ID  
4   1512010477 12591226  DUMMY_DATA  DUMMY_ID  
5   1512010479 12591227  DUMMY_DATA  DUMMY_ID  

:

G_ID  UpdateTime ID   Name    T_NAME 
------------------------------------------------------------------- 
1   1512010470 12591225  DUMMY_DATA  DUMMY_ID  
2   1512096873 12591538  DUMMY_DATA  DUMMY_ID  
3   1512096875 12591539  DUMMY_DATA  DUMMY_ID  
4   1512010477 12591226  DUMMY_DATA  DUMMY_ID  
5   1512010479 12591227  DUMMY_DATA  DUMMY_ID 
6   1512010480 12591230  DUMMY_DATA  DUMMY_ID 
7   1512010485 12591231  DUMMY_DATA  DUMMY_ID 
8   1512010490 12591232  DUMMY_DATA  DUMMY_ID 
9   1512010493 12591233  DUMMY_DATA  DUMMY_ID 
10   1512010500 12591234  DUMMY_DATA  DUMMY_ID 
쿼리는 먼저 etl_stat table에서 max(G_ID)max(UpdateTime)을 읽을 수 및 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5을 다음과 같이 쿼리를 프레임합니다

있도록 쿼리 반환 아래 그림과 같이 5 개의 델타 레코드 만 있습니다.

G_ID  UpdateTime ID   Name    T_NAME 
------------------------------------------------------------------- 
6   1512010480 12591230  DUMMY_DATA  DUMMY_ID 
7   1512010485 12591231  DUMMY_DATA  DUMMY_ID 
8   1512010490 12591232  DUMMY_DATA  DUMMY_ID 
9   1512010493 12591233  DUMMY_DATA  DUMMY_ID 
10   1512010500 12591234  DUMMY_DATA  DUMMY_ID 

그래서 쿼리를 실행할 때마다 먼저 etl_stat 테이블에서 max(G_ID)max(UpdateTime)을 읽고 위의 그림과 델타 변경을 얻을로 선택 내부 쿼리에 가입 프레임한다. 다음 I 위의 사용 사례를 구현

SPARK의 SQL을 사용 ARCHITECTURE 그대로

는 :

1) JDBC를 스파크 etl_stat 테이블에서 max(G_ID)max(UpdateTime) 얻을 피닉스 테이블을 판독한다.

2) JDBC 스파크는 2 내부 쿼리를 조인 단계를 실행하는 선택 내부)이이 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

3처럼 쿼리에 가입 JDBC 불꽃 프레임 MS SQL 서버 프로세스에서 델타 메시지 HBase를에 기록하고 삽입을 읽습니다.

4) HBase를에 성공적으로 삽입 한 후, 스파크 최신 G_ID 즉 10 UpdateTime 즉 1512010500.

5)이 작업이되어 크론 1 분마다 실행되도록 예약 한과 etl_stat 테이블을 업데이트합니다.

내가 Nifi이 사용 사례를 이동할 NIFI

을 사용 ARCHITECTURE하기 위해, 나는 MS SQL DB에서 레코드를 읽고 카프카이 기록을 보내 NiFi를 사용하고 싶습니다.

성공적으로 카프카에 게시하면 NiFi가 G_ID 및 UpdateTime을 데이터베이스에 저장합니다.

메시지가 카프카에 도착하면 스파크 스트리밍은 카프카의 메시지를 읽고 기존 비즈니스 로직을 사용하여 HBase에 저장합니다.

Nifi 프로세서는 매번 델타 레코드를 가져와 Kafka에 게시하기 위해 max(G_ID)max(UpdateTime)을 사용하여 프레임 선택 내부 조인 쿼리를 실행해야합니다.

Nifi/HDF를 처음 사용했습니다. Nifi/HDF를 사용하여이를 구현하려면 귀하의 도움과 안내가 필요합니다. 이 유스 케이스에 대한 더 나은 솔루션/아키텍처가 있다면 제안하십시오.

죄송합니다.

답변

0

당신이 설명하는 것은 JDBC Kafka Connect connector의 기능입니다. 구성 파일을 설정하고로드하십시오. 끝난. Kafka Connect는 Apache Kafka의 일부입니다. 추가 도구 및 기술이 필요하지 않습니다.

적절한 변경 데이터 캡처 (CDC)를 고려할 수도 있습니다. 독점적 인 RDBMS (Oracle, DB2, MS SQL 등)에는 GoldenGate, Attunity, DBVisit 등과 같은 상용 도구가 있습니다. 오픈 소스 RDBMS (예 : MySQL, PostgreSQL)의 경우 오픈 소스 Debezium 도구를 살펴 봐야합니다. 이러한 모든 CDC 도구는 Kafka와 직접 통합됩니다.

+0

빠른 응답 주셔서 감사합니다.이 사용 사례에 대해 Nifi/HDF를 사용하는 솔루션이 필요합니다. – nilesh1212

+0

NiFi를 사용하여이 문제를 해결할 사람이 있다면이 문제에 대해 도움을 받으십시오. – nilesh1212

+0

@daggett – nilesh1212

관련 문제