Reference



Installation

Cettia Java Server requires Java 7 and is distributed through Maven Central. Add the following dependency to your build or include it on your classpath manually.

<dependency>
  <groupId>io.cettia</groupId>
  <artifactId>cettia-server</artifactId>
  <version>1.0.0-Beta1</version>
</dependency>

Asity lets a cettia application run on any platform transparently. See its reference guide for which platforms are supported, how to install a cettia application on them, and what you can do if your favorite platform is not supported.

Examples

Setting up a server on Servlet 3 and Java WebSocket API 1.

@WebListener
public class Bootstrap implements ServletContextListener {
  @Override
  public void contextInitialized(ServletContextEvent event) {
    // Cettia server
    Server server = new DefaultServer();
    server.onsocket(socket -> {});

    // Servlet
    HttpTransportServer httpTransportServer = new HttpTransportServer().ontransport(server);
    ServletContext context = event.getServletContext();
    Servlet servlet = new CettiaServlet().onhttp(httpTransportServer);
    ServletRegistration.Dynamic reg = context.addServlet(CettiaServlet.class.getName(), servlet);
    reg.setAsyncSupported(true);
    reg.addMapping("/cettia");

    // Java WebSocket API
    final WebSocketTransportServer wsTransportServer = new WebSocketTransportServer().ontransport(server);
    ServerContainer container = (ServerContainer) context.getAttribute(ServerContainer.class.getName());
    ServerEndpointConfig config = ServerEndpointConfig.Builder.create(CettiaServerEndpoint.class, "/cettia")
      .configurator(new Configurator() {
        @Override
        public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
          return endpointClass.cast(new CettiaServerEndpoint().onwebsocket(wsTransportServer));
        }
      })
      .build();
    try {
      container.addEndpoint(config);
    } catch (DeploymentException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void contextDestroyed(ServletContextEvent sce) {}
}

Server

The interface that consumes transports and produces and manages sockets. It represents a cettia application.

Configuring a server

The Cettia protocol options should be centralized on the server side and configured through DefaultServer. Every option has a sensible default value, so you don’t need to change it unless you have a specific reason to.

Heartbeat

A heartbeat interval value in milliseconds. Each time the heartbeat interval has elapsed, a heartbeat event should be exchanged between the client and the server. First, the client should send the heartbeat event to the server 5 seconds before the heartbeat timer expires, and the server should echo the heartbeat event back to the client within 5 seconds. Otherwise, both the client and the server fire the close event. The default value is 20000, and the value must be larger than 5000.

server.setHeartbeat(15 * 1000);
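
The timing rules above come down to simple arithmetic. The following sketch is not part of the Cettia API: HeartbeatTiming and its method names are hypothetical, and it only encodes the numbers stated in this section (a 5-second margin and a minimum larger than 5000 milliseconds).

```java
/** Hypothetical helper that works out heartbeat timings; not part of the Cettia API. */
final class HeartbeatTiming {
  /** The 5-second margin described in this section, in milliseconds. */
  static final long MARGIN_MS = 5000;

  private final long heartbeatMs;

  HeartbeatTiming(long heartbeatMs) {
    // The option must be larger than 5000 ms, otherwise nothing is left before the margin
    if (heartbeatMs <= MARGIN_MS) {
      throw new IllegalArgumentException("heartbeat must be larger than " + MARGIN_MS + " ms");
    }
    this.heartbeatMs = heartbeatMs;
  }

  /** Delay after which the client sends its heartbeat event: 5 seconds before the timer expires. */
  long clientSendDelayMs() {
    return heartbeatMs - MARGIN_MS;
  }

  /** Delay by which the echoed heartbeat must have arrived; otherwise the close event is fired. */
  long echoDeadlineMs() {
    return heartbeatMs;
  }
}
```

With the default value of 20000, new HeartbeatTiming(20000).clientSendDelayMs() is 15000, so the client sends its heartbeat 15 seconds into each interval and the echo is due within the remaining 5 seconds.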

Handling a socket

When a transport is established and a socket is created accordingly, actions added via onsocket(Action<ServerSocket> action) are executed with it. You can add several actions at any time, so you don’t need to centralize all your code in one class.

Note

  • You can add socket event handlers to a given socket but not send events through the socket as it’s not yet opened. The first event where communication is possible is the socket’s open event.

server.onsocket((ServerSocket socket) -> {
  // You can't send events here
  socket.onopen((Void v) -> {
    // but you can do that here
    socket.send("echo", "Hi");
  });
});

Selecting sockets

It’s a common use case to select some sockets and do something with them, much as you would with persistence entities or HTML elements. All you need to do is write a socket action and pass it to the server’s selector method. The server then finds the corresponding sockets and executes the action with them. Sockets passed to the action are always in either the opened state or the closed state.

Note

  • Handle the given sockets without regard to their state. If you send events through closed sockets, these events are passed to each socket’s cache event, where you can cache them and send them on the next reconnection. For more information about the cache event, see the Offline handling section.

All

all(Action<ServerSocket> action) finds all of the sockets in this server and executes the given action with each of them.

server.all((ServerSocket socket) -> {
  // Your logic here
});

By tag

A socket may have several tags and a tag may have several sockets, exactly like a many-to-many relationship. byTag(String[] names, Action<ServerSocket> action) finds the sockets in the server which have the given tag names and executes the given action with them. For more information about tags, see the Tagging section.

server.byTag("/user/flowersinthesand", (ServerSocket socket) -> {
  // Your logic here
});
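
The many-to-many relationship between sockets and tags can be pictured with plain collections. The sketch below is only an illustration and not Cettia’s implementation: TagIndex is a hypothetical class, sockets are represented by made-up string ids, and only single-tag lookup is modeled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical many-to-many index between socket ids and tag names; not Cettia's implementation. */
final class TagIndex {
  // tag name -> ids of the sockets carrying that tag
  private final Map<String, Set<String>> socketsByTag = new HashMap<>();

  /** Attaches a tag to a socket; a socket may carry several tags. */
  void tag(String socketId, String tagName) {
    socketsByTag.computeIfAbsent(tagName, k -> new LinkedHashSet<>()).add(socketId);
  }

  /** Detaches a tag from a socket and drops the tag entry once no socket carries it. */
  void untag(String socketId, String tagName) {
    Set<String> ids = socketsByTag.get(tagName);
    if (ids != null) {
      ids.remove(socketId);
      if (ids.isEmpty()) {
        socketsByTag.remove(tagName);
      }
    }
  }

  /** Returns a read-only copy of the ids of every socket carrying the given tag. */
  Set<String> byTag(String tagName) {
    Set<String> ids = socketsByTag.get(tagName);
    if (ids == null) {
      return Collections.<String>emptySet();
    }
    return Collections.unmodifiableSet(new LinkedHashSet<>(ids));
  }
}
```

In this model, tagging both "desktop" and "phone" with "/user/flowersinthesand" makes byTag("/user/flowersinthesand") return both ids, while "phone" can still carry a second tag such as "/room/201".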

Writing a sentence

Sentence is a fluent interface for dealing with a group of sockets with ease. All finder methods return a sentence when called without an action. Prefer a sentence to an action whenever the goal is the same, because a sentence lets you write a one-liner and internally uses actions that implement Serializable, which is typically required in clustering.

server.all().send("foo", "bar");
server.byTag("/room/201", "/room/301").send("message", "time to say goodbye").close();

ServerSocket

The interface to represent a server-side socket.

Properties

These are read-only.

State

The current state of the socket.

socket.state();

URI

A URI used to connect.

URI.create(socket.uri()).getQuery();
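
The query string returned above is still a raw string, so reading individual parameters takes a little parsing. The following is a minimal sketch using only the JDK; QueryParams is a hypothetical helper, not part of the Cettia API, and the URI in the usage note is made up for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for reading query parameters; not part of the Cettia API. */
final class QueryParams {
  /** Parses the query component of a URI string into a map; a repeated name keeps its last value. */
  static Map<String, String> parse(String uri) {
    Map<String, String> params = new LinkedHashMap<>();
    String query = URI.create(uri).getRawQuery();
    if (query == null || query.isEmpty()) {
      return params;
    }
    for (String pair : query.split("&")) {
      if (pair.isEmpty()) {
        continue; // skip empty segments such as the one in "a=1&&b=2"
      }
      int eq = pair.indexOf('=');
      String name = eq < 0 ? pair : pair.substring(0, eq);
      String value = eq < 0 ? "" : pair.substring(eq + 1);
      params.put(decode(name), decode(value));
    }
    return params;
  }

  private static String decode(String s) {
    try {
      return URLDecoder.decode(s, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException(e); // UTF-8 is always available
    }
  }
}
```

For example, QueryParams.parse("/cettia?username=flowersinthesand&devicename=my%20phone").get("devicename") yields "my phone". In an application the argument would be socket.uri().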

Tags

A modifiable set of tag names.

Set<String> tags = socket.tags();
tags.add("account#flowersinthesand");

Lifecycle

A socket is always in a specific state, which can be determined by the state() method. Transitions between states occur according to the underlying transport. A socket can be in one of the following states.

  • null

    As the initial state of the lifecycle, it is used only from when a socket is created until the handshake is performed. The server’s socket event is fired with the created socket. You can add or remove event handlers but can’t exchange events in this state.

    State transition occurs to

    • OPENED: if the handshake is performed successfully.
    • CLOSED: if there was any error in performing the handshake.

  • OPENED

    The handshake is performed successfully and communication is possible. The open event is fired. Only in this state can the socket send and receive events over the connection. Note that a closed socket can be opened again, and a reference to the socket isn’t affected by disconnection and reconnection.

    State transition occurs to

    • CLOSED: if the underlying transport is closed for some reason.

  • CLOSED

    The underlying transport is closed for some reason. The close event is fired. In this state, events can’t be sent or received, but events passed to the send method are handed to the cache event so that you can cache them and send them on the next reconnection. It is the same for the client.

    State transition occurs to

    • OPENED: if the client reconnects to the server and the handshake is performed successfully.
    • DELETED: if the client has not reconnected to the server for a long time, i.e. 1 minute.

  • DELETED

    As the final state of the lifecycle, it applies to sockets whose underlying transport has been closed for a long time, i.e. 1 minute. The delete event is fired. A socket in this state has already been evicted from the server, so it shouldn’t and can’t be used.
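
The transitions listed above form a small state machine, which can be written down as a table. This sketch is a model of the description only, not Cettia’s implementation: Lifecycle is a hypothetical class, and INITIAL is a stand-in name for the null state.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the socket lifecycle described in this section; not Cettia's implementation. */
final class Lifecycle {
  /** INITIAL stands in for the null state, which an enum constant cannot express directly. */
  enum State { INITIAL, OPENED, CLOSED, DELETED }

  // Allowed target states for each source state
  private static final Map<State, Set<State>> TRANSITIONS = new EnumMap<>(State.class);
  static {
    TRANSITIONS.put(State.INITIAL, EnumSet.of(State.OPENED, State.CLOSED));
    TRANSITIONS.put(State.OPENED, EnumSet.of(State.CLOSED));
    TRANSITIONS.put(State.CLOSED, EnumSet.of(State.OPENED, State.DELETED));
    TRANSITIONS.put(State.DELETED, EnumSet.noneOf(State.class)); // final state
  }

  /** Tells whether the lifecycle allows moving from one state to another. */
  static boolean canTransition(State from, State to) {
    return TRANSITIONS.get(from).contains(to);
  }

  /** Only an opened socket can exchange events over the connection. */
  static boolean canExchangeEvents(State state) {
    return state == State.OPENED;
  }
}
```

The table makes two properties easy to check: a closed socket can be opened again, and nothing leads out of DELETED.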

Handling errors

To capture any error happening in a socket, add an error event handler. The Throwable in question is passed as an argument. Exceptions from the underlying transport are also propagated.

Note

  • In most cases, there is no error that you can ignore safely. You should watch this event and log thrown errors.
  • Errors thrown by user-created event handlers are not propagated to the error event.

Sending and receiving events

on(String event, Action<T> action) attaches an event handler. The allowed types for T are determined by the format used to deserialize the transport message. Currently, that format corresponds to Jackson’s data format. By default, the built-in data format is used for text messages and the MessagePack data format is used for binary messages.

send(String event) and send(String event, Object data) send an event without or with data, respectively. Unlike when receiving an event, you can use any type of data when sending an event, whether it is text, binary or composite.

Note

  • Any event name can be used except reserved ones: open, close, cache, delete and error.
  • If data or one of its properties is byte[] or ByteBuffer, it is regarded as binary, though you don’t need to be aware of that.
  • To manage a lot of events easily, use a URI as the event name format, like /account/update.
  • If you send an event via a closed socket, it is delegated to that socket’s cache event, so you don’t need to worry about the socket’s state when sending an event.

The client sends an event and the server echoes back to the client.

Server

server.onsocket((ServerSocket socket) -> {
  socket.on("echo", (Object data) -> {
    System.out.println(data);
    socket.send("echo", data);
  });
});

Client

cettia.open("http://localhost:8080/cettia")
.on("open", function() {
  this.send("echo", "echo");
  this.send("echo", new Buffer("echo"));
  this.send("echo", {text: "echo", binary: new Buffer("echo")});
})
.on("echo", function(data) {
  console.log(data);
});

The server sends an event and the client echoes back to the server.

Server

server.onsocket((ServerSocket socket) -> {
  socket.onopen((Void v) -> {
    socket.send("echo", "echo");
    socket.send("echo", "echo".getBytes());
    socket.send("echo", ByteBuffer.wrap("echo".getBytes()));
    socket.send("echo", new LinkedHashMap<String, Object>() {
      {
        put("text", "echo");
        put("binary", "echo".getBytes());
      }
    });
  });
  socket.on("echo", (Object data) -> System.out.println(data));
});

Client

cettia.open("http://localhost:8080/cettia")
.on("echo", function(data) {
  console.log(data);
  this.send("echo", data);
});

Offline handling

Once the underlying transport is disconnected, you can’t send an event through the socket until the client reconnects and a new transport replaces the old one. To cache events passed to the send method while offline and send them on the next reconnection, add a cache event handler together with open and delete event handlers. The cache event is fired when the send method is called while there is no connection, and its argument is an object array of the arguments used to call the send method.

Note

  • There is no default behavior for offline handling.

Caching events while offline and sending them on next reconnection.

server.onsocket((ServerSocket socket) -> {
  // A queue containing events the server couldn't send to the client while disconnected
  Queue<Object[]> cache = new ConcurrentLinkedQueue<>();
  // Fired if the send method is called when there is no connection
  socket.oncache((Object[] args) -> {
    // You can determine whether or not to cache the arguments used to call the send method
    // For example, in some cases, you may want to avoid caching to deliver live data in time
    cache.offer(args);
  });
  socket.onopen((Void v) -> {
    // Now that communication is possible, you can flush the cache
    while (socket.state() == ServerSocket.State.OPENED && !cache.isEmpty()) {
      // Removes the first event from the cache and sends it to the client one by one
      Object[] args = cache.poll();
      socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
    }
  });
  socket.ondelete((Void v) -> {
    // If the cache is not empty, that is to say, there are still some messages user should receive,
    if (!cache.isEmpty()) {
      // here you can send an email to notify user or utilize database for user to check on next logging in
    }
  });
});

Tagging

A socket is not suitable for handling a specific entity in the real world. For example, when a user signs in using multiple devices like a desktop, laptop, tablet and smartphone, a message sent to that user should be delivered to every device where the user is signed in. To do that, you need a way to handle multiple sockets (the devices) as a single entity (the user).

That’s why tags are introduced. A tag is used to point to a group of sockets. The tag set is managed only by the server and is unknown to the client. tag(String... names)/untag(String... names) attaches/detaches the given tag names to/from a socket.

Note

  • An authentication result can be dealt with as a tag.
  • To manage a lot of tags easily, use a URI as the tag name format, like /account/flowersinthesand.

Notifying user using multiple devices of the login/logout from some specific device.

server.onsocket((ServerSocket socket) -> {
  // An imaginary helper class to handle URI
  Uri uri = Uris.parse(socket.uri());
  String username = uri.param("username");
  String devicename = uri.param("devicename");

  socket.tag(username);
  socket.onopen((Void v) -> server.byTag(username).send("/login", "Using device " + devicename));
  socket.onclose((Void v) -> server.byTag(username).send("/logout", "Using device " + devicename));
});

Handling the result of the remote event processing

When sending an event, you can receive the result of the client’s event processing by using send(String event, Object data, Action<T> onFulfilled) or send(String event, Object data, Action<T> onFulfilled, Action<U> onRejected), where the allowed types are the same as when receiving an event. When receiving an event, you can return the result of event processing to the client asynchronously by using Reply as the data type. You can apply this functionality to acknowledgements, remote procedure calls and so on.

Note

  • If the client calls neither the attached fulfilled callback nor the rejected callback, these callbacks are never executed. It is the same for the server. Therefore, it should be dealt with as a kind of contract.
  • Decide beforehand whether to use the rejected callback, to avoid writing unnecessary rejected callbacks. For example, if a required resource is not available, you can execute either the fulfilled callback with null or the rejected callback with an exception, e.g. ResourceNotFoundException.

The client sends an event attaching callbacks and the server executes one of them with the result of event processing.

Server

server.onsocket((ServerSocket socket) -> {
  socket.on("/account/find", (Reply<String> reply) -> {
    String id = reply.data();
    System.out.println(id);
    try {
      reply.resolve(accountService.findById(id));
    } catch(EntityNotFoundException e) {
      reply.reject(e.getMessage());
    }
  });
});

Client

cettia.open("http://localhost:8080/cettia")
.on("open", function(data) {
  this.send("/account/find", "flowersinthesand", function(data) {
    console.log("fulfilled", data);
  }, function(data) {
    console.log("rejected", data);
  });
});

The server sends an event attaching callbacks and the client executes one of them with the result of event processing.

Server

server.onsocket((ServerSocket socket) -> {
  socket.onopen((Void v) -> {
    socket.send("/account/find", "flowersinthesand",
      (Map<String, Object> data) -> System.out.println("fulfilled " + data),
      (String data) -> System.out.println("rejected " + data));
  });
});

Client

cettia.open("http://localhost:8080/cettia")
.on("/account/find", function(id, reply) {
  console.log(id);
  try {
    reply.resolve(accountService.findById(id));
  } catch(e) {
    reply.reject(e.message);
  }
});

Accessing underlying objects

In every case, a transport underlies a socket, and a resource such as an HTTP request-response exchange or a WebSocket underlies the transport. To access such underlying objects, for example the HTTP session, use unwrap(Class<?> clazz).

Note

  • Don’t manipulate returned objects unless you know what you are doing.
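
The layering described in this section can be pictured as a chain of wrappers, each exposing the object beneath it by class. The sketch below only illustrates that pattern with hypothetical stand-ins (Unwrappable, FakeSocket, FakeTransport); it does not use Cettia’s classes, and returning null for an unsupported class is an assumption of this sketch rather than documented behavior.

```java
/** Hypothetical contract for exposing an underlying object by class; not a Cettia interface. */
interface Unwrappable {
  <T> T unwrap(Class<T> clazz);
}

/** Stand-in for a transport that sits on top of a platform resource. */
final class FakeTransport implements Unwrappable {
  private final Object resource; // for example, an HTTP exchange or a WebSocket session

  FakeTransport(Object resource) {
    this.resource = resource;
  }

  @Override
  public <T> T unwrap(Class<T> clazz) {
    // Assumption of this sketch: null when the resource is not of the requested type
    return clazz.isInstance(resource) ? clazz.cast(resource) : null;
  }
}

/** Stand-in for a socket that sits on top of a transport. */
final class FakeSocket implements Unwrappable {
  private final FakeTransport transport;

  FakeSocket(FakeTransport transport) {
    this.transport = transport;
  }

  @Override
  public <T> T unwrap(Class<T> clazz) {
    return clazz.isInstance(transport) ? clazz.cast(transport) : null;
  }
}
```

Reaching the platform resource then takes one unwrap call per layer: first from the socket to the transport, then from the transport to the resource.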

Integration

Here is how to integrate cettia application with awesome technologies.

Dependency injection

With dependency injection, you can register a Server as a singleton component and inject it wherever you want to handle sockets.

Examples

Dealing with a server as a component using Spring.

@WebListener
public class Bootstrap implements ServletContextListener {
  @Override
  @SuppressWarnings("resource")
  public void contextInitialized(ServletContextEvent event) {
    AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(SpringConfig.class);
    Server server = applicationContext.getBean(Server.class);
    // ... skipped
  }

  @Override
  public void contextDestroyed(ServletContextEvent sce) {}
}

@Configuration
@EnableScheduling
@ComponentScan(basePackages = { "simple" })
public class SpringConfig {
  // Registers the server as a component
  @Bean
  public Server server() {
    return new DefaultServer();
  }
}

@Component
public class Clock {
  // Injects the server
  @Autowired
  private Server server;

  @Scheduled(fixedRate = 3000)
  public void tick() {
    server.all().send("chat", "tick: " + System.currentTimeMillis());
  }
}

Clustering

Any Message Oriented Middleware (MOM) that supports the publish and subscribe model can be used to cluster multiple cettia applications with ClusteredServer. ClusteredServer intercepts method invocations of all and byTag, converts each call into a message and executes the actions added via onpublish(Action<Map<String, Object>> action) with that message.

All you need to do is add an action via onpublish(Action<Map<String, Object>> action) that publishes the message to all servers in the cluster, including the one that issued it, and pass such messages to messageAction().on(Map<String, Object> message) when they are received from other servers.

Note

  • Most MOMs in Java require messages to be serialized. In other words, an Action instance used in all and byTag (not onsocket) should implement Serializable. Whereas Action is generally used as an anonymous class, Serializable can’t be added in that manner. Therefore, always use Sentence instead of Action if possible, especially in this case. Java 8’s lambdas, however, have no such issue thanks to the additional bound. For example, you can use a lambda like server.all((Action<ServerSocket> & Serializable) socket -> socket.send("chat", "Hi")).
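
The effect of the additional bound can be checked without any middleware by serializing a lambda and reading it back. In the sketch below, Action is a local stand-in for Cettia’s interface so that the code compiles on its own, and SerializableLambdaDemo and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Local stand-in for Cettia's Action interface so that the sketch compiles without the library. */
interface Action<T> {
  void on(T t);
}

final class SerializableLambdaDemo {
  /** Serializes an object to bytes and reads it back, roughly what a MOM does when relaying a message. */
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T object) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(object);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not serialize " + object, e);
    }
  }

  /** A lambda whose target type carries the additional Serializable bound. */
  static Action<StringBuilder> serializableAction() {
    return (Action<StringBuilder> & Serializable) sb -> sb.append("Hi");
  }

  /** The same lambda without the bound; it cannot be written to an object stream. */
  static Action<StringBuilder> plainAction() {
    return sb -> sb.append("Hi");
  }
}
```

Passing serializableAction() through roundTrip yields a working copy of the action, whereas passing plainAction() fails with an IllegalStateException wrapping a NotSerializableException.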

Examples

Hazelcast example.

@WebListener
public class Bootstrap implements ServletContextListener {
  @Override
  public void contextInitialized(ServletContextEvent event) {
    ClusteredServer server = new ClusteredServer();
    HazelcastInstance hazelcast = HazelcastInstanceFactory.newHazelcastInstance(new Config());
    ITopic<Map<String, Object>> topic = hazelcast.getTopic("cettia");
    // Some server in the cluster published a message
    // Pass it to this local server
    topic.addMessageListener((Message<Map<String, Object>> message) -> server.messageAction().on(message.getMessageObject()));
    // This local server got a method call from 'all' or 'byTag' and created a message
    // Publish it to every server in the cluster
    server.onpublish((Map<String, Object> message) -> topic.publish(message));
    // ... skipped
  }

  @Override
  public void contextDestroyed(ServletContextEvent sce) {}
}