우리가 알고있는 것처럼 카프카에서 Topic
작성은 서버 초기화 부분에서 처리해야합니다. 여기서 기본 스크립트 ./kafka-topics --zookeeper ...
을 사용하지만 주제를 동적으로 만들어야하는 경우에는 어떻게해야합니까?아파치 카프카 코드에서 주제를 작성하십시오
답변
다행히도 Kafka 0.10.1.0
이이 기능을 제공합니다. Confluence Jira 보드에서이 매력적인 기능을 보았지만 주제와 관련된 문서를 찾을 수 없었습니다. 아이러니이지 않습니까?
그래서 소스 코드로 이동하여 주제를 빠르게 만드는 방법을 발견했습니다. 희망적으로 그것은 당신 중 일부에게 도움이 될 것입니다. 물론 더 나은 해결책이 있다면 주저하지 말고 우리와 공유하십시오.
자, 시작하겠습니다.
/** The method propagate topics **/
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException {
CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication);
Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream()
.collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1
CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2
try {
CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3
return response.errors().entrySet().stream()
.filter(error -> error.getValue() == Errors.NONE)
.map(Map.Entry::getKey)
.collect(Collectors.toList()); // 4
} catch (IOException e) {
log.error(e);
}
return null;
}
1
우리는 간단하게하기 위해, 나는 모든 주제들 같은 CONFIGS을 공유 할,TopicDetails
의 인스턴스가 필요합니다. 당신이 만들고자하는 모든 주제의 문자열 목록이라고 가정하십시오.mTopics
을 얻을 필요가보다
2
는 기본적으로 우리는 지금 우리가에 대한 특별한 클래스가, 우리의 카프카 클러스터에 요청을 보낼 -CreateTopicsRequest
및 시간 제한을 받아 우리가 요청을 전송하고CreateTopicsResponse
3
private static final short apiKey = ApiKeys.CREATE_TOPICS.id;
private static final short version = 0;
private static final short correlationId = -1;
private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException {
String[] comp = client.split(":");
if (comp.length != 2) {
throw new IllegalArgumentException("Wrong client directive");
}
String address = comp[0];
int port = Integer.parseInt(comp[1]);
RequestHeader header = new RequestHeader(apiKey, version, client, correlationId);
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte byteBuf[] = buffer.array();
byte[] resp = requestAndReceive(byteBuf, address, port);
ByteBuffer respBuffer = ByteBuffer.wrap(resp);
ResponseHeader.parse(respBuffer);
return CreateTopicsResponse.parse(respBuffer);
}
private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException {
try(Socket socket = new Socket(address, port);
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
DataInputStream dis = new DataInputStream(socket.getInputStream())
) {
dos.writeInt(buffer.length);
dos.write(buffer);
dos.flush();
byte resp[] = new byte[dis.readInt()];
dis.readFully(resp);
return resp;
} catch (IOException e) {
log.error(e);
}
return new byte[0];
}
요청을 보내고 응답에 바이트 스트림을 구문 분석하는 것 외에는 전혀 마법이 없습니다.
4
CreateTopicsResponse
은key
요청한 항목 이름 인Map<String, Errors>
이다 재산errors
을 가지고 있습니다. 까다로운 점은 요청한 모든 주제가 포함되어 있지만 오류가없는 것은 값이Errors.None
이므로 그 이유는 응답을 필터링하고 성공적으로 생성 된 주제 만 반환하기 때문입니다. 안드레이 Nechaev 확장
카프카 0.10.2.0에'version = 1'을 사용하십시오 –
이 10.2.0로
응답, CreateTopicsRequest의 인스턴스를 얻을 수있는 방법이 조금 변경되었습니다. 우리는 CreateTopicsRequest 인스턴스를 빌드하기 위해 Builder 내부 클래스를 사용해야한다. 다음은 코드 샘플입니다.
CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false);
CreateTopicsRequest request = builder.build();
- 1. 아파치 FLINK은 [] 카프카
- 2. 건물 아파치 카프카
- 3. AWS IoT 주제를 동적으로 작성하십시오.
- 4. 주어진 시간대에 카프카 주제를 어떻게 읽습니까?
- 5. 다른 경로에서 여러 카프카 주제를 s3으로 싱크하십시오.
- 6. 카프카 레스트 프록시로 주제를 만들 수 있습니까?
- 7. 카프카 한 주제를 어떻게 소비 할 것인가
- 8. 카프카 리스너에게 주제를 동적으로 전달하는 방법은 무엇입니까?
- 9. 카프카 주제를 사용하여 수년간 데이터 저장
- 10. 카프카 미러가 일부 주제를 건너 뛰는 중
- 11. 카프카 소비자는 다음 주제로 어떤 주제를 선택합니까?
- 12. 합류 플랫폼 대 아파치 카프카
- 13. 아파치 카프카 윈도우 설치 오류 #
- 14. 아파치 URL을 .htaccess에 다시 작성하십시오.
- 15. 아파치 로그에 PHP 변수를 작성하십시오
- 16. 카프카 주제를 만들기 위해 꼭두각시 권장 모듈이 있습니까?
- 17. 아파치 수로 - 카프카 싱크 프로듀서 중복 메시지
- 18. 아파치 카프카에서 상태를 만들고 쿼리하기 : 카프카 스트림?
- 19. 아파치 카프카 메시지 발송 및 균형로드
- 20. 아파치 스톰 (카프카 스파우트 병목 포함)
- 21. 카프카 소비자 대 아파치 플 링크
- 22. 아파치 스파크 시간 기반 카프카 OFF 세트
- 23. 오라클 데이터베이스에 아파치 아파치 httpd 액세스 로그 항목을 작성하십시오
- 24. MQTT/Python 라즈베리 LCD에 여러 주제를 등록하고 페이로드를 작성하십시오.
- 25. 아파치 점화를 사용하여 hdfs에 파일을 작성하십시오.
- 26. 카프카 복제
- 27. 카프카 소비자 콘솔
- 28. 카 프카 주제를 천천히 재현하기.
- 29. 카프카 데이터
- 30. 카프카
더 나은 방법이 있다면 질문은 무엇입니까? –
@NickVanderhoven 그것은 답변을 찾는 사람들에게 팁과 같습니다. 나는 문서도 아니고 대답도 찾을 수 없었다. –
@Andrey : 질문으로, "런타임에 아파치 카프카 주제를 어떻게 만듭니 까"와 같은 질문을 편집하고 원래 답변을 새로운 답변으로 게시하십시오. 자신의 게시물에 응답하는 것은 괜찮습니다. –