2016-12-04 5 views
4

우리가 알고있는 것처럼 카프카에서 Topic 작성은 서버 초기화 부분에서 처리해야합니다. 여기서 기본 스크립트 ./kafka-topics --zookeeper ...을 사용하지만 주제를 동적으로 만들어야하는 경우에는 어떻게해야합니까?아파치 카프카 코드에서 주제를 작성하십시오

+0

더 나은 방법이 있다면 질문은 무엇입니까? –

+0

@NickVanderhoven 그것은 답변을 찾는 사람들에게 팁과 같습니다. 나는 문서도 아니고 대답도 찾을 수 없었다. –

+0

@Andrey : 질문으로, "런타임에 아파치 카프카 주제를 어떻게 만듭니 까"와 같은 질문을 편집하고 원래 답변을 새로운 답변으로 게시하십시오. 자신의 게시물에 응답하는 것은 괜찮습니다. –

답변

5

다행히도 Kafka 0.10.1.0이이 기능을 제공합니다. Confluence Jira 보드에서이 매력적인 기능을 보았지만 주제와 관련된 문서를 찾을 수 없었습니다. 아이러니이지 않습니까?

그래서 소스 코드로 이동하여 주제를 빠르게 만드는 방법을 발견했습니다. 희망적으로 그것은 당신 중 일부에게 도움이 될 것입니다. 물론 더 나은 해결책이 있다면 주저하지 말고 우리와 공유하십시오.

자, 시작하겠습니다.

/** The method propagate topics **/ 
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException { 
    CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication); 
    Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream() 
      .collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1 

    CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2 

    try { 
     CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3 
     return response.errors().entrySet().stream() 
       .filter(error -> error.getValue() == Errors.NONE) 
       .map(Map.Entry::getKey) 
       .collect(Collectors.toList()); // 4 
    } catch (IOException e) { 
     log.error(e); 
    } 

    return null; 
} 

1 우리는 간단하게하기 위해, 나는 모든 주제들 같은 CONFIGS을 공유 할, TopicDetails의 인스턴스가 필요합니다. 당신이 만들고자하는 모든 주제의 문자열 목록이라고 가정하십시오. mTopics

2는 기본적으로 우리는 지금 우리가에 대한 특별한 클래스가, 우리의 카프카 클러스터에 요청을 보낼 - CreateTopicsRequest 및 시간 제한을 받아 우리가 요청을 전송하고 CreateTopicsResponse

을 얻을 필요가보다

3

private static final short apiKey = ApiKeys.CREATE_TOPICS.id; 
    private static final short version = 0; 
    private static final short correlationId = -1; 

private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException { 
     String[] comp = client.split(":"); 
     if (comp.length != 2) { 
      throw new IllegalArgumentException("Wrong client directive"); 
     } 
     String address = comp[0]; 
     int port = Integer.parseInt(comp[1]); 

     RequestHeader header = new RequestHeader(apiKey, version, client, correlationId); 
     ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf()); 
     header.writeTo(buffer); 
     request.writeTo(buffer); 

     byte byteBuf[] = buffer.array(); 

     byte[] resp = requestAndReceive(byteBuf, address, port); 
     ByteBuffer respBuffer = ByteBuffer.wrap(resp); 
     ResponseHeader.parse(respBuffer); 

     return CreateTopicsResponse.parse(respBuffer); 
    } 

    private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException { 
     try(Socket socket = new Socket(address, port); 
      DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); 
      DataInputStream dis = new DataInputStream(socket.getInputStream()) 
     ) { 
      dos.writeInt(buffer.length); 
      dos.write(buffer); 
      dos.flush(); 

      byte resp[] = new byte[dis.readInt()]; 
      dis.readFully(resp); 

      return resp; 
     } catch (IOException e) { 
      log.error(e); 
     } 

     return new byte[0]; 
    } 

요청을 보내고 응답에 바이트 스트림을 구문 분석하는 것 외에는 전혀 마법이 없습니다.

4CreateTopicsResponsekey 요청한 항목 이름 인 Map<String, Errors>이다 재산 errors을 가지고 있습니다. 까다로운 점은 요청한 모든 주제가 포함되어 있지만 오류가없는 것은 값이 Errors.None이므로 그 이유는 응답을 필터링하고 성공적으로 생성 된 주제 만 반환하기 때문입니다. 안드레이 Nechaev 확장

+0

카프카 0.10.2.0에'version = 1'을 사용하십시오 –

1

이 10.2.0로

응답, CreateTopicsRequest의 인스턴스를 얻을 수있는 방법이 조금 변경되었습니다. 우리는 CreateTopicsRequest 인스턴스를 빌드하기 위해 Builder 내부 클래스를 사용해야한다. 다음은 코드 샘플입니다.

CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false); 
CreateTopicsRequest request = builder.build(); 
관련 문제