Skip to content
Snippets Groups Projects
Commit 09b49271 authored by Perry's avatar Perry
Browse files

Use cpu sharing to limit cpu resource. Map multiple containers per core.

parent 145d8570
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,7 @@ class Container(
containerName: Option[String],
val image: String,
network: String,
cpuShare: Int,
policy: Option[String],
val limits: ActionLimits = ActionLimits(),
env: Map[String, String] = Map(),
......@@ -48,7 +49,7 @@ class Container(
val name = containerName.getOrElse("anon")
val dockerhost = pool.dockerhost
val (containerId, containerHostAndPort) = bringup(containerName, image, network, env, args, limits, policy)
val (containerId, containerHostAndPort) = bringup(containerName, image, network, cpuShare, env, args, limits, policy)
def details: String = {
val name = containerName getOrElse "??"
......
......@@ -27,11 +27,14 @@ import scala.concurrent.duration._
import scala.annotation.tailrec
import akka.actor.ActorSystem
import whisk.common.Counter
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig.dockerImageTag
import whisk.core.WhiskConfig.invokerContainerNetwork
import whisk.core.WhiskConfig.invokerContainerPolicy
import whisk.core.WhiskConfig.invokerCoreShare
import whisk.core.WhiskConfig.invokerNumCore
import whisk.core.WhiskConfig.selfDockerEndpoint
import whisk.core.entity.ActionLimits
import whisk.core.entity.MemoryLimit
......@@ -46,6 +49,7 @@ import akka.event.Logging.LogLevel
import akka.event.Logging.InfoLevel
import whisk.core.entity.BlackBoxExec
/**
* A thread-safe container pool that internalizes container creation/teardown and allows users
* to check out a container.
......@@ -63,11 +67,13 @@ class ContainerPool(
standalone: Boolean = false)(implicit actorSystem: ActorSystem)
extends ContainerUtils {
val dockerhost = config.selfDockerEndpoint
// These must be defined before verbosity is set
private val datastore = WhiskEntityStore.datastore(config)
private val authStore = WhiskAuthStore.datastore(config)
setVerbosity(verbosity)
val dockerhost = config.selfDockerEndpoint
// Eventually, we will have a more sophisticated warmup strategy that does multiple sizes
private val defaultMemoryLimit = MemoryLimit(MemoryLimit.STD_MEMORY)
......@@ -112,7 +118,7 @@ class ContainerPool(
def maxActive_=(value: Int): Unit = _maxActive = Math.max(0, value)
def resetMaxIdle() = _maxIdle = defaultMaxIdle
def resetMaxActive() = _maxActive = ContainerPool.defaultMaxActive
def resetMaxActive() = _maxActive = ContainerPool.getDefaultMaxActive(config)
def resetGCThreshold() = _gcThreshold = defaultGCThreshold
/*
......@@ -503,6 +509,7 @@ class ContainerPool(
private def makeGeneralContainer(key: ActionContainerId, containerName: String,
imageName: String, limits: ActionLimits, pull: Boolean)(implicit transid: TransactionId): WhiskContainer = {
val network = config.invokerContainerNetwork
val cpuShare = ContainerPool.cpuShare(config)
val policy = config.invokerContainerPolicy
val env = getContainerEnvironment()
// This will start up the container
......@@ -510,7 +517,7 @@ class ContainerPool(
ContainerUtils.pullImage(dockerhost, imageName)
}
runDockerOp {
new WhiskContainer(transid, this, key, containerName, imageName, network, policy, env, limits, isBlackbox = pull)
new WhiskContainer(transid, this, key, containerName, imageName, network, cpuShare, policy, env, limits, isBlackbox = pull)
}
}
......@@ -531,7 +538,7 @@ class ContainerPool(
private def makeContainer(key: ActionContainerId, imageName: String, args: Array[String])(implicit transid: TransactionId): FinalContainerResult = {
val con = runDockerOp {
new Container(transid, this, key, None, imageName,
config.invokerContainerNetwork, config.invokerContainerPolicy, ActionLimits(), Map(), args)
config.invokerContainerNetwork, ContainerPool.cpuShare(config), config.invokerContainerPolicy, ActionLimits(), Map(), args)
}
con.setVerbosity(getVerbosity())
Success(con, None)
......@@ -571,7 +578,7 @@ class ContainerPool(
val gcFrequency = 1000.milliseconds // this should not be leaked but a test needs this until GC count is implemented
private var _maxIdle = defaultMaxIdle
private var _maxActive = ContainerPool.defaultMaxActive
private var _maxActive = ContainerPool.getDefaultMaxActive(config)
private var _gcThreshold = defaultGCThreshold
private var gcOn = true
private val gcSync = new Object()
......@@ -685,9 +692,41 @@ class ContainerPool(
warn(this, "Shutdown hook completed.")
}
}
}
object ContainerPool {
def requiredProperties = Map(selfDockerEndpoint -> "localhost") ++ Map(dockerImageTag -> "latest") ++ Map(invokerContainerNetwork -> "bridge") ++ Map(invokerContainerPolicy -> "")
val defaultMaxActive = 4
/*
* These methods are parameterized on the configuration but defined here as an instance of ContainerPool is not
* always available from other call sites.
*/
object ContainerPool extends Logging {
def requiredProperties = Map(selfDockerEndpoint -> "localhost",
dockerImageTag -> "latest",
invokerContainerNetwork -> "bridge",
invokerNumCore -> "4",
invokerCoreShare -> "2",
invokerContainerPolicy -> "")
/*
* Extract parameters from whisk config. In the future, these may not be static but
* dynamically updated. They still serve as a starting point for downstream parameters.
*/
def numCore(config: WhiskConfig) = config.invokerNumCore.toInt
def shareFactor(config: WhiskConfig) = config.invokerCoreShare.toInt
/*
* The total number of containers is simply the number of cores dilated by the cpu sharing.
*/
def getDefaultMaxActive(config: WhiskConfig) = numCore(config) * shareFactor(config)
/* The shareFactor indicates the number of containers that would share a single core, on average.
* cpuShare is a docker option (-c) whereby a container's CPU access is limited.
* A value of 1024 is the full share so a strict resource division with a shareFactor of 2 would yield 512.
* On an idle/underloaded system, a container will still get to use underutilized CPU shares.
*/
private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value.
def cpuShare(config: WhiskConfig) = (totalShare / getDefaultMaxActive(config)).toInt
}
......@@ -49,8 +49,8 @@ trait ContainerUtils extends Logging {
* @param image the docker image to run
* @return container id and container host
*/
def bringup(name: Option[String], image: String, network: String, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerId, ContainerIP) = {
val id = makeContainer(name, image, network, env, args, limits, policy)
def bringup(name: Option[String], image: String, network: String, cpuShare:Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerId, ContainerIP) = {
val id = makeContainer(name, image, network, cpuShare, env, args, limits, policy)
val host = id.flatMap(_ => getContainerHostAndPort(name))
(id, host)
}
......@@ -63,8 +63,9 @@ trait ContainerUtils extends Logging {
/*
* TODO: The file handle and process limits should be moved to some global limits config.
*/
def makeContainer(name: Option[String], image: String, network: String, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): ContainerId = {
def makeContainer(name: Option[String], image: String, network: String, cpuShare: Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): ContainerId = {
val nameOption = name.map(n => Array("--name", n)).getOrElse(Array.empty[String])
val cpuArg = Array("-c", cpuShare.toString)
val memoryArg = Array("-m", s"${limits.memory()}m")
val capabilityArg = Array("--cap-drop", "NET_RAW", "--cap-drop", "NET_ADMIN")
val consulServiceIgnore = Array("-e", "SERVICE_IGNORE=true")
......@@ -72,7 +73,7 @@ trait ContainerUtils extends Logging {
val processLimit = Array("--ulimit", "nproc=512:512")
val securityOpts = policy map { p => Array("--security-opt", s"apparmor:${p}") } getOrElse (Array.empty[String])
val containerNetwork = Array("--net", network)
val cmd = Array("run") ++ makeEnvVars(env) ++ consulServiceIgnore ++ nameOption ++ memoryArg ++
val cmd = Array("run") ++ makeEnvVars(env) ++ consulServiceIgnore ++ nameOption ++ cpuArg ++ memoryArg ++
capabilityArg ++ fileHandleLimit ++ processLimit ++ securityOpts ++ containerNetwork ++ Array("-d", image) ++ args
runDockerCmd(cmd: _*)
}
......
......@@ -54,12 +54,13 @@ class WhiskContainer(
containerName: String,
image: String,
network: String,
cpuShare: Int,
policy: Option[String],
env: Map[String, String],
limits: ActionLimits,
args: Array[String] = Array(),
val isBlackbox: Boolean)
extends Container(originalId, pool, key, Some(containerName), image, network, policy, limits, env, args) {
extends Container(originalId, pool, key, Some(containerName), image, network, cpuShare, policy, limits, env, args) {
var boundParams = JsObject() // Mutable to support pre-alloc containers
var lastLogSize = 0L
......
......@@ -602,7 +602,7 @@ object InvokerService {
val topic = s"invoke${instance}"
val groupid = "invokers"
val maxdepth = ContainerPool.defaultMaxActive
val maxdepth = ContainerPool.getDefaultMaxActive(config)
val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
val dispatcher = new Dispatcher(verbosity, consumer, 500 milliseconds, 2 * maxdepth, system)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment