Nifi/HDF를 사용하여 MS SQL에서 델타 기록을 읽는 방법,이 테이블은 매 초마다 업데이트 및 선택 내부를 가정하자 다소 모양이내가 MS SQL에 몇 테이블이
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
같은 쿼리됩니다 아래와 같이 쿼리 결과 5 개 레코드를 조인합니다.
쿼리가 처음 실행되는 경우 ${lastUpdateTime}
및 ${lastG_ID}
이 0으로 설정되고 5 개의 레코드 아래로 반환됩니다. 레코드를 처리 한 후 쿼리는 max(G_ID)
즉 5와 max(UpdateTime)
을 저장합니다. 즉, etl_stat
테이블에 1512010479가 저장됩니다. 아래 그림과 같이 테이블이 다른 5 개 개의 새 레코드를 추가 할 경우
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
쿼리는 먼저
etl_stat table
에서
max(G_ID)
및
max(UpdateTime)
을 읽을 수 및
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
을 다음과 같이 쿼리를 프레임합니다
있도록 쿼리 반환 아래 그림과 같이 5 개의 델타 레코드 만 있습니다.
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
그래서 쿼리를 실행할 때마다 먼저
etl_stat
테이블에서
max(G_ID)
및
max(UpdateTime)
을 읽고 위의 그림과 델타 변경을 얻을로 선택 내부 쿼리에 가입 프레임한다. 다음 I 위의 사용 사례를 구현
SPARK의 SQL을 사용 ARCHITECTURE 그대로
는 :
1) JDBC를 스파크 etl_stat
테이블에서 max(G_ID)
및 max(UpdateTime)
얻을 피닉스 테이블을 판독한다.
2) JDBC 스파크는 2 내부 쿼리를 조인 단계를 실행하는 선택 내부)이이 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3처럼 쿼리에 가입 JDBC 불꽃 프레임 MS SQL 서버 프로세스에서 델타 메시지 HBase를에 기록하고 삽입을 읽습니다.
4) HBase를에 성공적으로 삽입 한 후, 스파크 최신 G_ID
즉 10 UpdateTime
즉 1512010500.
5)이 작업이되어 크론 1 분마다 실행되도록 예약 한과 etl_stat
테이블을 업데이트합니다.
내가 Nifi이 사용 사례를 이동할 NIFI
을 사용 ARCHITECTURE하기 위해, 나는 MS SQL DB에서 레코드를 읽고 카프카이 기록을 보내 NiFi를 사용하고 싶습니다.
성공적으로 카프카에 게시하면 NiFi가 G_ID 및 UpdateTime을 데이터베이스에 저장합니다.
메시지가 카프카에 도착하면 스파크 스트리밍은 카프카의 메시지를 읽고 기존 비즈니스 로직을 사용하여 HBase에 저장합니다.
Nifi 프로세서는 매번 델타 레코드를 가져와 Kafka에 게시하기 위해 max(G_ID)
및 max(UpdateTime)
을 사용하여 프레임 선택 내부 조인 쿼리를 실행해야합니다.
Nifi/HDF를 처음 사용했습니다. Nifi/HDF를 사용하여이를 구현하려면 귀하의 도움과 안내가 필요합니다. 이 유스 케이스에 대한 더 나은 솔루션/아키텍처가 있다면 제안하십시오.
죄송합니다.
빠른 응답 주셔서 감사합니다.이 사용 사례에 대해 Nifi/HDF를 사용하는 솔루션이 필요합니다. – nilesh1212
NiFi를 사용하여이 문제를 해결할 사람이 있다면이 문제에 대해 도움을 받으십시오. – nilesh1212
@daggett – nilesh1212