Skip to content
Snippets Groups Projects
Commit d4c74868 authored by Perry's avatar Perry Committed by Rodric Rabbah
Browse files

Add a ticket based queue to ensure fairness in container acquisition

parent 9a53a8fd
No related branches found
No related tags found
No related merge requests found
......@@ -123,7 +123,14 @@ class ContainerPool(
private val startingCounter = new Counter()
private var shuttingDown = false
/**
/*
* Tracks requests for getting containers.
* The first value doled out via nextPosition.next() will be 1 and completedPosition.cur remains at 0 until completion.
*/
private val nextPosition = new Counter()
private val completedPosition = new Counter()
/*
* Lists ALL containers at this docker point with "docker ps -a --no-trunc".
* This could include containers not in this pool at all.
*/
......@@ -144,18 +151,25 @@ class ContainerPool(
} else {
info(this, s"Getting container for ${action.fullyQualifiedName} with ${auth.uuid}")
val key = makeKey(action, auth)
getImpl(key, { () => makeWhiskContainer(action, auth) }) map {
case (c, initResult) =>
val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
info(this, s"ContainerPool.getAction obtained container ${c.id} ${cacheMsg}")
(c.asInstanceOf[WhiskContainer], initResult)
try {
getImpl(nextPosition.next(), key, { () => makeWhiskContainer(action, auth) }) map {
case (c, initResult) =>
val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
info(this, s"ContainerPool.getAction obtained container ${c.id} ${cacheMsg}")
(c.asInstanceOf[WhiskContainer], initResult)
}
} finally {
completedPosition.next()
}
}
/*
* For testing
*/
def getByImageName(imageName: String, args: Array[String])(implicit transid: TransactionId): Option[Container] = {
info(this, s"Getting container for image $imageName with args " + args.mkString(" "))
val key = makeKey(imageName, args)
getImpl(key, { () => makeContainer(imageName, args) }) map { _._1 }
getImpl(0, key, { () => makeContainer(imageName, args) }) map { _._1 }
}
/**
......@@ -163,8 +177,12 @@ class ContainerPool(
* This method will apply retry so that the caller is blocked until retry succeeds.
*/
@tailrec
final def getImpl(key: String, conMaker: () => ContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
getOrMake(key, conMaker) match {
final def getImpl(position: Int, key: String, conMaker: () => ContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
val positionInLine = position - completedPosition.cur // this will be 1 if at the front of the line
val available = slack()
if (positionInLine > available) { // e.g. if there is 1 available, then I wait if I am second in line (positionInLine = 2)
Thread.sleep(50) // TODO: replace with wait/notify but tricky to get right because of desire for maximal concurrency
} else getOrMake(key, conMaker) match {
case Success(con, initResult) =>
info(this, s"Obtained container ${con.containerId.getOrElse("unknown")}")
return Some(con, initResult)
......@@ -172,9 +190,9 @@ class ContainerPool(
error(this, s"Error starting container: $str")
return None
case Busy() =>
Thread.sleep(100)
getImpl(key, conMaker)
// This will not cause a busy loop because only those that could be productive will get a chance
}
getImpl(position, key, conMaker)
}
def getNumberOfIdleContainers(key: String)(implicit transid: TransactionId): Int = {
......@@ -183,8 +201,14 @@ class ContainerPool(
}
}
/**
* Tries to get or create a container, returning None if there are too many
/*
* How many containers can we start? Someone could have fully started a container so we must include startingCounter.
* The use of a method rather than a getter is meant to signify the synchronization in the implementation.
*/
private def slack() = _maxActive - (activeCount() + startingCounter.cur)
/*
* Try to get or create a container, returning None if there are too many
* active containers.
*
* The multiple synchronization block, and the use of startingCounter,
......@@ -198,7 +222,7 @@ class ContainerPool(
retrieve(key) match {
case CacheMiss() => {
this.synchronized {
if (activeCount() + startingCounter.cur >= _maxActive) // Someone could have fully started a container
if (slack() <= 0)
return Busy()
if (startingCounter.cur >= 1) // Limit concurrent starting of containers
return Busy()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment