Coffee. On the rocks!

REST API with Scala, Play, Slick, PostgreSQL, Redis and AWS S3, Part 2

Mon 01 December 2014

In the last post I talked about using Slick with Play. In this post I'm going to share how I use Redis and S3. The library I use for Redis is rediscala; to use it, add "com.etaty.rediscala" %% "rediscala" % "1.4.0" to your build.sbt. For S3 I use awscala; to use it, add "com.github.seratch" %% "awscala" % "0.4.+" to your build.sbt.
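For reference, the two dependencies above would sit in build.sbt roughly like this. It is a minimal fragment, and any other dependencies your project already declares stay as they are:

```scala
// build.sbt (fragment): Redis client and AWS S3 wrapper used in this post
libraryDependencies ++= Seq(
  "com.etaty.rediscala" %% "rediscala" % "1.4.0",
  "com.github.seratch"  %% "awscala"   % "0.4.+"
)
```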

I am using Redis for a few use cases. The first is authentication, which happens on every call to the API, so I don't hit Postgres for auth but do it from Redis. The second is maintaining hourly stats. The third is a cache for specific data. For every use case I have a separate Redis instance. Right now they are just separate processes running on different ports on the same machine, but in the future, if I find that a particular instance is being crushed by the load, I can scale it independently of the others. I wouldn't use different dbs in a single Redis instance because Redis executes commands on a single thread, so writes to one db would block writes to the other. The idea is to split things as finely as makes sense without going insane. This is how my application.conf looks for the Redis instances:

redis = {
  auth: {
    host: "localhost",
    port: 6379,
    db: 0
  },
  stats: {
    host: "localhost",
    port: 6380,
    db: 0
  },
  do_i_need_it: {
    host: "localhost",
    port: 6381,
    db: 0
  }
}

I wrote a trait that each of my wrappers extends:

import redis.RedisClient
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.Play.current

trait RedisWrapper {
  // Note: every object that extends this trait creates its own ActorSystem.
  implicit val akkaSystem = akka.actor.ActorSystem()

  def host: String
  def port: Int
  def db: Option[Int]
  val timeout = 15.seconds      // a FiniteDuration from scala.concurrent.duration, used with Await.result

  // Stays null until connect() is called from Global.onStart.
  var redis: RedisClient = null

  def connect(): Unit = {
    redis = RedisClient(host = host, port = port, db = db)
  }

  def stop(): Unit = {
    akkaSystem.shutdown()
  }
}

Initially each wrapper's redis connection is null until its connect method is called. This is what a wrapper looks like; here is the Auth wrapper, which queries the Redis instance holding the auth details:

import scala.concurrent.Await
import akka.util.ByteString
import play.api.Play.current

object RedisAuth extends RedisWrapper {
  val host = current.configuration.getString("redis.auth.host").get
  val port = current.configuration.getInt("redis.auth.port").get
  val db = current.configuration.getInt("redis.auth.db")   // Option[Int], matching RedisWrapper.db

  // Stores the credentials as a Redis hash: key -> {secret, user_id}.
  def createAuth(key: String, secret: String, userId: Int): Unit = {
    redis.hmset(key, Map("secret" -> secret, "user_id" -> userId.toString))
  }

  // Returns the user id only if the stored secret matches.
  def verifyAuth(key: String, secret: String): Option[Int] = {
    val resp = redis.hgetall(key)
    Await.result(resp, timeout) match {
      // HGETALL returns an empty hash for a missing key, so use get instead of m("secret"),
      // which would throw NoSuchElementException.
      case m: Map[String, ByteString] if m.get("secret").exists(_.utf8String == secret) =>
        m.get("user_id").map(_.utf8String.toInt)
      case _ =>
        None
    }
  }

  def getUserIdFromKey(key: String): Option[Int] = {
    val resp = redis.hget(key, "user_id")
    Await.result(resp, timeout) match {
      case Some(u: ByteString) => Some(u.utf8String.toInt)
      case _ => None
    }
  }

  def keyExists(key: String): Boolean = {
    val resp = redis.exists(key)
    Await.result(resp, timeout)
  }
}


// more wrapper objects like above

object BootRedis {
  val RedisDBList = List(RedisAuth, RedisStats, RedisCachedItems)
}
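To make the hash layout and the missing-key case concrete, here is a minimal sketch that swaps Redis for an in-memory map. InMemoryAuth and its private hgetall are hypothetical names, and nothing here talks to a real Redis server. It mirrors createAuth and verifyAuth above, including the fact that HGETALL on a key that doesn't exist returns an empty hash rather than an error:

```scala
import scala.collection.mutable

// In-memory stand-in for the auth Redis instance (illustration only):
// key -> hash fields, mirroring the "secret" and "user_id" fields written by createAuth.
object InMemoryAuth {
  private val store = mutable.Map.empty[String, Map[String, String]]

  def createAuth(key: String, secret: String, userId: Int): Unit =
    store(key) = Map("secret" -> secret, "user_id" -> userId.toString)

  // Like HGETALL, a missing key yields an empty map instead of failing.
  private def hgetall(key: String): Map[String, String] =
    store.getOrElse(key, Map.empty)

  // Returns the user id only when the stored secret matches the given one.
  def verifyAuth(key: String, secret: String): Option[Int] = {
    val fields = hgetall(key)
    if (fields.get("secret").contains(secret)) fields.get("user_id").map(_.toInt)
    else None
  }
}
```

A wrong secret and an unknown key both come back as None, which is the behaviour the guarded pattern in verifyAuth aims for.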

Each wrapper's connect method has to be called to initialize its connection. That code goes in Global.scala:

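import play.api.{Application, GlobalSettings}
import BootRedis.RedisDBList   // assumes BootRedis is visible from Global.scala; adjust the package if needed

object Global extends GlobalSettings {
  // Open every Redis connection when the app starts.
  override def onStart(app: Application): Unit = {
    RedisDBList.foreach(r => r.connect())
  }

  // Shut down each wrapper's ActorSystem when the app stops.
  override def onStop(app: Application): Unit = {
    RedisDBList.foreach(r => r.stop())
  }
}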

When the Play app starts, the onStart method of object Global is called, which calls connect on each wrapper object and points its redis connection at the appropriate Redis server. When the app stops, onStop is called, which calls stop on each wrapper and shuts down its ActorSystem. Note that because akkaSystem is a val defined in the trait, each wrapper object actually creates its own ActorSystem, so there are three of them, each shut down once. This hasn't caused any issues yet, but a single shared ActorSystem would be lighter. I don't know Akka well yet, so I'll try to figure out the best way to handle this.
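One possible direction, sketched under assumptions: keep a single system in one object that every wrapper refers to, and guard its shutdown so it runs exactly once no matter how many wrappers call stop. To keep the sketch runnable without Akka, StandInSystem is a hypothetical stand-in for akka.actor.ActorSystem; SharedSystem, Wrapper and WrapperA/B/C are also hypothetical names:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for akka.actor.ActorSystem; it only counts shutdown calls.
final class StandInSystem {
  @volatile var shutdownCalls = 0
  def shutdown(): Unit = shutdownCalls += 1
}

// One system for the whole app; wrappers would use SharedSystem.system
// as their implicit instead of creating their own.
object SharedSystem {
  lazy val system = new StandInSystem
  private val stopped = new AtomicBoolean(false)

  // compareAndSet flips the flag only once, so shutdown runs a single time
  // even if several wrappers call stop().
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) system.shutdown()
}

trait Wrapper {
  def stop(): Unit = SharedSystem.stop()
}

object WrapperA extends Wrapper
object WrapperB extends Wrapper
object WrapperC extends Wrapper
```

With this shape, Global.onStop can keep calling stop on every wrapper in RedisDBList without shutting anything down twice.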

Now for S3. I couldn't get the play-s3 plugin to work with Play, so I chose awscala. It's pretty simple to use. In the example below I store a file (a java.io.File) in the Singapore region:

import java.io.File
import java.net.URL
import awscala.Region
import awscala.s3._
import play.api.Logger

// config(...) and randomString(...) are helpers that are not shown in this post.
implicit val region = Region.Singapore          // omit this if you want to store in the default AWS region
implicit val s3 = S3(config("access_key"), config("secret"))

def uploadFile(file: File, args: Map[String, String] = Map()): Option[URL] = {
  val bucketName = config("bucket_name")
  s3.bucket(bucketName) match {
    case Some(bucket) =>
      // Prefix a random string so that files with the same name don't overwrite each other.
      // Note: args("fileName") throws if the caller doesn't pass a "fileName" entry.
      val fileName = randomString(50) + "-" + args("fileName")
      bucket.putObjectAsPublicRead(fileName, file)
      Some(new URL("https://" + bucketName + ".s3.amazonaws.com/" + fileName))
    case None =>
      Logger.error(s"No bucket by name $bucketName exists")
      None
      // Alternatively, create the bucket here with s3.createBucket("my-unique-bucket-name") and then store the object.
  }
}
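The post doesn't show randomString, so here is a hypothetical version based on scala.util.Random.alphanumeric, together with a sketch of building the object key and the public URL in the same virtual-hosted style as uploadFile. One deliberate difference from the original: the key is percent-encoded before it goes into the URL, so a file name containing a space still yields a valid URL. S3Keys and objectKeyAndUrl are hypothetical names:

```scala
import java.net.{URL, URLEncoder}
import scala.util.Random

object S3Keys {
  // Hypothetical helper: `length` random characters from [A-Za-z0-9].
  def randomString(length: Int): String =
    Random.alphanumeric.take(length).mkString

  // Returns the object key and its public URL.
  def objectKeyAndUrl(bucketName: String, originalName: String): (String, URL) = {
    val key = randomString(50) + "-" + originalName
    // URLEncoder does form encoding (space -> "+"); a literal "+" in the name
    // is already "%2B", so swapping "+" for "%20" only touches spaces.
    val encoded = URLEncoder.encode(key, "UTF-8").replace("+", "%20")
    (key, new URL("https://" + bucketName + ".s3.amazonaws.com/" + encoded))
  }
}
```

The unencoded key is what you would pass to putObjectAsPublicRead; only the URL needs the encoded form.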

I was wondering how I would do async tasks with Play, because when I wrote APIs in Python it was pretty simple: I could just add tasks to a Celery queue and forget about them. In Play I didn't even need to plug in something extra like Celery; I can use Scala futures for fire-and-forget asynchronous tasks. Let's say a user updates his settings. Here I don't need to notify the user that the settings have been updated. If the update fails, then there is something wrong with my db or the update code, which I can fix after I log the exception.

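import scala.concurrent.Future
import play.api.Logger
import play.api.libs.json.Json
import play.api.libs.concurrent.Execution.Implicits.defaultContext

val settings = Json.parse(validForm.settings).as[Map[String, String]]

// Fire and forget: the request doesn't wait for this Future; failures are only logged.
Future {
  models.UserModel.updateSettings(userId, settings)
}.recover { case e: Exception => Logger.error("Error updating settings: " + e.getMessage, e) }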
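Here is a stdlib-only sketch of the same fire-and-forget pattern so the failure path can be seen without Play. updateSettings is a hypothetical stand-in for models.UserModel.updateSettings that fails on empty settings, log is passed in instead of Play's Logger, and NonFatal is used rather than catching Exception. The function returns the Future only so that a caller or test can wait on it; a controller would simply ignore it:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object FireAndForget {
  // Hypothetical stand-in for models.UserModel.updateSettings; fails on empty settings.
  def updateSettings(userId: Int, settings: Map[String, String]): Unit =
    if (settings.isEmpty) throw new IllegalArgumentException("no settings for user " + userId)

  // Starts the update on the given ExecutionContext and returns immediately.
  // A failure is turned into a log line, so the returned Future always succeeds.
  def updateInBackground(userId: Int, settings: Map[String, String], log: String => Unit)
                        (implicit ec: ExecutionContext): Future[Unit] =
    Future(updateSettings(userId, settings)).recover {
      case NonFatal(e) => log("Error updating settings: " + e.getMessage)
    }
}
```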

One more thing I liked about Play: if I need to do some task that might wait for some time on I/O, I can return a Future[Result] from the controller instead of a Result. Let's say I have a method suggestPeople that returns a list of ids of users who might be interesting to a person. If I process the request synchronously, my code would be this:

def suggestPeopleList = Action { implicit request =>
  // authentication and other stuff
  val peopleIds: List[Int] = suggestPeople(userId)          // the type annotation is not needed here; it's just to illustrate the point
  peopleIds match {
    case head :: tail =>
      Ok(Json.obj(
        "people" -> getDetailsOfPeopleWithIds(peopleIds),
        "success" -> true
      ))

    case Nil =>
      Ok(Json.obj(
        "people" -> Json.arr(),   // an empty JSON array
        "success" -> true
      ))
  }
}

Here the request processing thread is blocked on suggestPeople, since this function reads data from multiple sources and is I/O bound. A better way is to use Action.async and return a Future[Result]:

import scala.concurrent.Future
import play.api.Logger
import play.api.libs.json.Json
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def suggestPeopleList = Action.async { implicit request =>
  // authentication and other stuff
  // Wrap the result in an Option so that recover can return None on failure
  // while the type stays Future[Option[List[Int]]].
  val peopleIds: Future[Option[List[Int]]] = Future {
    Option(suggestPeople(userId))
  }.recover {
    case e: Exception =>
      Logger.error(e.getMessage, e)
      None
  }

  peopleIds map {
    case Some(ids @ (_ :: _)) =>
      Ok(Json.obj(
        "people" -> getDetailsOfPeopleWithIds(ids),   // pass the List, not the Future
        "success" -> true
      ))

    case Some(Nil) =>
      Ok(Json.obj(
        "people" -> Json.arr(),
        "success" -> true
      ))

    case None =>                // this is the case where an exception occurred
      Ok(Json.obj(
        "success" -> false
      ))
  }
}
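The Option-wrapping idea above can be tried without Play. In this sketch, suggestPeople and detailsOf are hypothetical stand-ins (suggestPeople fails for a negative id so the None branch is reachable), and a (success, people) tuple stands in for the JSON body. The ExecutionContext is passed in explicitly, which also makes it easy to hand the blocking call a dedicated pool instead of the default one:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object SuggestSketch {
  // Hypothetical stand-ins for the post's suggestPeople and getDetailsOfPeopleWithIds.
  def suggestPeople(userId: Int): List[Int] =
    if (userId < 0) throw new RuntimeException("lookup failed") else List(userId + 1, userId + 2)
  def detailsOf(ids: List[Int]): List[String] = ids.map(id => s"user-$id")

  // Some(list) on success, None on failure, with a precise Future[Option[List[Int]]] type.
  def suggestions(userId: Int)(implicit ec: ExecutionContext): Future[Option[List[Int]]] =
    Future(Option(suggestPeople(userId))).recover { case NonFatal(_) => None }

  // Stand-in for building the response: (success, people).
  def response(userId: Int)(implicit ec: ExecutionContext): Future[(Boolean, List[String])] =
    suggestions(userId).map {
      case Some(ids) => (true, detailsOf(ids))   // covers both non-empty lists and Nil
      case None      => (false, Nil)
    }
}
```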

Here the request processing thread is not blocked on suggestPeople, because it's executed in a Future. As far as I understand, this is how Play recommends writing controllers: a request that calls an I/O-bound function doesn't block the thread that processes requests; only that client waits for its response. One caveat: the Future itself still occupies a thread while suggestPeople blocks, and Play's thread pool documentation suggests running blocking work on a separate execution context rather than the default one. That's all from me for now. I am new to Scala and Play and this is my first app, so the code here is probably not the best way to do things. Any feedback is appreciated. Thanks.

This entry was tagged as scala play redis s3
