Kafka 및 Spark를 통해 Avro 메시지 스트림을 처리하는 동안 처리 된 데이터를 문서로 저장하여 ElasticSearch 색인에 저장합니다. 여기 코드입니다 (간체) :Spark에서 ElasticSearch에 데이터 저장
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
MyType t = deserialize(encodedAvroData);
// Creating the ElasticSearch Transport client
Settings settings = Settings.builder()
.put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
IndexRequest indexRequest = new IndexRequest("index", "item", id)
.source(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
.doc(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
client.close();
모든 기대 작품으로, 유일한 문제는 성능입니다. ES에 저장하는 데는 약간의 시간이 필요하며, 이는 각 RDD에 대한 ES 전송 클라이언트를 열거 나 닫아야한다는 사실에 기인한다고 생각합니다. Spark documentation은이 접근법이 매우 정확하다고 제안합니다. 이해하는 즉시 가능한 유일한 최적화는 rdd.foreachPartition을 사용하고 있지만 단 하나의 파티션 만 있기 때문에 이것이 도움이 될지 확신 할 수 없습니다. 성능을 향상시키는 다른 솔루션은 없습니까?
- 당신이 elasticsearch - 하둡을 사용하지 않는 이유는 무엇입니까? –