Skip to content
Snippets Groups Projects
Unverified Commit 6279b038 authored by Rajini Sivaram's avatar Rajini Sivaram Committed by GitHub
Browse files

KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456)

We verify that ZK clients are closed in tests since these can affect subsequent tests and that makes it hard to debug test failures. But because of changes to ZooKeeper client, we were checking the wrong thread name. The thread name used now is <creatorThreadName>-EventThread where creatorThreadName varies depending on the test. Fixed ZooKeeperTestHarness to check this format and fixed tests which were leaving ZK clients behind. Also added a test to make sure we can detect changes to the thread name when we update ZK clients in future.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com>
parent 938580ff
No related branches found
No related tags found
No related merge requests found
......@@ -84,7 +84,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
}
object ZooKeeperTestHarness {
val ZkClientEventThreadPrefix = "ZkClient-EventThread"
val ZkClientEventThreadSuffix = "-EventThread"
// Threads which may cause transient failures in subsequent tests if not shutdown.
// These include threads which make connections to brokers and may cause issues
......@@ -94,7 +94,7 @@ object ZooKeeperTestHarness {
KafkaProducer.NETWORK_THREAD_PREFIX,
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
ZkClientEventThreadPrefix)
ZkClientEventThreadSuffix)
/**
* Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
......
......@@ -82,8 +82,16 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testConnection(): Unit = {
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
"testMetricType").close()
val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
"testMetricType")
try {
// Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients
val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
assertTrue(s"ZooKeeperClient event thread not found, threads=$threads",
threads.exists(_.contains(ZooKeeperTestHarness.ZkClientEventThreadSuffix)))
} finally {
client.close()
}
}
@Test
......@@ -327,14 +335,15 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
zooKeeperClient.close()
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
"testMetricGroup", "testMetricType")
client.registerStateChangeHandler(stateChangeHandler)
zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
val requestThread = new Thread() {
override def run(): Unit = {
try
client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
finally
latch.countDown()
......@@ -343,7 +352,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
val reinitializeThread = new Thread() {
override def run(): Unit = {
client.forceReinitialize()
zooKeeperClient.forceReinitialize()
}
}
......@@ -375,12 +384,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
zooKeeperClient.close()
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
"testMetricGroup", "testMetricType")
client.registerStateChangeHandler(faultyHandler)
client.registerStateChangeHandler(goodHandler)
zooKeeperClient.registerStateChangeHandler(faultyHandler)
zooKeeperClient.registerStateChangeHandler(goodHandler)
client.forceReinitialize()
zooKeeperClient.forceReinitialize()
assertEquals(1, goodCalls.get)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment