Node Tuts

Detach script window ⇗

You can navigate the video and the script by using the ← and → cursor keys.

TCP

In this episode we'll learn how we can:

Creating a TCP Server

In Node, the TCP functionality resides inside the net module. Let's require it:

var net = require('net');

Now let's create a TCP server:

var server = net.createServer();

The server object is an event emitter which emits the connection event every time a client connects:

server.on('connection', function(socket) {
  console.log('got a new connection');
});

Now we can make the server listen on a specific port:

server.listen(4000);

Creating a Logs Server

Say that we want to build a logging server that persists to disk everything that every user sends to it.

We'd need to create a file write stream to persist the logs:

// server.js — a minimal TCP logging server: every byte a client
// sends is piped into a shared file write stream.
var net = require('net');
var file = require('fs').createWriteStream(__dirname + '/logs.json');

var server = net.createServer();
// 'connection' fires once per client, with that client's socket.
server.on('connection', function(socket) {
  // NOTE: pipe() will also end the file stream when this client
  // disconnects — this issue is addressed further down.
  socket.pipe(file);
});

server.listen(4000);

Let's try to start and then use the server:

$ node server.js&
$ nc localhost 4000
dsadsáda
dsadsádad
andasd
^C
$ nc localhost 4000
dsadsáda
dsadsádad
andasd
^C

events.js:71
        throw arguments[1]; // Unhandled 'error' event
                       ^
Error: EBADF, close

Here we can see that we get an error: when the first client disconnected, the pipe ended (closed) the file stream, so the server fails when the second connection tries to use — and later close — the already-closed file.

It happens that pipe closes the destination stream if the source stream ends. That's not really what we want in this case, so let's tell pipe that:

   socket.pipe(file, {end: false});

Now we can see that, no matter how many times we connect and disconnect, the file stays open.

Another problem that we have is that the logs.json file gets overwritten every time we start our server.

What we want is for the file to be appended:

var file = require('fs').createWriteStream(__dirname + '/logs.json', {flags: 'a'});

Using JSONStream

Now we don't want to write unstructured data, we want the clients of this log server to write in the JSON format.

For this we'll use the json-stream third-party module:

https://npmjs.org/package/json-stream

$ npm install json-stream

Let's then use json-stream to parse the input stream:

// json-stream parses newline-delimited JSON arriving on the socket
// and emits one JavaScript object per line.
var JSONStream = require('json-stream');

server.on('connection', function(socket) {

  // One parser per connection — JSONStream instances are stateful.
  var jsonStream = new JSONStream();

  socket
    .pipe(jsonStream)
    // {end: false}: a disconnecting client must not end the shared
    // file stream.
    .pipe(file, {end: false});
});

Let's test the server:

$ kill %1
$ rm logs.json
$ node server.js&
$ nc localhost 4000
{"a": 1}
{"b": 2}
^C

Let's now inspect the contents of the file:

cat logs.json
[Object][Object]

Oops! Two problems here: 1) looks like objects in Javascript are being printed out as [Object] and 2) they're not new-line separated.

Let's solve this by using a very helpful module for simply creating your own streams: Dominic Tarr's event-stream:

$ npm install event-stream

Let's use it to create a through stream that maps a JS object into its serialized representation followed by a new-line character:

var es = require('event-stream');

var server = net.createServer();

// Through stream: serializes each parsed event object and appends a
// newline, so the log file ends up newline-delimited JSON.
var stringifier = es.mapSync(function(data) {
  return JSON.stringify(data) + "\n";
});

// A single shared stringifier, piped into the file exactly once.
stringifier.pipe(file);

server.on('connection', function(socket) {

  // Fix: the parser must be created per connection — it was
  // referenced below but never instantiated in this listing.
  var jsonStream = new JSONStream();

  socket
    .pipe(jsonStream)
    // {end: false}: a disconnecting client must not end the shared
    // stringifier stream.
    .pipe(stringifier, {end: false});
});

Let's try this now:

$ kill %1
$ rm logs.json
$ node server.js&
$ nc localhost 4000
{"a": 1}
{"b": 2}
{"c": 3}
^C
$ cat logs.json
{"a":1}
{"b":2}
{"c":3}

Nice!

Piping out

Now we also want the logger to output the stream of logs in real-time to every connecting user, but we want that stream to be controllable:

Now that we have the write TCP server part of our process, we need a read TCP server. The read server will take the write server's output stream and pipe the data into its own connected clients.

First, let's separate the write server part into a file named write_server.js:

// write_server.js — accepts newline-delimited JSON log events over
// TCP on port 4000, persists them to logs.json, and exports the
// object stream of events so other modules (the read server) can
// consume them.
var net = require('net');
// {flags: 'a'} appends instead of truncating on every restart.
var file = require('fs').createWriteStream(__dirname + '/logs.json', {flags: 'a'});
var JSONStream = require('json-stream');
var es = require('event-stream');

// Pass-through mapper used as a fan-in point for all connections.
function identity(o) {
  return o;
}

var source = es.mapSync(identity);

// Serialize each event object back to one JSON line for the file.
var stringifier = es.mapSync(function(data) {
  return JSON.stringify(data) + "\n";
});

source
  .pipe(stringifier)
  .pipe(file);

var server = net.createServer();
server.on('connection', function(socket) {

  // One parser per connection — JSONStream instances are stateful.
  var jsonStream = new JSONStream();

  socket
    .pipe(jsonStream)
    // {end: false}: a client disconnecting must not end the shared
    // source stream.
    .pipe(source, {end: false});

});

server.listen(4000);

// Downstream consumers get the parsed (object-mode) event stream.
module.exports = source;

This new module now exports a source for log events.

Now, let's create the read server in a file named read_server.js. This module will export an initialization function that obtains the event source:

var net = require('net');

// read_server.js — exports an initialization function; `source` is
// the object stream of log events exported by write_server.js.
module.exports = function(source) {
  var server = net.createServer();

  server.on('connection', function(socket) {

  });

  server.listen(4001);
}

First we'll need to parse commands, which will activate some actions. These commands will come in the form of JSON:

var net = require('net');
var JSONStream = require('json-stream');


module.exports = function(source) {
  var server = net.createServer();

  server.on('connection', function(socket) {
    // Parse newline-delimited JSON commands sent by the client.
    var jsonStream = new JSONStream();

    jsonStream.on('data', function(command) {
      // Dispatch on command.action; the `actions` table is defined
      // in the next step.
      var action = command.action;
      if (action && actions[action]) actions[action](command);
    });

    socket.pipe(jsonStream);
  });

  server.listen(4001);
}

Now we need an actions object with all available actions:

var net = require('net');
var JSONStream = require('json-stream');
var es = require('event-stream');

module.exports = function(source) {
  var server = net.createServer();

  server.on('connection', function(socket) {
    // Whether this client has started receiving the log stream.
    var started = false;

    // Per-client serializer: one JSON line per event.
    var stringifier = es.mapSync(function(data) {
      return JSON.stringify(data) + "\n";
    });


    // Placeholder — unpiping is implemented in the next step.
    function end() {

    }

    var actions = {
      start: function(command) {
        if (started) return;

        // Fan the shared source out into this client's socket.
        source
          .pipe(stringifier)
          .pipe(socket);

        started = true;
      },

      stop: end
    };

    // Parse newline-delimited JSON commands and dispatch on action.
    var jsonStream = new JSONStream();

    jsonStream.on('data', function(command) {
      var action = command.action;
      if (action && actions[action]) actions[action](command);
    });

    socket.pipe(jsonStream);

    // Clean up when the client disconnects.
    socket.on('end', end);
  });

  server.listen(4001);
}

This code makes use of the source stream, piping it to the clients as needed.

Now we need to implement the end() function, which means to be able to end the stream, which means that we should be able to unpipe.

It turns out that unpiping is not trivial, but there is a third-party module named pup that makes it easy.

$ npm install pup

Then we need to add the require on the top:

var pup = require('pup');

Then we need to make the following changes:

    // Tear down the pipe chain so this client stops receiving events.
    function end() {
      pup.unpipe(source, stringifier);
      pup.unpipe(stringifier, socket);
      started = false;
    }

    var actions = {
      start: function(command) {
        if (started) return;

        // pup.pipe is the unpipe-able counterpart of stream.pipe.
        pup.pipe(source, stringifier);
        pup.pipe(stringifier, socket);

        started = true;
      },

      stop: end
    };

Filter the Stream

Now we want to allow the client to define a function that will be used to filter the log stream coming into the client. We can allow to pass a function that will be used to filter the event at the source.

For this we need to make the following changes to the read server:

var net = require('net');
var JSONStream = require('json-stream');
var es = require('event-stream');
var pup = require('pup');

function alwaysReturnTrue() {
  return true;
}

module.exports = function(source) {
  var server = net.createServer();

  server.on('connection', function(socket) {
    var started = false;
    var filter;

    var stringifier = es.mapSync(function(data) {
      return JSON.stringify(data) + "\n";
    });


    function end() {
      pup.unpipe(source, filter);
      pup.unpipe(filter, stringifier);
      pup.unpipe(stringifier, socket);
      started = false;
    }

    var actions = {
      start: function(command) {
        if (started) return;

        if (command.filter) {
          command.filter = new Function('ev', command.filter);
        }
        if (! command.filter || typeof command.filter != 'function') {
          command.filter = alwaysReturnTrue;
        }

        filter = es.mapSync(function(event) {
          if (command.filter(event)) return event;
        });

        pup.pipe(source, filter);
        pup.pipe(filter, stringifier);
        pup.pipe(stringifier, socket);

        started = true;
      },

      stop: end
    };

    var jsonStream = new JSONStream();

    jsonStream.on('data', function(command) {
      var action = command.action;
      if (action && actions[action]) actions[action](command);
    });

    socket.pipe(jsonStream);

    socket.on('end', end);
  });

  server.listen(4001);
}

Now we can test the server:

$ kill %1
$ node server&
$ nc localhost 4000

And on another window we launch the client:

$ nc localhost 4001

On the client we can filter the events by providing the body of a function in the filter argument. For instance, to get only those events that have an a attribute that starts by the letter "B":

{"action":"start", "filter": "return ev.a && ev.a.charAt(0) == 'B'"}