Skip to content
Snippets Groups Projects
Commit f6f8da70 authored by huxihx's avatar huxihx Committed by Manikumar Reddy
Browse files

KAFKA-8098: Fix Flaky Test testConsumerGroups

- The flaky failure is caused by the fact that the main thread sometimes issues DescribeConsumerGroup request before the consumer assignment takes effect. Added a latch to make sure such situation is not going to happen.

Author: huxihx <huxi_2b@hotmail.com>
Author: huxi <huxi_2b@hotmail.com>
Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #6441 from huxihx/KAFKA-8098
parent fa57eb06
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,7 @@ package kafka.api
import java.{time, util}
import java.util.{Collections, Properties}
import java.util.Arrays.asList
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.io.File
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
......@@ -52,6 +52,7 @@ import kafka.zk.KafkaZkClient
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import java.lang.{Long => JLong}
import java.time.{Duration => JDuration}
import kafka.security.auth.{Cluster, Group, Topic}
......@@ -1158,19 +1159,28 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
val consumer = createConsumer(configOverrides = newConsumerConfig)
val latch = new CountDownLatch(1)
try {
// Start a consumer in a thread that will subscribe to a new group.
val consumerThread = new Thread {
override def run {
consumer.subscribe(Collections.singleton(testTopicName))
while (true) {
consumer.poll(time.Duration.ofSeconds(5L))
consumer.commitSync()
try {
while (true) {
consumer.poll(JDuration.ofSeconds(5))
if (!consumer.assignment.isEmpty && latch.getCount > 0L)
latch.countDown()
consumer.commitSync()
}
} catch {
case _: InterruptException => // Suppress the output to stderr
}
}
}
try {
consumerThread.start
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment