0

제목이 너무 모호한 경우 사과드립니다.하지만 제대로 문구에 문제가있었습니다.Apache Spark에서 스트리밍 데이터에 합류하십시오.

기본적으로 Apache Kafka와 함께 Apache Spark가 내 관계형 데이터베이스의 데이터를 Elasticsearch에 동기화 할 수 있는지 여부를 파악하려고합니다.

나의 계획은 Kafka 커넥터 중 하나를 사용하여 RDBMS에서 데이터를 읽고이를 Kafka 주제로 푸시하는 것입니다. 그것은 모델 및 DDL의 ERD입니다. 이 아주 기본, ReportProduct 테이블 대다 ReportProduct 테이블에 존재하는 관계 : ERD

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY, 
    Title NVARCHAR(500) NOT NULL, 
    PublishedOn DATETIME2 NOT NULL); 

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY, 
    ProductName NVARCHAR(100) NOT NULL); 

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL, 
    ProductID INT NOT NULL, 
    PRIMARY KEY (ReportID, ProductID), 
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID), 
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID)); 

INSERT INTO dbo.Report (ReportID, Title, PublishedOn) 
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28'); 

INSERT INTO dbo.Product (ProductID, ProductName) 
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product'); 

INSERT INTO dbo.ReportProduct (ReportID, ProductID) 
VALUES (1, 1), (1, 2), (1, 3), (1, 4); 

SELECT * 
FROM dbo.Report AS R 
INNER JOIN dbo.ReportProduct AS RP 
    ON RP.ReportID = R.ReportID 
INNER JOIN dbo.Product AS P 
    ON P.ProductID = RP.ProductID; 
내 목표는 다음과 같은 구조로 문서에이를 변환하는 것입니다

:

{ 
    "ReportID":1, 
    "Title":"Yet Another Apache Spark StackOverflow question", 
    "PublishedOn":"2017-09-12T19:15:28+00:00", 
    "Product":[ 
    { 
     "ProductID":1, 
     "ProductName":"Apache" 
    }, 
    { 
     "ProductID":2, 
     "ProductName":"Spark" 
    }, 
    { 
     "ProductID":3, 
     "ProductName":"StackOverflow" 
    }, 
    { 
     "ProductID":4, 
     "ProductName":"Random product" 
    } 
    ] 
} 

로컬로 조롱 한 정적 데이터를 사용하여 이러한 종류의 구조를 만들 수있었습니다.

report.join(
    report_product.join(product, "product_id") 
    .groupBy("report_id") 
    .agg(
     collect_list(struct("product_id", "product_name")).alias("product") 
    ), "report_id").show 

하지만 이것은 너무 기본적이고 스트림이 훨씬 더 복잡해질 것이라는 것을 알고 있습니다.

데이터가 불규칙적으로 변경되고 보고서 및 제품이 지속적으로 변경되고 제품이 (주로 주 단위로) 한 번씩 변경됩니다.

이러한 테이블 중 하나에서 발생한 변경 사항을 Elasticsearch에 복제하고 싶습니다. 당신이 Confluent Platform (또는 separately)의 일부로 사용할 수있는 JDBC Source을 사용할 수 있으며, 또한 '당신이 한 번 kafka-connect-cdc-mssql

  • 조사 할 수 있습니다 -

  • 답변

    1
    1. 카프카 연결은 소스 DB에서 데이터를 당겨 카프카에서 데이터를 얻었 으면 Kafka Streams API을 사용하여 원하는대로 데이터를 조작하거나 새로 출시 된 KSQL을 봅니다. 어떤 것을 택할지는 자바 (Kafka Streams)로 코딩하거나 SQL과 비슷한 환경 (KSQL로)에서 데이터를 조작하는 것과 같은 것들에 의해 좌우 될 것입니다. 그럼에도 불구하고이 두 가지 결과는 카프카의 또 다른 주제가 될 것입니다.

    2. 마지막으로, (here 가능한, 또는 Confluent Platform의 일부 등) Elasticsearch 카프카 연결 플러그인 정말 좋은 소리

    +0

    를 사용 Elasticsearch에 위에서 카프카 항목을 스트리밍. 이전에 작성한 연구 결과에서 Kafka는 나를위한 비 파티션 키에 참여하게하지 않았습니다. KSQL이이를 해결합니까? –

    +0

    KSQL를 사용하여 쉽게 다시 분할 할 수 있습니다.이 문제는이 문제를 해결할 수있는 방법입니다. 나는 그것을 시도하지 않았다. –

    관련 문제