Skip to content
Snippets Groups Projects
Commit 171ad4ce authored by Philippe Suter's avatar Philippe Suter Committed by Rodric Rabbah
Browse files

Test to confirm that CouchDbRestClient is resilient to transient TCP errors

The test introduces an actor that acts as a "configurable" HTTP proxy (i.e. can simulate failures).

Removing DbPing.scala as the new test subsumes it.
parent b0c5b45d
No related branches found
No related tags found
No related merge requests found
......@@ -19,61 +19,52 @@ package whisk.core.database.test
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{ Try, Success, Failure }
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.DurationDouble
import scala.util._
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.actor.Props
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import spray.json.DefaultJsonProtocol._
import org.junit.runner.RunWith
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
import whisk.core.database.CouchDbRestClient
import whisk.test.http.RESTProxy
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
@RunWith(classOf[JUnitRunner])
class DbPing extends FlatSpec
with BeforeAndAfter
class CouchDbRestClientTests extends FlatSpec
with Matchers
with BeforeAndAfterAll
with ScalaFutures
with Matchers {
with ScalaFutures {
override implicit val patienceConfig = PatienceConfig(timeout = 10.seconds, interval = 0.5.seconds)
implicit val actorSystem = ActorSystem()
implicit val actorSystem = ActorSystem()
implicit val executionContext = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()
override implicit val patienceConfig = PatienceConfig(timeout = 10.seconds, interval = 0.5.seconds)
override def afterAll() {
println("Shutting down materializer")
materializer.shutdown()
println("Shutting down HTTP connections")
Await.result(akka.http.scaladsl.Http().shutdownAllConnectionPools(), Duration.Inf)
println("Shutting down actor system")
Await.ready(akka.http.scaladsl.Http().shutdownAllConnectionPools(), Duration.Inf)
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, Duration.Inf)
}
lazy val config = new WhiskConfig(Map(
val config = new WhiskConfig(Map(
dbProvider -> null,
dbProtocol -> null,
dbUsername -> null,
......@@ -81,33 +72,69 @@ class DbPing extends FlatSpec
dbHost -> null,
dbPort -> null))
lazy val dbRootUri = Uri(
scheme = config.dbProtocol,
authority = Uri.Authority(
host = Uri.Host(config.dbHost),
port = config.dbPort.toInt),
path = Uri.Path("/")
val dbAuthority = Uri.Authority(
host = Uri.Host(config.dbHost),
port = config.dbPort.toInt
)
lazy val dbRootRequest = HttpRequest(
headers = List(
Authorization(BasicHttpCredentials(config.dbUsername, config.dbPassword)),
Accept(MediaTypes.`application/json`)),
uri = dbRootUri
)
val proxyPort = 15975
val proxyActor = actorSystem.actorOf(Props(new RESTProxy("0.0.0.0", proxyPort)(dbAuthority, config.dbProtocol == "https")))
behavior of "Database ping"
// the (fake) DB name doesn't matter as this test only exercises the "/" endpoint ("instance info")
val client = CouchDbRestClient.make("http", "localhost", proxyPort, config.dbUsername, config.dbPassword, "fakeDB")
it should "retrieve CouchDB/Cloudant DB info" in {
def checkResponse(response: Either[StatusCode,JsObject]) : Unit = response match {
case Right(obj) =>
assert(obj.fields.contains("couchdb"), "response object doesn't contain 'couchdb'")
case Left(code) =>
assert(false, s"unsuccessful response (code ${code.intValue})")
}
behavior of "CouchDbRestClient"
it should "successfully access the DB instance info" in {
assume(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB")
val f = client.instanceInfo()
whenReady(f) { e => checkResponse(e) }
}
val f: Future[JsObject] = Http().singleRequest(dbRootRequest).flatMap { response =>
Unmarshal(response.entity).to[JsObject]
it should "successfully access the DB despite transient connection failures" in {
// sprays the client with requests, makes sure they are all answered
// despite temporary connection failure.
val numRequests = 30
val timeSpan = 5.seconds
val delta = timeSpan / numRequests
val promises = Vector.fill(numRequests)(Promise[Either[StatusCode,JsObject]])
for(i <- 0 until numRequests) {
actorSystem.scheduler.scheduleOnce(delta * (i+1)) {
client.instanceInfo().andThen({ case r => promises(i).tryComplete(r) })
}
}
whenReady(f) { js =>
js.fields.get("couchdb") shouldBe defined
js.fields("couchdb").convertTo[String] should equal("Welcome")
// Mayhem! Havoc!
actorSystem.scheduler.scheduleOnce(2.5.seconds, proxyActor, RESTProxy.UnbindFor(1.second))
// What a type!
val futures: Vector[Future[Try[Either[StatusCode,JsObject]]]] =
promises.map(_.future.map(e => Success(e)).recover { case t: Throwable => Failure(t) })
whenReady(Future.sequence(futures), Timeout(timeSpan * 2)) { results =>
// We check that the first result was OK
// (i.e. the service worked before the disruption)
results.head.toOption shouldBe defined
checkResponse(results.head.get)
// We check that the last result was OK
// (i.e. the service worked again after the disruption)
results.last.toOption shouldBe defined
checkResponse(results.last.get)
// We check that there was at least one error
// (i.e. we did manage to unbind for a while)
results.find(_.isFailure) shouldBe defined
}
}
}
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.test.http
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import akka.stream.scaladsl._
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout
object RESTProxy {
// Orders the proxy to immediately unbind and rebind after the duration has passed.
case class UnbindFor(duration: FiniteDuration)
}
/** A simple REST proxy that can receive commands to change its behavior (e.g.
* simulate failures of the proxied service). Not for use in production.
*/
class RESTProxy(val host: String, val port: Int)(val serviceAuthority: Uri.Authority, val useHTTPS: Boolean) extends Actor with ActorLogging {
private implicit val actorSystem = context.system
private implicit val executionContex = actorSystem.dispatcher
private val destHost = serviceAuthority.host.address
private val destPort = serviceAuthority.port
// These change as connections come and go
private var materializer: Option[ActorMaterializer] = None
private var binding: Option[Http.ServerBinding] = None
// Public messages
import RESTProxy._
// Internal messages
private case object DoBind
private case object DoUnbind
private case class Request(request: HttpRequest)
// Route requests through messages to this actor, to serialize w.r.t events such as unbinding
private def mkRequestFlow(materializer: Materializer) : Flow[HttpRequest,HttpResponse,_] = {
implicit val m = materializer
Flow.apply[HttpRequest].mapAsync(4) { request =>
ask(self, Request(request))(timeout = Timeout(1.minute)).mapTo[HttpResponse]
}
}
private def bind(checkState: Boolean = true) : Unit = {
assert(!checkState || binding.isEmpty, "Proxy is already bound")
if(binding.isEmpty) {
assert(materializer.isEmpty)
implicit val m = ActorMaterializer()
materializer = Some(m)
log.debug(s"[RESTProxy] Binding to '$host:$port'.")
val b = Await.result(Http().bindAndHandle(mkRequestFlow(m), host, port), 5.seconds)
binding = Some(b)
}
}
private def unbind(checkState: Boolean = true) : Unit = {
assert(!checkState || binding.isDefined, "Proxy is not bound")
binding.foreach { b =>
log.debug(s"[RESTProxy] Unbinding from '${b.localAddress}'")
Await.result(b.unbind(), 5.seconds)
binding = None
assert(materializer.isDefined)
materializer.foreach { m =>
materializer = None
m.shutdown()
}
}
}
override def preStart() = bind()
override def postStop() = unbind(checkState = false)
override def receive = {
case UnbindFor(d) =>
self ! DoUnbind
actorSystem.scheduler.scheduleOnce(d, self, DoBind)
case DoUnbind =>
unbind(checkState = false)
case DoBind =>
bind(checkState = false)
case Request(request) =>
// If the actor isn't bound to the port / has no materializer,
// the request is simply dropped.
materializer.map { implicit m =>
log.debug(s"[RESTProxy] Proxying '${request.uri}' to '${serviceAuthority}'")
val flow = if(useHTTPS) {
Http().outgoingConnectionHttps(destHost, destPort)
} else {
Http().outgoingConnection(destHost, destPort)
}
// akka-http doesn't like us to set those headers ourselves.
val upstreamRequest = request.copy(
headers = request.headers.filter(_ match {
case `Timeout-Access`(_) => false
case _ => true
})
)
Source.single(upstreamRequest)
.via(flow)
.runWith(Sink.head)
.pipeTo(sender)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment