0

Kafka 및 Spark를 통해 Avro 메시지 스트림을 처리하는 동안 처리 된 데이터를 문서로 저장하여 ElasticSearch 색인에 저장합니다. 여기 코드입니다 (간체) :Spark에서 ElasticSearch에 데이터 저장

directKafkaStream.foreachRDD(rdd ->{ 

     rdd.foreach(avroRecord -> { 
      byte[] encodedAvroData = avroRecord._2; 
      MyType t = deserialize(encodedAvroData); 

    // Creating the ElasticSearch Transport client 
    Settings settings = Settings.builder() 
      .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build(); 
    TransportClient client = new PreBuiltTransportClient(settings) 
      .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300)); 

    IndexRequest indexRequest = new IndexRequest("index", "item", id) 
      .source(jsonBuilder() 
        .startObject() 
        .field("name", name) 
        .field("timestamp", new Timestamp(System.currentTimeMillis())) 
        .endObject()); 

    UpdateRequest updateRequest = new UpdateRequest("index", "item", id) 
      .doc(jsonBuilder() 
        .startObject() 
        .field("name", name) 
        .field("timestamp", new Timestamp(System.currentTimeMillis())) 
        .endObject()) 
      .upsert(indexRequest); 

    client.update(updateRequest).get(); 

    client.close(); 

모든 기대 작품으로, 유일한 문제는 성능입니다. ES에 저장하는 데는 약간의 시간이 필요하며, 이는 각 RDD에 대한 ES 전송 클라이언트를 열거 나 닫아야한다는 사실에 기인한다고 생각합니다. Spark documentation은이 접근법이 매우 정확하다고 제안합니다. 이해하는 즉시 가능한 유일한 최적화는 rdd.foreachPartition을 사용하고 있지만 단 하나의 파티션 만 있기 때문에 이것이 도움이 될지 확신 할 수 없습니다. 성능을 향상시키는 다른 솔루션은 없습니까?

+0

- 당신이 elasticsearch - 하둡을 사용하지 않는 이유는 무엇입니까? –

답변

0

RDD 레코드를 처리 할 때마다 새로운 연결을 작성하기 때문에. 그래서, foreachPartition을 사용하면 하나의 파티션에만 관계없이 더 나은 성능을 낼 수 있다고 생각합니다. ES 연결 인스턴스를 외부로 가져오고 루프에서 다시 사용할 수 있기 때문입니다.

관련 문제