2016-08-09 3 views
3

제 3 자 라이브러리를 사용하지 않고 카프카 응용 프로그램에서 단위 테스트를 수행해야합니다.단위 테스트를위한 카프카 항목 지우기

지금 당장 내 문제는 테스트 사이에 모든 주제를 정리하고 싶지만 어떻게해야할지 모르겠다는 것입니다.

이것은 내 임시 솔루션입니다. 각 테스트 후에 생성 된 모든 메시지를 커밋하고 모든 테스트 소비자를 동일한 소비자 그룹에 포함시킵니다.

override protected def afterEach():Unit={ 
    val cleanerConsumer= newConsumer(Seq.empty) 
    val topics=cleanerConsumer.listTopics() 
    println("pulisco") 
    cleanerConsumer.subscribe(topics.keySet()) 
    cleanerConsumer.poll(100) 
    cleanerConsumer.commitSync() 
    cleanerConsumer.close() 
} 

이것은 작동하지 않으며 이유를 알지 못합니다.

예를 들어 테스트 중에 새로운 소비자를 만들 때 messages에는 이전 테스트에서 생성 된 메시지가 포함됩니다.

val consumerProbe = newConsumer(SMSGatewayTopic) 

val messages = consumerProbe.poll(1000) 

어떻게 해결할 수 있습니까?

+0

카프카가 지속성 메시지 저장소이며 소비자가가 소비를 시작 오프셋을 결정할 수 있습니다. 마지막으로 오프셋 한 것을 기억하고 그 후에 소비하기 시작하면됩니다. –

+0

예를 들어 다음과 같이 내장 브로커를 사용해보십시오. https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java –

+0

왜이 질문은 Java로 태그가 붙었습니까? 결과적으로 이는 매우 자바에 특정한 검색에서 나타난다. 별로 도움이되지 않습니다. – user1283068

답변

1

또한 Kafka/Zookeeper 인스턴스를 테스트 소스에 포함시켜 격리 된 서비스를보다 잘 제어 할 수 있습니다.

trait Kafka { self: ZooKeeper => 
    Kafka.start() 
} 

object Kafka { 
    import org.apache.hadoop.fs.FileUtil 
    import kafka.server.KafkaServer 

    @volatile private var started = false 

    lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile 

    lazy val kafkaServer: KafkaServer = { 
    val config = com.typesafe.config.ConfigFactory. 
     load(this.getClass.getClassLoader) 

    val (host, port) = { 
     val (h, p) = config.getString("kafka.servers").span(_ != ':') 
     h -> p.drop(1).toInt 
    } 

    val serverConf = new kafka.server.KafkaConfig({ 
     val props = new java.util.Properties() 
     props.put("port", port.toString) 
     props.put("broker.id", port.toString) 
     props.put("log.dir", logDir.getAbsolutePath) 

     props.put(
     "zookeeper.connect", 
     s"localhost:${config getInt "test.zookeeper.port"}" 
    ) 

     props 
    }) 

    new KafkaServer(serverConf) 
    } 

    def start(): Unit = if (!started) { 
    try { 
     kafkaServer.startup() 
     started = true 
    } catch { 
     case err: Throwable => 
     println(s"fails to start Kafka: ${err.getMessage}") 
     throw err 
    } 
    } 

    def stop(): Unit = try { 
    if (started) kafkaServer.shutdown() 
    } finally { 
    FileUtil.fullyDelete(logDir) 
    } 
} 

trait ZooKeeper { 
    ZooKeeper.start() 
} 

object ZooKeeper { 
    import java.nio.file.Files 
    import java.net.InetSocketAddress 
    import org.apache.hadoop.fs.FileUtil 
    import org.apache.zookeeper.server.ZooKeeperServer 
    import org.apache.zookeeper.server.ServerCnxnFactory 

    @volatile private var started = false 
    lazy val logDir = Files.createTempDirectory("zk-log").toFile 
    lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile 

    lazy val (zkServer, zkFactory) = { 
    val srv = new ZooKeeperServer(
     snapshotDir, logDir, 500 
    ) 

    val config = com.typesafe.config.ConfigFactory. 
     load(this.getClass.getClassLoader) 
    val port = config.getInt("test.zookeeper.port") 

    srv -> ServerCnxnFactory.createFactory(
     new InetSocketAddress("localhost", port), 1024 
    ) 
    } 

    def start(): Unit = if (!zkServer.isRunning) { 
    try { 
     zkFactory.startup(zkServer) 

     started = true 

     while (!zkServer.isRunning) { 
     Thread.sleep(500) 
     } 
    } catch { 
     case err: Throwable => 
     println(s"fails to start ZooKeeper: ${err.getMessage}") 
     throw err 
    } 
    } 

    def stop(): Unit = try { 
    if (started) zkFactory.shutdown() 
    } finally { 
    try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable =>() } 
    FileUtil.fullyDelete(snapshotDir) 
    } 
} 

테스트 클래스는 이것을 확인하기 위해 extends Kafka with ZooKeeper 수 있습니다. SBT testOptions in Test 설정에서

테스트 JVM이 갈래되지 않은 경우

, Tests.Cleanup은 테스트 후 내장 된 서비스를 중지 할 수 있습니다.

1

테스트하기 전에 모든 주제를 다시 작성하는 것이 좋습니다. 예를 들어,이 카프카의 테스트 항목을 생성/삭제 방법 :

Kafka repository on GitHub

관련 문제