0
두 개의 토픽이 간단한 키 값 구조 (정수/문자열)를 갖고 있고 완벽하게 작동하는 조인 스트림에서 아주 간단한 예제를했습니다.Kafka가 키로 스트림을 합치십시오.
SELECT * FROM stream1, stream2
WHERE stream1.key = stream2.key AND (stream1.key > 50 && stream1.key < 100) AND (stream2.key > 50 AND stream2.key < 100)
카프카 이런 식으로 뭔가를 할 수 있습니다 :
나는 어떻게 같은 것을 할 수있는, 물어 봐도 될까요?
마지막으로 제가하고 싶은 것은이 키가 GenericRecord 될 것입니다 스트림에 합류 필터링이며 것이다 어떻게 든 같습니다
SELECT * FROM stream1, stream2
WHERE stream1.genericRecordkey.someId. = stream2.genericRecordkey.someId
내 테스트 예 : 나는 잘 설명
public void joinKStreamToKStreamWhereKeyValueIsIntegerString() throws Exception {
String uniqueKey = new Object() {
}.getClass().getEnclosingMethod().getName();
long timestamp = new Date().getTime();
String firstTopic = String.format("%1$s_1_%2$s", uniqueKey, timestamp);
String secondTopic = String.format("%1$s_2_%2$s", uniqueKey, timestamp);
String outputTopic = String.format("%1$s_output_%2$s", uniqueKey, timestamp);
String appIdConfig = String.format("%1$s_app_id_%2$s", uniqueKey, timestamp);
String groupIdConfig = String.format("%1$s_group_id_%2$s", uniqueKey, timestamp);
List<KeyValue<Integer, String>> ikv1 = Arrays.asList(
new KeyValue<>(1, "Bruce Eckel"),
new KeyValue<>(2, "Robert Lafore"),
new KeyValue<>(3, "Andrew Tanenbaum")
);
List<KeyValue<Integer, String>> ikv2 = Arrays.asList(
new KeyValue<>(3, "Modern Operating System"),
new KeyValue<>(1, "Thinking in Java"),
new KeyValue<>(3, "Computer Architecture"),
new KeyValue<>(4, "Programming in Scala")
);
List<KeyValue<Integer, String>> expectedResults = Arrays.asList(
new KeyValue<>(3, "Andrew Tanenbaum/Modern Operating System"),
new KeyValue<>(1, "Bruce Eckel/Thinking in Java"),
new KeyValue<>(3, "Andrew Tanenbaum/Computer Architecture")
);
Integer partitions = 1;
Integer replication = 1;
Properties topicConfig = new Properties();
TopicUtils.createTopic(firstTopic, partitions, replication, topicConfig);
TopicUtils.createTopic(secondTopic, partitions, replication, topicConfig);
TopicUtils.createTopic(outputTopic, partitions, replication, topicConfig);
final Serde<String> stringSerde = Serdes.String();
final Serde<Integer> integerSerde = Serdes.Integer();
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// The commit interval for flushing records to state stores and downstream must be lower than
// this integration test's timeout (30 secs) to ensure we observe the expected processing results.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
KStreamBuilder builder = new KStreamBuilder();
KStream<Integer, String> firstStream = builder.stream(integerSerde, stringSerde, firstTopic);
KStream<Integer, String> secondStream = builder.stream(integerSerde, stringSerde, secondTopic);
KStream<Integer, String> outputStream = firstStream.join(secondStream, (l, r) -> {
return l + "/" + r;
}, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), integerSerde, stringSerde, stringSerde);
outputStream.to(integerSerde, stringSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
Properties pCfg1 = new Properties();
pCfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
pCfg1.put(ProducerConfig.ACKS_CONFIG, "all");
pCfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
pCfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
pCfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, ikv1, pCfg1);
Properties pCfg2 = new Properties();
pCfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
pCfg2.put(ProducerConfig.ACKS_CONFIG, "all");
pCfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
pCfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
pCfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, ikv2, pCfg2);
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List<KeyValue<Integer, String>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size());
streams.close();
assertThat(actualResults).containsExactlyElementsOf(expectedResults);
}
희망과 어떤 도움을 주셔서 감사합니다.
: 워드 프로세서를 확인, 자세한 내용은
가 : 당신이
stream1.genericRecordkey.someId
에 가입하려면먼저
someId
를 추출하고 키로 설정해야합니다 두 주제 사이에서 일반 레코드 키와 값을 필터링하는 방법을 이해하지 못했습니다. 내 말은 .filter (keyFromTopic1.something == keyFromTopic2.something && valueFromTopic1.something == valueFromTopic 2.something) 더 자세한 코드를 설명해 주시겠습니까? – EVO원래 예제는 'stream1.key> 50'과 같은 "간단한"필터 조건 자만 보여주었습니다. 복잡한 부분의 경우 먼저 키에 조인하고 조인 후에 값 필터 부분에 대해 추가 필터를 적용해야합니다. –