Coffee. On the rocks!

Websockets with Play Framework and Android

Thu 02 July 2015

In this post I will write about using websockets in Android with AndroidAsync and Play Framework (2.3.x). Consider a very simple messaging app. As soon as the app is opened, it establishes a websocket connection with the server, after which it can send and receive messages. The app authenticates with a simple id and password combination; if authentication fails, the connection is rejected. The app sends each message as JSON with the keys receiverId and content. First I need a websocket endpoint through which the server can interact with the app.

In Play, one way to handle a websocket connection is to dedicate an Akka actor to it. When an app initiates a connection, a new actor is created that is responsible for that app's connection only. To do this I will create an actor named WebSocketActor. Its receive method is called each time the app sends a message. This is how it looks:

import akka.actor.{Actor, ActorRef, Props}
import play.api.libs.json.Json

object WebSocketActor {
    def props(out: ActorRef) = Props(new WebSocketActor(out))
}

// ConfirmationMessageString and InValidJsonMessageString are assumed to be
// String constants defined elsewhere; the JSON keys below are placeholders
class WebSocketActor(out: ActorRef) extends Actor {
    def receive = {
      case msg: String =>
        try {        // try to parse message as json
          val messageAsJson = Json.parse(msg)

          // will talk about delivering the message to the receiver later in this post

          // send a confirmation to the sender
          out ! Json.obj("message" -> ConfirmationMessageString)
        }
        catch {
          case e: Exception =>
            // the message that the sender sent was not json so send an error message
            out ! Json.obj("error" -> InValidJsonMessageString)
        }
    }
}
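The actor above expects every incoming message to be JSON with the keys receiverId and content. On the Android side such a message could be built along these lines. This is only a sketch: ChatMessage and its methods are hypothetical names that do not appear in the original app, and the hand-rolled escaping is there just to keep the example free of dependencies (on Android, org.json.JSONObject would be the more natural choice).

```java
// Hypothetical helper, not part of the original app: builds the JSON text
// that the WebSocketActor expects, with the keys receiverId and content.
final class ChatMessage {
    private ChatMessage() {}

    // Escapes the characters that JSON does not allow unescaped inside a string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.toString();
    }

    // Returns a JSON object with exactly the two keys described in the post.
    static String toJson(String receiverId, String content) {
        return "{\"receiverId\":\"" + escape(receiverId)
                + "\",\"content\":\"" + escape(content) + "\"}";
    }
}
```

The resulting string is what the app would hand to the websocket, for example ChatMessage.toJson("u2", "hello").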

Now I need to tell Play to use this actor for the websocket connection. For this, Play provides a WebSocket object with several methods (listed in its API documentation). Because I might need to reject a connection if authentication fails, I will use the method called tryAcceptWithActor. In that method, if authentication succeeds I start a WebSocketActor with the ActorRef out to handle the connection; otherwise I respond with Forbidden.

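import play.api.Play.current
import play.api.libs.json.JsValue
import play.api.mvc._
import scala.concurrent.Future

// defined inside a controller, so Forbidden comes from play.api.mvc.Results
def wsocket = WebSocket.tryAcceptWithActor[String, JsValue] { request =>
    // getQueryString returns the first value of the parameter, if there is one
    (request.getQueryString("id"), request.getQueryString("password")) match {
      case (Some(userId), Some(password)) =>
        Future.successful(User.authenticate(userId, password) match {
          case Some(user) => Right(out => WebSocketActor.props(out))        // auth successful so start a new actor
          case None => Left(Forbidden)
        })
      case _ =>
        Future.successful(Left(Forbidden))
    }
}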
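Since the endpoint reads id and password from the query string, the client has to append them to the websocket URL. A minimal sketch of doing that in Java is below. WsUrlBuilder and the base URL used with it are assumptions for illustration; the post does not show the real route.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: appends the id and password query parameters that
// the wsocket endpoint looks up.
final class WsUrlBuilder {
    private WsUrlBuilder() {}

    static String build(String baseUrl, String id, String password) {
        try {
            // URL-encode both values so characters such as '@' or spaces
            // survive the trip through the query string
            return baseUrl
                    + "?id=" + URLEncoder.encode(id, "UTF-8")
                    + "&password=" + URLEncoder.encode(password, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must support
            throw new IllegalStateException(e);
        }
    }
}
```

The returned string would be passed as the first argument of the websocket call on the client. A request without both parameters ends up in the last case of the match and is answered with Forbidden.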

Now our websocket endpoint can receive messages and reply with confirmation or error messages. Next, I want a user who establishes a connection (read: logs in) to get all the messages that were sent to him while he was not connected. So as soon as a user connects successfully, he should be sent a List[Msg] (actually the JSON form of a List[Msg]). I do that by overriding the preStart method of the actor; in that method the actor sends itself the list of messages. The receive method also needs to be modified so that it accepts a list of messages. Here is how I do that:

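class WebSocketActor(out: ActorRef) extends Actor {
    override def preStart() = {
        // placeholder: replace the empty list with the user's undelivered messages
        self ! List.empty[Msg]
    }

    def receive = {
        case msg: String =>
            // same as above

        // the element type is erased at runtime, so this case matches any List
        case lst: List[Msg] =>
            out ! Json.obj("messages"-> lst.map(_.toJson))
    }
}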

This takes care of the websocket endpoint. Now let's move to the client side (Android) code. I am using AndroidAsync to deal with websockets. For demonstration purposes I open the websocket connection in the onResume method and close it in the onPause method. I follow a very simple strategy. In onResume I execute a Runnable that tries to establish a connection. If the attempt fails because of a timeout or a connection issue, it retries after a delay that doubles with each failed attempt. In the run method of the Runnable I call the websocket method of AsyncHttpClient, which expects a callback as one of its parameters. The callback has a method called onCompleted, which is called when the connection attempt completes, whether or not it succeeded. If the attempt is successful, I get a non-null WebSocket object. Once connected, there are three things I need to do.

  1. Know when I receive something from the server. To react to incoming data, I pass a callback to the setStringCallback method of the WebSocket object.
  2. Know when the server closes the connection. To react to that, I pass a callback to the setClosedCallback method of the WebSocket object.
  3. Send a message to the server. To do that, I use the send method of the WebSocket object.

Read the code below to get a clearer idea. The activity includes a sendToServer method that the app uses to send a message to the server.

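import android.app.Activity;
import android.os.Handler;
import android.util.Log;

import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.http.AsyncHttpClient;
import com.koushikdutta.async.http.WebSocket;

import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

public class MainActivity extends Activity {
    private Handler mWsHandler = new Handler();
    private Runnable mWsRunnableCode = null;
    private WebSocket mWebSocket = null;
    private int retryDelay = 0;         // seconds; 0 means no failed attempt yet
    AsyncHttpClient client = AsyncHttpClient.getDefaultInstance();
    String wsUrl = "http://websocketHandler.url";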

    // some code

    @Override
    public void onResume() {
        super.onResume();
        mWsRunnableCode = new Runnable() {
            @Override
            public void run() {
                client.websocket(wsUrl, null, new AsyncHttpClient.WebSocketConnectCallback() {
                    @Override
                    public void onCompleted(Exception ex, final WebSocket webSocket) {
                        if (ex != null) {           // When there is an exception
                            if (ex instanceof ConnectException || ex instanceof TimeoutException) {
                                ex.printStackTrace();
                                retryDelay = retryDelay == 0 ? 2 : retryDelay*2;
                                Log.v("retry", "retrying after " + String.valueOf(retryDelay));
                                mWsHandler.postDelayed(mWsRunnableCode, retryDelay * 1000);        // retry after some time
                            }
                            else
                                ex.printStackTrace();
                            return;
                        }

                        if (webSocket == null) {            // When websocket is rejected
                            // do something because auth failed
                            return;
                        }

                        // websocket connection successful
                        retryDelay = 0;
                        mWebSocket = webSocket;
                        mWebSocket.setStringCallback(new WebSocket.StringCallback() {
                            public void onStringAvailable(String s) {
                                System.out.println("I got a string as websocket response: " + s);
                                // do something with the response
                            }
                        });

                        // websocket connection closed by server
                        mWebSocket.setClosedCallback(new CompletedCallback() {
                            @Override
                            public void onCompleted(Exception ex) {
                                Log.v("websocket", "disconnected");
                            }
                        });
                    }
                });
            }
        };

        mWsHandler.post(mWsRunnableCode);   // execute runnable
    }

    @Override
    public void onPause() {
        super.onPause();
        Log.v("State", "Pausing app");
        if (mWsRunnableCode != null) {
            mWsHandler.removeCallbacks(mWsRunnableCode);
            mWsRunnableCode = null;
        }
        if (mWebSocket != null)
            mWebSocket.close();
        retryDelay = 0;
    }

    private void sendToServer(String msg) {
        if (mWebSocket != null)         // null until the connection succeeds
            mWebSocket.send(msg);
    }
}
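The retry logic in the activity starts at 2 seconds and doubles the delay after every failed attempt, with no upper bound. If you wanted to isolate that policy and cap the delay, it could look roughly like this. RetryBackoff and the cap are my own additions for illustration, not something the activity above contains.

```java
// Hypothetical helper: the same doubling policy as retryDelay in the
// activity, with an upper bound added (the bound is an assumption).
final class RetryBackoff {
    private final int maxDelaySeconds;
    private int delaySeconds = 0;       // 0 means no failed attempt yet

    RetryBackoff(int maxDelaySeconds) {
        if (maxDelaySeconds < 2) {
            throw new IllegalArgumentException("maxDelaySeconds must be at least 2");
        }
        this.maxDelaySeconds = maxDelaySeconds;
    }

    // Call after a failed attempt; returns how long to wait before retrying.
    int nextDelaySeconds() {
        delaySeconds = delaySeconds == 0 ? 2 : Math.min(delaySeconds * 2, maxDelaySeconds);
        return delaySeconds;
    }

    // Call after a successful connection, like retryDelay = 0 in the activity.
    void reset() {
        delaySeconds = 0;
    }
}
```

In the activity, the value returned by nextDelaySeconds() would be multiplied by 1000 and passed to postDelayed, and reset() would be called where retryDelay is set back to 0.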

Now what about the actual chatting, i.e. sending and receiving messages while you are using the app? For this I will use Redis and its Pub-Sub feature. The idea is very simple. Any user who logs into the app subscribes to a channel whose name is derived from the id of that user, so each user is subscribed to exactly one unique channel. When another user needs to send a message to the first user, he publishes the message to the first user's channel. For example, say I have two users with ids u1 and u2, where u1 has subscribed to channel c1 and u2 has subscribed to channel c2. When u1 needs to send a message to u2, he publishes the message on channel c2, and because u2 has subscribed to c2, u2 receives the message that u1 sent.
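To make the routing idea concrete, here is a tiny in-memory stand-in for the publish/subscribe pattern, written in plain Java 8. It is not Redis and not Android code; InMemoryPubSub is a made-up class that only mimics the one-channel-per-user scheme described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for Pub-Sub: channel name -> subscribers.
final class InMemoryPubSub {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Registers a handler that is called for every message on the channel.
    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to the channel's subscribers and returns how many
    // received it (the Redis PUBLISH command reports the same kind of count).
    int publish(String channel, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(channel, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }
}
```

With u1 subscribed to c1 and u2 subscribed to c2, publishing on c2 reaches only u2. A return value of 0 means nobody is currently subscribed, which is the situation where a message would have to be stored for later delivery.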

The Redis library I am using here is rediscala (I wrote a post about rediscala). What I need is for every user who successfully logs in to subscribe to a channel that is unique to him. With rediscala, Pub-Sub works by creating a new actor for every subscription. So in the preStart method of the WebSocketActor a new UserChannelSubscriptionActor needs to be created. When the WebSocketActor is stopped (because the user closed the app), the UserChannelSubscriptionActor needs to be stopped as well. To make that happen I can either keep an ActorRef of the UserChannelSubscriptionActor inside the WebSocketActor and stop it explicitly, or make the UserChannelSubscriptionActor a child of the WebSocketActor so that it stops when the parent stops.

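import java.net.InetSocketAddress

import akka.actor.ActorRef
import play.api.Logger
import play.api.Play.current
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.{PMessage, Message}

case class SubscribedMessage(data: String)

class UserChannelSubscriptionActor(wsActor: ActorRef, channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
    extends RedisSubscriberActor(new InetSocketAddress(current.configuration.getString("redis.host").get, current.configuration.getInt("redis.port").get), channels, patterns) {

  // called for every message published on one of the subscribed channels
  def onMessage(message: Message): Unit = {
    Logger.info(s"published message received: $message")
    wsActor ! SubscribedMessage(message.data)
  }

  // pattern subscriptions are not used here, but RedisSubscriberActor requires this method
  def onPMessage(pmessage: PMessage): Unit = {}
}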

As you can see above, UserChannelSubscriptionActor needs a reference to the WebSocketActor so it can pass the messages received from Redis on to it. Now I need to modify the preStart and postStop methods of WebSocketActor to start and stop the UserChannelSubscriptionActor. The receive method of WebSocketActor also needs to accept a SubscribedMessage, from which it extracts the data. And in the case where receive gets a String (a message from the user), it needs to publish the message to the Redis channel of the receiver. Here is how the code for WebSocketActor looks:

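import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import play.api.Play.current
import play.api.libs.concurrent.Akka
import play.api.libs.json.Json
import redis.RedisClient

implicit val akkaSystem = Akka.system

val redisClient = RedisClient(host=current.configuration.getString("redis.host").get, port=current.configuration.getInt("redis.port").get)

object WebSocketActor {
    // the endpoint now has to pass the authenticated user: WebSocketActor.props(out, user)
    def props(out: ActorRef, user: User) = Props(new WebSocketActor(out, user))
}

class WebSocketActor(out: ActorRef, user: User) extends Actor {

    var subscriberActor: ActorRef = _       // assigned in preStart

    override def preStart() = {
        subscriberActor = akkaSystem.actorOf(Props(classOf[UserChannelSubscriptionActor], self, Seq(user.subscribeChannelName), Nil).withDispatcher("rediscala.rediscala-client-worker-dispatcher"))
        // placeholder: replace the empty list with the user's undelivered messages
        self ! List.empty[Msg]
    }

    override def postStop() = {
        subscriberActor ! PoisonPill        // stop the subscriberActor because websocket connection disconnected
    }

    def receive = {
      case msg: String =>
        try {        // try to parse message as json
          val messageAsJson = Json.parse(msg)

          // assumes the channel name is the receiver's id, as described above;
          // a missing key throws and is reported by the catch block below
          val receiverChannelName = (messageAsJson \ "receiverId").as[String]
          val messageContent = (messageAsJson \ "content").as[String]

          // send the message to the receiver
          redisClient.publish(receiverChannelName, messageContent)

          // also save the message for later delivery in case the receiver is offline now

          // send a confirmation to the sender ("message" and "error" are placeholder keys)
          out ! Json.obj("message" -> ConfirmationMessageString)
        }
        catch {
          case e: Exception =>
            // the message that the sender sent was not valid json so send an error message
            out ! Json.obj("error" -> InValidJsonMessageString)
        }

      case m: SubscribedMessage =>
        out ! Json.obj("data"-> m.data)

      // the element type is erased at runtime, so this case matches any List
      case lst: List[Msg] =>
        out ! Json.obj("messages"-> lst.map(_.toJson))
    }
}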

That's all for this post. Feedback is appreciated.

This entry was tagged as scala playframework android
