Reference
Installation
Cettia Java Server requires Java 7 and is distributed through Maven Central. Add the following dependency to your build or include it on your classpath manually.
<dependency>
<groupId>io.cettia</groupId>
<artifactId>cettia-server</artifactId>
<version>1.0.0-Beta1</version>
</dependency>
Asity was created to run a Cettia application on any platform transparently. See its reference guide for which platforms are supported, how to install a Cettia application on them, and what you can do if your favorite platform is not supported.
Examples
Setting up a server on Servlet 3 and Java WebSocket API 1.
@WebListener
public class Bootstrap implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent event) {
        // Cettia server
        Server server = new DefaultServer();
        server.onsocket(socket -> {});
        // Servlet
        HttpTransportServer httpTransportServer = new HttpTransportServer().ontransport(server);
        ServletContext context = event.getServletContext();
        Servlet servlet = new CettiaServlet().onhttp(httpTransportServer);
        ServletRegistration.Dynamic reg = context.addServlet(CettiaServlet.class.getName(), servlet);
        reg.setAsyncSupported(true);
        reg.addMapping("/cettia");
        // Java WebSocket API
        final WebSocketTransportServer wsTransportServer = new WebSocketTransportServer().ontransport(server);
        ServerContainer container = (ServerContainer) context.getAttribute(ServerContainer.class.getName());
        ServerEndpointConfig config = ServerEndpointConfig.Builder.create(CettiaServerEndpoint.class, "/cettia")
            .configurator(new Configurator() {
                @Override
                public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
                    return endpointClass.cast(new CettiaServerEndpoint().onwebsocket(wsTransportServer));
                }
            })
            .build();
        try {
            container.addEndpoint(config);
        } catch (DeploymentException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {}
}
Server
The interface that consumes transports and produces and manages sockets. It represents a Cettia application.
Configuring a server
The Cettia protocol options are centralized on the server side and configured through DefaultServer. Every option has a sensible default value, so you don’t need to change it unless you have a specific reason to.
Heartbeat
A heartbeat interval value in milliseconds. Each time the heartbeat interval elapses, a heartbeat event should be exchanged between the client and the server. First, the client should send the heartbeat event to the server 5 seconds before the heartbeat timer expires, and the server should echo the heartbeat event back to the client within 5 seconds. Otherwise, both the client and the server fire the close event. The default value is 20000, and the value must be larger than 5000.
server.setHeartbeat(15 * 1000);
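The timing rule can be worked through with plain arithmetic. The following is only a sketch that restates the numbers from the text; HeartbeatTiming and its methods are hypothetical names and are not part of the Cettia API.

```java
// Hypothetical helper, not part of the Cettia API: it only restates the
// heartbeat timing rules from the text as arithmetic.
final class HeartbeatTiming {
    // The client sends 5 seconds before the timer expires,
    // and the server must echo within 5 seconds.
    static final long MARGIN_MILLIS = 5000;

    private HeartbeatTiming() {}

    // Rejects values the text disallows: the interval must be larger than 5000.
    static long validate(long heartbeatMillis) {
        if (heartbeatMillis <= MARGIN_MILLIS) {
            throw new IllegalArgumentException("heartbeat must be larger than " + MARGIN_MILLIS);
        }
        return heartbeatMillis;
    }

    // Time after the heartbeat timer starts at which the client sends its heartbeat event.
    static long clientSendDelay(long heartbeatMillis) {
        return validate(heartbeatMillis) - MARGIN_MILLIS;
    }

    public static void main(String[] args) {
        System.out.println(clientSendDelay(20000));     // default interval
        System.out.println(clientSendDelay(15 * 1000)); // interval from the example
    }
}
```

With the default of 20000, the client would send its heartbeat after 15000 milliseconds, leaving the remaining 5000 milliseconds for the server's echo.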
Handling a socket
When a transport is established and a socket is created accordingly, actions added via onsocket(Action<ServerSocket> action) are executed with it. You can add several actions at any time, so you don’t need to centralize all your code in one class.
Note
- You can add socket event handlers to a given socket but not send events through the socket, as it’s not yet opened. The first event where communication is possible is the socket’s open event.
server.onsocket((ServerSocket socket) -> {
    // You can't send events here
    socket.onopen((Void v) -> {
        // but you can do that here
        socket.send("echo", "Hi");
    });
});
Selecting sockets
It’s a common use case to select some sockets and do something with them like dealing with persistence entities or HTML elements. All you need is to write a socket action and pass it to the server’s selector method. Then, the server will find the corresponding sockets and execute the action with them. Sockets being passed to the action are always either in the opened state or in the closed state.
Note
- Just handle the given sockets as you please, regardless of their state. If you send events through closed sockets, these events will be passed to each socket’s cache event, where you can cache the event and send it on the next reconnection. For more information about the cache event, see the Offline handling section.
All
all(Action<ServerSocket> action) finds all of the sockets in this server and executes the given action with each of them.
server.all((ServerSocket socket) -> {
    // Your logic here
});
By tag
A socket may have several tags and a tag may have several sockets, exactly like a many-to-many relationship. byTag(String[] names, Action<ServerSocket> action) finds sockets in the server which have the given tag names and executes the given action with them. For more information about tags, see the Tagging section.
server.byTag("/user/flowersinthesand", (ServerSocket socket) -> {
    // Your logic here
});
Writing a sentence
Sentence is a fluent interface for dealing with a group of sockets with ease. All finder methods return a sentence when called without an action. Using a sentence is always preferred to using an action if the goal is the same, because it lets you write the action as a one-liner and internally uses actions implementing Serializable in execution, which is typically required in clustering.
server.all().send("foo", "bar");
server.byTag("/room/201", "/room/301").send("message", "time to say goodbye").close();
ServerSocket
The interface to represent a server-side socket.
Properties
These are read only.
State
The current state of the socket.
socket.state();
URI
A URI used to connect.
URI.create(socket.uri()).getQuery();
Tags
A modifiable set of tag names.
Set<String> tags = socket.tags();
tags.add("account#flowersinthesand");
Lifecycle
A socket is always in a specific state that can be determined by the state() method. Transitions between states occur according to the underlying transport. The following is a list of the states which a socket can be in.
- null
  As the initial state of the lifecycle, it applies only from when a socket is created until the handshake is performed. The server’s socket event is fired with the created socket. You can add or remove event handlers but can’t exchange events in this state.
  State transition occurs to
  - OPENED: if the handshake is performed successfully.
  - CLOSED: if there was any error in performing the handshake.
- OPENED
  The handshake is performed successfully and communication is possible. The open event is fired. Only in this state can the socket send and receive events via the connection. Note that a closed socket can be opened again, and a reference to the socket isn’t affected by disconnection and reconnection.
  State transition occurs to
  - CLOSED: if the underlying transport is closed for some reason.
- CLOSED
  The underlying transport is closed for some reason. The close event is fired. In this state, sending and receiving events is not possible, but events sent in this state are passed to the cache event so that you can cache them and send them on the next reconnection. It is the same for the client.
  State transition occurs to
  - OPENED: if the client reconnects to the server and the handshake is performed successfully.
  - DELETED: if the client has not reconnected to the server for a long time, i.e. 1 minute.
- DELETED
  As the final state of the lifecycle, it applies to sockets whose underlying transport has been closed for a long time, i.e. 1 minute. The delete event is fired. A socket in this state is already evicted from the server, hence it shouldn’t and can’t be used.
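The transitions listed above can be summarized as a small state machine. This is a plain-Java sketch for illustration only: LifecycleState and its INITIAL constant are hypothetical (Cettia reports the initial state as null rather than as an enum constant), and the code is not taken from Cettia.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the lifecycle described above.
// INITIAL stands in for the null state.
enum LifecycleState {
    INITIAL, OPENED, CLOSED, DELETED;

    // States reachable from this one according to the list above.
    Set<LifecycleState> next() {
        switch (this) {
            case INITIAL: return EnumSet.of(OPENED, CLOSED);
            case OPENED:  return EnumSet.of(CLOSED);
            case CLOSED:  return EnumSet.of(OPENED, DELETED);
            default:      return EnumSet.noneOf(LifecycleState.class); // DELETED is final
        }
    }

    boolean canMoveTo(LifecycleState target) {
        return next().contains(target);
    }
}

final class LifecycleDemo {
    public static void main(String[] args) {
        // Print every state with the states it may move to
        for (LifecycleState state : LifecycleState.values()) {
            System.out.println(state + " -> " + state.next());
        }
    }
}
```

Note that CLOSED is the only state with two meaningful exits: back to OPENED on reconnection, or on to DELETED after the timeout.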
Handling errors
To capture any error happening in a socket, add an error event handler. The Throwable in question is passed as an argument. Exceptions from the underlying transport are also propagated.
Note
- In most cases, there is no error that you can ignore safely. You should watch this event and log thrown errors.
- Errors thrown by user-created event handlers are not propagated to the error event.
Sending and receiving events
on(String event, Action<T> action) attaches an event handler. The allowed types for T are determined by the format which is used to deserialize the transport message. Currently, that format corresponds to Jackson’s data format. By default, the built-in data format is used for text messages and the MessagePack data format is used for binary messages.
send(String event) and send(String event, Object data) send an event without or with data, respectively. Unlike when receiving events, when sending events you can use any type of data, whether it is text, binary or composite.
Note
- Any event name can be used except the reserved ones: open, close, cache, delete and error.
- If data or one of its properties is byte[] or ByteBuffer, it is regarded as binary. However, you don’t need to be aware of that.
- To manage a lot of events easily, use URIs as the event name format, like /account/update.
- If you send an event via a closed socket, it will be delegated to that socket’s cache event, so you don’t need to worry about the socket’s state when sending an event.
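Two of the notes above can be restated as simple checks. The sketch below is a plain-Java illustration, not Cettia’s code: EventRules is a hypothetical class, and treating values nested inside a Map or Iterable as "properties" is an assumption made for this example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not Cettia's code: it restates two rules from the notes.
final class EventRules {
    // Event names the notes list as reserved
    static final Set<String> RESERVED =
        new HashSet<>(Arrays.asList("open", "close", "cache", "delete", "error"));

    private EventRules() {}

    static boolean isReserved(String event) {
        return RESERVED.contains(event);
    }

    // True if the data, or any value nested in a Map or Iterable, is byte[] or ByteBuffer
    // (the nesting rule is an assumption of this sketch)
    static boolean isBinary(Object data) {
        if (data instanceof byte[] || data instanceof ByteBuffer) {
            return true;
        }
        if (data instanceof Map) {
            for (Object value : ((Map<?, ?>) data).values()) {
                if (isBinary(value)) return true;
            }
        } else if (data instanceof Iterable) {
            for (Object value : (Iterable<?>) data) {
                if (isBinary(value)) return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> composite = new LinkedHashMap<>();
        composite.put("text", "echo");
        composite.put("binary", "echo".getBytes());
        System.out.println(isBinary("echo"));
        System.out.println(isBinary(composite));
        System.out.println(isReserved("/account/update"));
    }
}
```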
The client sends an event and the server echoes back to the client.
Server
server.onsocket((ServerSocket socket) -> {
    socket.on("echo", (Object data) -> {
        System.out.println(data);
        socket.send("echo", data);
    });
});
Client
cettia.open("http://localhost:8080/cettia")
.on("open", function() {
    this.send("echo", "echo");
    this.send("echo", new Buffer("echo"));
    this.send("echo", {text: "echo", binary: new Buffer("echo")});
})
.on("echo", function(data) {
    console.log(data);
});
The server sends an event and the client echoes back to the server.
Server
server.onsocket((ServerSocket socket) -> {
    socket.onopen((Void v) -> {
        socket.send("echo", "echo");
        socket.send("echo", "echo".getBytes());
        socket.send("echo", ByteBuffer.wrap("echo".getBytes()));
        socket.send("echo", new LinkedHashMap<String, Object>() {
            {
                put("text", "echo");
                put("binary", "echo".getBytes());
            }
        });
    });
    socket.on("echo", (Object data) -> System.out.println(data));
});
Client
cettia.open("http://localhost:8080/cettia")
.on("echo", function(data) {
    console.log(data);
    this.send("echo", data);
});
Offline handling
Once the underlying transport is disconnected, it’s not possible to send an event through the socket that the transport underlies until the client reconnects and the new transport replaces the old one. To cache events which are passed to the send method while offline and send them on the next reconnection, add a cache event handler together with open and delete event handlers. The cache event is fired if the send method is called when there is no connection, with an object array of the arguments used to call the send method.
Note
- There is no default behavior for offline handling.
Caching events while offline and sending them on next reconnection.
server.onsocket((ServerSocket socket) -> {
    // A queue containing events the server couldn't send to the client while disconnected
    Queue<Object[]> cache = new ConcurrentLinkedQueue<>();
    // Fired if the send method is called when there is no connection
    socket.oncache((Object[] args) -> {
        // You can determine whether or not to cache these arguments used to call the send method
        // For example, in some cases, you may want to avoid caching to deliver live data in time
        cache.offer(args);
    });
    socket.onopen((Void v) -> {
        // Now that communication is possible, you can flush the cache
        while (socket.state() == ServerSocket.State.OPENED && !cache.isEmpty()) {
            // Removes the first event from the cache and sends it to the client one by one
            Object[] args = cache.poll();
            socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
        }
    });
    socket.ondelete((Void v) -> {
        // If the cache is not empty, that is to say, there are still some messages the user should receive,
        if (!cache.isEmpty()) {
            // here you can send an email to notify the user or use a database so the user can check on next login
        }
    });
});
Tagging
A socket is not suitable for handling a specific entity in the real world. For example, when a user signs in using multiple devices like desktop, laptop, tablet and smartphone, if someone sends a message, it should be delivered to all devices where the user signed in. To do that, you need a way to handle multiple sockets, or the devices, as a single entity, or the user.
That’s why tags were introduced. A tag is used to point to a group of sockets. The tag set is managed only by the server and is unknown to the client. tag(String... names) and untag(String... names) attach and detach the given tag names to and from a socket, respectively.
Note
- Authentication result can be dealt with as a tag.
- To manage a lot of tags easily, use URIs as the tag name format, like /account/flowersinthesand.
Notifying a user who uses multiple devices of a login/logout from a specific device.
server.onsocket((ServerSocket socket) -> {
    // An imaginary helper class to handle URI
    Uri uri = Uris.parse(socket.uri());
    String username = uri.param("username");
    String devicename = uri.param("devicename");
    socket.tag(username);
    socket.onopen((Void v) -> server.byTag(username).send("/login", "Using device " + devicename));
    socket.onclose((Void v) -> server.byTag(username).send("/logout", "Using device " + devicename));
});
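To make the many-to-many relationship between sockets and tags concrete, here is a minimal in-memory sketch in plain Java. TagRegistry is a hypothetical class in which socket ids stand in for sockets; it only illustrates the idea of grouping sockets by tag and does not reflect how Cettia stores tags internally.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of tagging; socket ids stand in for sockets.
final class TagRegistry {
    // socket id -> tag names attached to that socket
    private final Map<String, Set<String>> tagsBySocket = new LinkedHashMap<>();

    // Attaches the given tag names to the socket
    void tag(String socketId, String... names) {
        Set<String> tags = tagsBySocket.computeIfAbsent(socketId, id -> new LinkedHashSet<>());
        for (String name : names) tags.add(name);
    }

    // Detaches the given tag names from the socket
    void untag(String socketId, String... names) {
        Set<String> tags = tagsBySocket.get(socketId);
        if (tags == null) return;
        for (String name : names) tags.remove(name);
    }

    // Finds every socket id that carries the given tag
    Set<String> byTag(String name) {
        Set<String> found = new LinkedHashSet<>();
        for (Map.Entry<String, Set<String>> entry : tagsBySocket.entrySet()) {
            if (entry.getValue().contains(name)) found.add(entry.getKey());
        }
        return found;
    }

    public static void main(String[] args) {
        TagRegistry registry = new TagRegistry();
        // Two devices of the same user share one tag
        registry.tag("desktop-socket", "/account/flowersinthesand");
        registry.tag("phone-socket", "/account/flowersinthesand");
        System.out.println(registry.byTag("/account/flowersinthesand"));
    }
}
```

Looking up the user's tag returns both device sockets, which is what lets a single send reach every device the user is signed in on.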
Handling the result of the remote event processing
You can get the result of event processing from the client when sending an event by using send(String event, Object data, Action<T> onFulfilled) and send(String event, Object data, Action<T> onFulfilled, Action<U> onRejected), where the allowed types for T are the same as when receiving events. Conversely, you can set the result of event processing for the client when receiving an event, in an asynchronous manner, by using Reply as the data type. You can apply this functionality to acknowledgements, remote procedure calls and so on.
Note
- If the client doesn’t call either attached fulfilled or rejected callback, these callbacks won’t be executed in any way. It is the same for the server. Therefore, it should be dealt with as a kind of contract.
- Determine beforehand whether to use the rejected callback or not, to avoid writing unnecessary rejected callbacks. For example, if a required resource is not available, you can execute either the fulfilled callback with null or the rejected callback with an exception, e.g. ResourceNotFoundException.
The client sends an event attaching callbacks and the server executes one of them with the result of event processing.
Server
server.onsocket((ServerSocket socket) -> {
    socket.on("/account/find", (Reply<String> reply) -> {
        String id = reply.data();
        System.out.println(id);
        try {
            reply.resolve(accountService.findById(id));
        } catch (EntityNotFoundException e) {
            reply.reject(e.getMessage());
        }
    });
});
Client
cettia.open("http://localhost:8080/cettia")
.on("open", function(data) {
    this.send("/account/find", "flowersinthesand", function(data) {
        console.log("fulfilled", data);
    }, function(data) {
        console.log("rejected", data);
    });
});
The server sends an event attaching callbacks and the client executes one of them with the result of event processing.
Server
server.onsocket((ServerSocket socket) -> {
    socket.onopen((Void v) -> {
        socket.send("/account/find", "flowersinthesand",
            (Map<String, Object> data) -> System.out.println("fulfilled " + data),
            (String data) -> System.out.println("rejected " + data));
    });
});
Client
cettia.open("http://localhost:8080/cettia")
.on("/account/find", function(id, reply) {
    console.log(id);
    try {
        reply.resolve(accountService.findById(id));
    } catch (e) {
        reply.reject(e.message);
    }
});
Accessing underlying objects
In any case, a transport underlies a socket, and a resource such as an HTTP request-response exchange or a WebSocket underlies the transport. To access such underlying objects, for example to reach the HTTP session, use unwrap(Class<?> clazz).
Note
- Don’t manipulate returned objects unless you know what you are doing.
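The layering described above (a socket over a transport over a platform resource) can be illustrated with a small model. Everything in this sketch is hypothetical: Unwrappable, ModelSocket, ModelTransport and PlatformResource are made-up types that only show how an unwrap-by-class lookup can walk down the layers. They are not Cettia’s types, and Cettia’s own lookup rules may differ.

```java
// Hypothetical layered model; none of these types are Cettia's.
interface Unwrappable {
    // Returns the first object down the chain that is an instance of the type, or null
    <T> T unwrap(Class<T> type);
}

// Stands in for a platform resource such as an HTTP exchange
final class PlatformResource {
    final String sessionId;

    PlatformResource(String sessionId) {
        this.sessionId = sessionId;
    }
}

// Stands in for a transport, which wraps a platform resource
final class ModelTransport implements Unwrappable {
    private final PlatformResource resource;

    ModelTransport(PlatformResource resource) {
        this.resource = resource;
    }

    @Override
    public <T> T unwrap(Class<T> type) {
        return type.isInstance(resource) ? type.cast(resource) : null;
    }
}

// Stands in for a socket, which wraps a transport
final class ModelSocket implements Unwrappable {
    private final ModelTransport transport;

    ModelSocket(ModelTransport transport) {
        this.transport = transport;
    }

    @Override
    public <T> T unwrap(Class<T> type) {
        // Try the transport itself first, then delegate further down
        return type.isInstance(transport) ? type.cast(transport) : transport.unwrap(type);
    }
}

final class UnwrapDemo {
    public static void main(String[] args) {
        ModelSocket socket = new ModelSocket(new ModelTransport(new PlatformResource("session-1")));
        PlatformResource resource = socket.unwrap(PlatformResource.class);
        System.out.println(resource.sessionId);
    }
}
```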
Integration
Here is how to integrate a Cettia application with other technologies.
Dependency injection
With dependency injection, you can inject a server wherever you need it. Register a Server as a singleton component and inject it wherever you want to handle sockets.
Examples
Dealing with a server as a component using Spring.
@WebListener
public class Bootstrap implements ServletContextListener {
    @Override
    @SuppressWarnings("resource")
    public void contextInitialized(ServletContextEvent event) {
        AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(SpringConfig.class);
        Server server = applicationContext.getBean(Server.class);
        // ... skipped
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {}
}

@Configuration
@EnableScheduling
@ComponentScan(basePackages = { "simple" })
public class SpringConfig {
    // Registers the server as a component
    @Bean
    public Server server() {
        return new DefaultServer();
    }
}

@Component
public class Clock {
    // Injects the server
    @Autowired
    private Server server;

    @Scheduled(fixedRate = 3000)
    public void tick() {
        server.all().send("chat", "tick: " + System.currentTimeMillis());
    }
}
Clustering
Any Message Oriented Middleware (MOM) that supports the publish and subscribe model can be used to cluster multiple Cettia applications with ClusteredServer. ClusteredServer intercepts method invocations of all and byTag, converts each call into a message and executes actions added via onpublish(Action<Map<String, Object>> action) with that message.
All you need to do is add an action to onpublish(Action<Map<String, Object>> action) that publishes the message to all servers in the cluster, including the one that issued it, and pass each such message to messageAction().on(Map<String, Object> message) when it arrives from a server in the cluster.
Note
- Most MOMs in Java require messages to be serialized. In other words, an Action instance used in all and byTag (not onsocket) should implement Serializable. Whereas Action is generally used as an anonymous class, Serializable can’t be used in that manner. Therefore, always use Sentence instead of Action if possible, especially in this case. However, Java 8’s lambdas have no such issue thanks to additional bounds. For example, you can use a lambda like server.all((Action<ServerSocket> & Serializable) socket -> socket.send("chat", "Hi")).
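The additional-bound cast mentioned in the note is a standard Java 8 feature and can be tried without any middleware. The sketch below uses a hypothetical SketchAction interface as a stand-in for an action type (it is not Cettia’s Action) and round-trips a lambda through Java serialization within the same JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an action interface; it is not Cettia's Action.
interface SketchAction<T> {
    void on(T value);
}

final class SerializableLambdaDemo {
    private SerializableLambdaDemo() {}

    // Serializes the object to bytes and reads it back; fails with
    // IllegalStateException if the object is not serializable.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // The intersection cast makes the compiler generate a serializable lambda
    static SketchAction<StringBuilder> serializableAction() {
        return (SketchAction<StringBuilder> & Serializable) sb -> sb.append("Hi");
    }

    public static void main(String[] args) {
        SketchAction<StringBuilder> copy = roundTrip(serializableAction());
        StringBuilder target = new StringBuilder();
        copy.on(target); // the deserialized copy behaves like the original
        System.out.println(target);
    }
}
```

A lambda written without the `& Serializable` bound would fail in roundTrip with a NotSerializableException wrapped in the IllegalStateException, which is the situation the note warns about.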
Examples
Hazelcast example.
@WebListener
public class Bootstrap implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent event) {
        ClusteredServer server = new ClusteredServer();
        HazelcastInstance hazelcast = HazelcastInstanceFactory.newHazelcastInstance(new Config());
        ITopic<Map<String, Object>> topic = hazelcast.getTopic("cettia");
        // Some server in the cluster published a message
        // Pass it to this local server
        topic.addMessageListener((Message<Map<String, Object>> message) -> server.messageAction().on(message.getMessageObject()));
        // This local server got a method call from 'all' or 'byTag' and created a message
        // Publish it to every server in the cluster
        server.onpublish((Map<String, Object> message) -> topic.publish(message));
        // ... skipped
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {}
}