4
카프카를 사용하여 스트리밍하려는 수천 개의 파일이 매일 생성됩니다. 파일을 읽으려고하면 각 줄이 별도의 메시지로 간주됩니다.카프카를 사용하여 파일을 전송할 수 있습니까?
각 파일의 내용을 카프카 주제의 단일 메시지와 소비자가 카파 (Kafka) 주제에서 개별 파일로 작성하는 방법을 알고 싶습니다.
카프카를 사용하여 스트리밍하려는 수천 개의 파일이 매일 생성됩니다. 파일을 읽으려고하면 각 줄이 별도의 메시지로 간주됩니다.카프카를 사용하여 파일을 전송할 수 있습니까?
각 파일의 내용을 카프카 주제의 단일 메시지와 소비자가 카파 (Kafka) 주제에서 개별 파일로 작성하는 방법을 알고 싶습니다.
파일 처리를 위해 사용자 고유의 직렬/병렬 변환기를 작성할 수 있습니다. 예를 들어 :
생산자 소품 :
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, YOUR_FILE_SERIALIZER_URI);
소비자 소품 :
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YOUR_FILE_DESERIALIZER_URI);
시리얼
public class FileMapSerializer implements Serializer<Map<?,?>> {
@Override
public void close() {
}
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Map data) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] bytes = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(data);
bytes = bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (out != null) {
out.close();
}
} catch (IOException ex) {
// ignore close exception
}
try {
bos.close();
} catch (IOException ex) {
// ignore close exception
}
}
return bytes;
}
}
디시리얼라이저
public class MapDeserializer implements Deserializer<Map> {
@Override
public void close() {
}
@Override
public void configure(Map config, boolean isKey) {
}
@Override
public Map deserialize(String topic, byte[] message) {
ByteArrayInputStream bis = new ByteArrayInputStream(message);
ObjectInput in = null;
try {
in = new ObjectInputStream(bis);
Object o = in.readObject();
if (o instanceof Map) {
return (Map) o;
} else
return new HashMap<String, String>();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
bis.close();
} catch (IOException ex) {
}
try {
if (in != null) {
in.close();
}
} catch (IOException ex) {
// ignore close exception
}
}
return new HashMap<String, String>();
}
}
final Object kafkaMessage = new ProducerRecord<String, Map>((String) <TOPIC>,Integer.toString(messageId++), messageMap);
messageMap 다음과 같은 형태로
작성 메시지는 키와 값으로 파일 내용으로 파일 이름을 포함합니다. 값은 직렬화 가능 객체 일 수 있습니다. 따라서 각 메시지에는 File_Name 대 FileContent 맵이있는 Map이 포함됩니다. 단일 값 또는 다중 값일 수 있습니다.
카프카 커넥트에 대해 살펴 보셨습니까? http://docs.confluent.io/3.0.0/connect/index.html –
예. 알고 있습니다. 여기 어떻게 사용할 수 있습니까? 시나리오는 파일을 읽을 때마다 각 행이 별도의 메시지로 간주되지만 각 파일은 긴 단일 메시지가되도록합니다. (파일에 30-40 줄이있을 수 있음) – Nahush
Java 클라이언트, 콘솔 생산자, 기타를 사용하고 있습니까? –