2016-08-23 4 views
4

카프카를 사용하여 스트리밍하려는 수천 개의 파일이 매일 생성됩니다. 파일을 읽으려고하면 각 줄이 별도의 메시지로 간주됩니다.카프카를 사용하여 파일을 전송할 수 있습니까?

각 파일의 내용을 카프카 주제의 단일 메시지와 소비자가 카파 (Kafka) 주제에서 개별 파일로 작성하는 방법을 알고 싶습니다.

+0

카프카 커넥트에 대해 살펴 보셨습니까? http://docs.confluent.io/3.0.0/connect/index.html –

+0

예. 알고 있습니다. 여기 어떻게 사용할 수 있습니까? 시나리오는 파일을 읽을 때마다 각 행이 별도의 메시지로 간주되지만 각 파일은 긴 단일 메시지가되도록합니다. (파일에 30-40 줄이있을 수 있음) – Nahush

+0

Java 클라이언트, 콘솔 생산자, 기타를 사용하고 있습니까? –

답변

3

파일 처리를 위해 사용자 고유의 직렬/병렬 변환기를 작성할 수 있습니다. 예를 들어 :

생산자 소품 :

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, YOUR_FILE_SERIALIZER_URI); 

소비자 소품 :

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YOUR_FILE_DESERIALIZER_URI); 

시리얼

public class FileMapSerializer implements Serializer<Map<?,?>> { 

@Override 
public void close() { 

} 

@Override 
public void configure(Map configs, boolean isKey) { 
} 

@Override 
public byte[] serialize(String topic, Map data) { 
    ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
    ObjectOutput out = null; 
    byte[] bytes = null; 
    try { 
     out = new ObjectOutputStream(bos); 
     out.writeObject(data); 
     bytes = bos.toByteArray(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      if (out != null) { 
       out.close(); 
      } 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
     try { 
      bos.close(); 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
    } 
    return bytes; 
} 
} 

디시리얼라이저

public class MapDeserializer implements Deserializer<Map> { 

@Override 
public void close() { 

} 

@Override 
public void configure(Map config, boolean isKey) { 

} 

@Override 
public Map deserialize(String topic, byte[] message) { 
    ByteArrayInputStream bis = new ByteArrayInputStream(message); 
    ObjectInput in = null; 
    try { 
     in = new ObjectInputStream(bis); 
     Object o = in.readObject(); 
     if (o instanceof Map) { 
      return (Map) o; 
     } else 
      return new HashMap<String, String>(); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      bis.close(); 
     } catch (IOException ex) { 
     } 
     try { 
      if (in != null) { 
       in.close(); 
      } 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
    } 
    return new HashMap<String, String>(); 
} 
} 

final Object kafkaMessage = new ProducerRecord<String, Map>((String) <TOPIC>,Integer.toString(messageId++), messageMap); 

messageMap 다음과 같은 형태로

작성 메시지는 키와 값으로 파일 내용으로 파일 이름을 포함합니다. 값은 직렬화 가능 객체 일 수 있습니다. 따라서 각 메시지에는 File_Name 대 FileContent 맵이있는 Map이 포함됩니다. 단일 값 또는 다중 값일 수 있습니다.

관련 문제