Skip to content
Snippets Groups Projects
Commit 29383d6d authored by Jason Gustafson's avatar Jason Gustafson Committed by Rajini Sivaram
Browse files

KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)

The problem is the concurrent metadata updates in the foreground and in the heartbeat thread. Changed the code to use ConsumerNetworkClient.poll, which enforces mutual exclusion when accessing the underlying client.
parent 8abbf33b
No related branches found
No related tags found
No related merge requests found
......@@ -558,7 +558,7 @@ public class ConsumerCoordinatorTest {
// Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
// rebalance again.
metadata.requestUpdate();
client.poll(Long.MAX_VALUE, time.milliseconds());
consumerClient.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
}
......@@ -1010,13 +1010,13 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
partitionAssignor.prepare(Collections.emptyMap());
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
assertEquals(Collections.emptySet(), rebalanceListener.assigned);
assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
Map<String, Errors> topicErrors = new HashMap<>();
......@@ -1026,7 +1026,7 @@ public class ConsumerCoordinatorTest {
client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
topicErrors, singletonMap(topic1, 1)));
client.poll(0, time.milliseconds());
consumerClient.poll(time.timer(0));
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment