Skip to content
Snippets Groups Projects
Commit 1deb072f authored by Radai Rosenblatt's avatar Radai Rosenblatt Committed by Ismael Juma
Browse files

MINOR: Avoid unnecessary collection copy in MetadataCache (#6397)

`map` was being used to convert `Iterable[Integer]` to `Iterable[Int`]. That
operation represented 11% of total CPU time measured under load for us.
We also expect a positive impact on GC.

Reviewers: Joel Koshy <jjkoshy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
parent 1f692bdf
No related branches found
No related tags found
No related merge requests found
......@@ -51,9 +51,10 @@ class MetadataCache(brokerId: Int) extends Logging {
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
// This method is the main hotspot when it comes to the performance of metadata requests,
// we should be careful about adding additional logic here.
// we should be careful about adding additional logic here. Relatedly, `brokers` is
// `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
// filterUnavailableEndpoints exists to support v0 MetadataResponses
private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
brokers.foreach { brokerId =>
val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
......@@ -76,9 +77,9 @@ class MetadataCache(brokerId: Int) extends Logging {
val leaderBrokerId = partitionState.basePartitionState.leader
val leaderEpoch = partitionState.basePartitionState.leaderEpoch
val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
val replicas = partitionState.basePartitionState.replicas.asScala
val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
maybeLeader match {
case None =>
......@@ -94,7 +95,7 @@ class MetadataCache(brokerId: Int) extends Logging {
offlineReplicaInfo.asJava)
case Some(leader) =>
val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
val isr = partitionState.basePartitionState.isr.asScala
val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
if (replicaInfo.size < replicas.size) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment