제목이 너무 모호한 경우 사과드립니다.하지만 제대로 문구에 문제가있었습니다.Apache Spark에서 스트리밍 데이터에 합류하십시오.
기본적으로 Apache Kafka와 함께 Apache Spark가 내 관계형 데이터베이스의 데이터를 Elasticsearch에 동기화 할 수 있는지 여부를 파악하려고합니다.
나의 계획은 Kafka 커넥터 중 하나를 사용하여 RDBMS에서 데이터를 읽고이를 Kafka 주제로 푸시하는 것입니다. 그것은 모델 및 DDL의 ERD입니다. 이 아주 기본, Report
및 Product
테이블 대다 ReportProduct
테이블에 존재하는 관계 :
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
내 목표는 다음과 같은 구조로 문서에이를 변환하는 것입니다
:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
로컬로 조롱 한 정적 데이터를 사용하여 이러한 종류의 구조를 만들 수있었습니다.
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
하지만 이것은 너무 기본적이고 스트림이 훨씬 더 복잡해질 것이라는 것을 알고 있습니다.
데이터가 불규칙적으로 변경되고 보고서 및 제품이 지속적으로 변경되고 제품이 (주로 주 단위로) 한 번씩 변경됩니다.
이러한 테이블 중 하나에서 발생한 변경 사항을 Elasticsearch에 복제하고 싶습니다. 당신이 Confluent Platform (또는 separately)의 일부로 사용할 수있는 JDBC Source을 사용할 수 있으며, 또한 '당신이 한 번 kafka-connect-cdc-mssql
조사 할 수 있습니다 -
를 사용 Elasticsearch에 위에서 카프카 항목을 스트리밍. 이전에 작성한 연구 결과에서 Kafka는 나를위한 비 파티션 키에 참여하게하지 않았습니다. KSQL이이를 해결합니까? –
KSQL를 사용하여 쉽게 다시 분할 할 수 있습니다.이 문제는이 문제를 해결할 수있는 방법입니다. 나는 그것을 시도하지 않았다. –