• Jump To … +
    index.js server.js socket.js transport-base-transport.js transport-http-base-transport.js transport-http-longpoll-transport.js transport-http-server.js transport-http-stream-transport.js transport-websocket-server.js transport-websocket-transport.js
  • transport-http-stream-transport.js

  • ¶
    /*
     * Cettia
     * http://cettia.io/projects/cettia-protocol/
     * 
     * Copyright 2015 The Cettia Project
     * Licensed under the Apache License, Version 2.0
     * http://www.apache.org/licenses/LICENSE-2.0
     */
    var events = require("events");
    var url = require("url");
    var http = require("http");
    var createHttpBaseTransport = require("./transport-http-base-transport");
    
    http.globalAgent.maxSockets = Infinity;
  • ¶

    This function is exposed to the module’s transport module’s createHttpStreamTransport as a factory to create a HTTP streaming transport. In streaming, the client performs a HTTP persistent connection and watches changes in response and the server prints chunk as message to response.

    module.exports = function(uri, options) {
        var urlObj = url.parse(uri, true);
  • ¶

    URI’s protocol should be either http or https and transport param should be stream.

        if ((urlObj.protocol === "http:" || urlObj.protocol === "https:") && urlObj.query.transport === "stream") {
  • ¶

    A transport object.

            var self = createHttpBaseTransport(uri, options);
  • ¶

    Any error on request-response should propagate to transport.

            function onerror(error) {
                self.emit("error", error);
            }
    
            var req;
            self.connect = function() {
  • ¶

    Performs a HTTP persistent connection through GET method. when param should be open. In case of Server-Sent Events, sse param should be true.

                req = http.get(uri + "&when=open")
                .on("error", onerror)
  • ¶

    If the underlying connection of request-response was closed, fires close event.

                .on("close", function() {
                    self.emit("close");
                })
                .on("response", function(res) {
  • ¶

    When to fire open event is a first message which is an output of handshaking not when the response is available.

                    var handshaked = false;
  • ¶

    On a message of the event stream format of Server-Sent Events,

                    function onmessage(data) {
                        if (!handshaked) {
                            handshaked = true;
  • ¶

    The handshake output is in the form of URI.

                            var result = url.parse(data, true).query;
  • ¶

    A newly issued id for HTTP transport. It is used to identify which HTTP transport is associated with which HTTP exchange.

                            self.id = result.id;
  • ¶

    And then fire open event.

                            self.emit("open");
                        } else {
  • ¶

    code is a first character of a message and used to recognize that delivered message is text message or binary message.

                            var code = data.substring(0, 1);
                            data = data.substring(1);
                            switch (code) {
  • ¶

    If the code is 1, the remainder of message is a plain text message. Fires text event.

                            case "1":
                                self.emit("text", data);
                                break;
  • ¶

    If the code is 2, the remainder of message is a Base64-encoded binary message. Decodes it in Base64 and fires binary event.

                            case "2":
                                self.emit("binary", new Buffer(data, "base64"));
                                break;
  • ¶

    Otherwise, it is invalid. Fires an error and closes the connection.

                            default:
                                self.emit("error", new Error("protocol"));
                                self.close();
                                break;
                            }
                        }
                    }
  • ¶

    Every chunk may be a single message, multiple messages or a fragment of a single message. This buffer helps handle fragments.

                    var buffer = "";
  • ¶

    Chunks are formatted according to the event stream format. However, you don’t need to know that. A single message starts with ‘data: ‘ and ends with \n\n. That’s all you need to know.

                    res.on("error", onerror).on("data", function(chunk) {
  • ¶

    Strips off the left padding of the chunk that appears in the first chunk.

                        chunk = chunk.toString().replace(/^\s+/, "");
  • ¶

    If the chunk consists of only whitespace characters that is the first chunk padding in the above, there is nothing to do.

                        if (!chunk) {
                            return;
                        }
  • ¶

    Let’s think of a series of the following chunks:

    • "data: {}\n\ndata: {}\n\n"
    • "data: {}\n\ndata: {"
    • "}\n\ndata:{"
    • ".."
    • ".}"
    • "\n\ndata: {}\n\n"

    It looks not easy to handle. So let’s concatenate buffer and chunk. Here the buffer is a string after last \n\n of the concatenation.

    • "" + "data: {}\n\ndata: {}\n\n"
    • "" + "data: {}\n\ndata: {"
    • "data: {" + "}\n\ndata:{"
    • "data: {" + ".."
    • "data: {.." + ".}"
    • "data: {...}" + "\n\ndata: {}\n\n"
  • ¶

    Let’s split the concatenation by \n\n.

                        (buffer + chunk).split("\n\n").forEach(function(line, i, lines) {
  • ¶

    Except the last element, unwraps ‘data: ‘ and fires a message event.

                            if (i < lines.length - 1) {
                                onmessage(line.substring("data: ".length));
                            } else {
  • ¶

    The last element is a fragment of a data which is an incomplete message. Assigns it to buffer.

                                buffer = line;
                            }
                        });
                    })
  • ¶

    The end of response corresponds to the close of transport.

                    .on("end", function() {
                        self.emit("close");
                    });
                });
            };
            self.abort = function() {
  • ¶

    Aborts the current request. The rest of work, firing the close event, will be done by res‘s end event handler.

                req.abort();
            };
            return self;
        }
    };