Skip to content
Snippets Groups Projects
Commit eb3335ef authored by Jonathan Santilli's avatar Jonathan Santilli Committed by Jun Rao
Browse files

KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575)

* Add logic to retry the BrokerInfo registration into ZooKeeper

In case the ZooKeeper session has been regenerated and the broker
tries to register the BrokerInfo into Zookeeper, this code deletes
the current BrokerInfo from Zookeeper and creates it again, just if
the znode ephemeral owner belongs to the Broker which tries to register
himself again into ZooKeeper

* Add test to validate the BrokerInfo re-registration into ZooKeeper
parent 29383d6d
No related branches found
No related tags found
No related merge requests found
......@@ -53,7 +53,7 @@ import scala.collection.JavaConverters._
* easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
* in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
*/
class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
Logging with KafkaMetricsGroup {
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
......@@ -67,6 +67,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
// Only for testing
private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper
// This variable holds the Zookeeper session id at the moment a Broker gets registered in Zookeeper and the subsequent
// updates of the session id. It is possible that the session id changes over the time for 'Session expired'.
// This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must
// be deleted.
private var currentZooKeeperSessionId: Long = -1
/**
* Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect
* and a monotonically increasing number will be appended to its name.
......@@ -1585,7 +1591,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
retryRequestsUntilConnected(Seq(request)).head
}
......@@ -1631,26 +1637,94 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
throw KeeperException.create(code)
}
private def isZKSessionIdDiffFromCurrentZKSessionId(): Boolean = {
zooKeeperClient.sessionId != currentZooKeeperSessionId
}
private def isZKSessionTheEphemeralOwner(ephemeralOwnerId: Long): Boolean = {
ephemeralOwnerId == currentZooKeeperSessionId
}
private[zk] def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = {
isZKSessionTheEphemeralOwner(ephemeralOwnerId) && isZKSessionIdDiffFromCurrentZKSessionId()
}
private def updateCurrentZKSessionId(newSessionId: Long): Unit = {
currentZooKeeperSessionId = newSessionId
}
private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
def create(): Code = {
val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.resultCode match {
case code@ Code.OK => code
case Code.NODEEXISTS => getAfterNodeExists()
val createResultCode = createResponse.resultCode match {
case code@ Code.OK =>
code
case Code.NODEEXISTS =>
getAfterNodeExists()
case code =>
error(s"Error while creating ephemeral at $path with return code: $code")
code
}
if (createResultCode == Code.OK) {
// At this point, we need to save a reference to the zookeeper session id.
// This is done here since the Zookeeper session id may not be available at the Object creation time.
// This is assuming the 'retryRequestUntilConnected' method got connected and a valid session id is present.
// This code is part of the workaround done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code
// must be deleted.
updateCurrentZKSessionId(zooKeeperClient.sessionId)
}
createResultCode
}
// This method is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must
// be deleted.
private def delete(): Code = {
val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion)
val deleteResponse = retryRequestUntilConnected(deleteRequest)
deleteResponse.resultCode match {
case code@ Code.OK => code
case code@ Code.NONODE => code
case code =>
error(s"Error while deleting ephemeral node at $path with return code: $code")
code
}
}
private def reCreate(): Code = {
val codeAfterDelete = delete()
var codeAfterReCreate = codeAfterDelete
debug(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete")
if (codeAfterDelete == Code.OK || codeAfterDelete == Code.NONODE) {
codeAfterReCreate = create()
debug(s"Result of znode ephemeral re-creation at $path is: $codeAfterReCreate")
}
codeAfterReCreate
}
private def getAfterNodeExists(): Code = {
val getDataRequest = GetDataRequest(path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
val ephemeralOwnerId = getDataResponse.stat.getEphemeralOwner
getDataResponse.resultCode match {
case Code.OK if getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId =>
// At this point, the Zookeeper session could be different (due a 'Session expired') from the one that initially
// registered the Broker into the Zookeeper ephemeral node, but the znode is still present in ZooKeeper.
// The expected behaviour is that Zookeeper server removes the ephemeral node associated with the expired session
// but due an already reported bug in Zookeeper (ZOOKEEPER-2985) this is not happening, so, the following check
// will validate if this Broker got registered with the previous (expired) session and try to register again,
// deleting the ephemeral node and creating it again.
// This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must
// be deleted.
case Code.OK if shouldReCreateEphemeralZNode(ephemeralOwnerId) =>
info(s"Was not possible to create the ephemeral at $path, node already exists and owner " +
s"'$ephemeralOwnerId' does not match current session '${zooKeeperClient.sessionId}'" +
s", trying to delete and re-create it with the newest Zookeeper session")
reCreate()
case Code.OK if ephemeralOwnerId != zooKeeperClient.sessionId =>
error(s"Error while creating ephemeral at $path, node already exists and owner " +
s"'${getDataResponse.stat.getEphemeralOwner}' does not match current session '${zooKeeperClient.sessionId}'")
s"'$ephemeralOwnerId' does not match current session '${zooKeeperClient.sessionId}'")
Code.NODEEXISTS
case code@ Code.OK => code
case Code.NONODE =>
......
......@@ -59,6 +59,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val controllerEpochZkVersion = 0
var otherZkClient: KafkaZkClient = _
var expiredSessionZkClient: ExpiredKafkaZkClient = _
@Before
override def setUp(): Unit = {
......@@ -66,6 +67,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createControllerEpochRaw(1)
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
expiredSessionZkClient = ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled),
zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@After
......@@ -73,6 +76,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
if (otherZkClient != null)
otherZkClient.close()
zkClient.deletePath(ControllerEpochZNode.path)
if (expiredSessionZkClient != null)
expiredSessionZkClient.close()
super.tearDown()
}
......@@ -683,6 +688,31 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
}
@Test
def testRetryRegisterBrokerInfo(): Unit = {
val brokerId = 5
val brokerPort = 9999
val brokerHost = "test.host"
val expiredBrokerInfo = createBrokerInfo(brokerId, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT)
expiredSessionZkClient.createTopLevelPaths()
// Register the broker, for the first time
expiredSessionZkClient.registerBroker(expiredBrokerInfo)
assertEquals(Some(expiredBrokerInfo.broker), expiredSessionZkClient.getBroker(brokerId))
val originalCzxid = expiredSessionZkClient.getPathCzxid(BrokerIdZNode.path(brokerId))
// Here, the node exists already, when trying to register under a different session id,
// the node will be deleted and created again using the new session id.
expiredSessionZkClient.registerBroker(expiredBrokerInfo)
// The broker info should be the same, no error should be raised
assertEquals(Some(expiredBrokerInfo.broker), expiredSessionZkClient.getBroker(brokerId))
val newCzxid = expiredSessionZkClient.getPathCzxid(BrokerIdZNode.path(brokerId))
assertNotEquals("The Czxid of original ephemeral znode should be different " +
"from the new ephemeral znode Czxid", originalCzxid, newCzxid)
}
@Test
def testGetBrokerMethods(): Unit = {
zkClient.createTopLevelPaths()
......@@ -1112,4 +1142,34 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(expectedConsumerGroupOffsetsPath, actualConsumerGroupOffsetsPath)
}
class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
extends KafkaZkClient(zooKeeperClient, isSecure, time) {
// Overwriting this method from the parent class to force the client to re-register the Broker.
override def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = {
true
}
def getPathCzxid(path: String): Long = {
val getDataRequest = GetDataRequest(path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.stat.getCzxid
}
}
private object ExpiredKafkaZkClient {
def apply(connectString: String,
isSecure: Boolean,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener") = {
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
time, metricGroup, metricType)
new ExpiredKafkaZkClient(zooKeeperClient, isSecure, time)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment