Separate IRC line parsing and socket onData parsing
[KiwiIRC.git] / server / irc / connection.js
index a7202bd64f93d83ed215636c13306e64fbf5a3b2..8b1d3ba72a0f7fb24ff54c2cfb3f5d2e93843088 100644 (file)
@@ -1,9 +1,11 @@
 var net             = require('net'),
     tls             = require('tls'),
     util            = require('util'),
+    dns             = require('dns'),
     _               = require('lodash'),
     EventBinder     = require('./eventbinder.js'),
     IrcServer       = require('./server.js'),
+    IrcCommands     = require('./commands.js'),
     IrcChannel      = require('./channel.js'),
     IrcUser         = require('./user.js'),
     EE              = require('../ee.js'),
@@ -21,7 +23,7 @@ if (version_values[1] >= 10) {
     Socks = require('socksjs');
 }
 
-var IrcConnection = function (hostname, port, ssl, nick, user, pass, state) {
+var IrcConnection = function (hostname, port, ssl, nick, user, options, state, con_num) {
     var that = this;
 
     EE.call(this,{
@@ -30,9 +32,20 @@ var IrcConnection = function (hostname, port, ssl, nick, user, pass, state) {
     });
     this.setMaxListeners(0);
 
+    options = options || {};
+
     // Socket state
     this.connected = false;
 
+    // IRCd write buffers (flood controll)
+    this.write_buffer = [];
+
+    // In process of writing the buffer?
+    this.writing_buffer = false;
+
+    // Max number of lines to write a second
+    this.write_buffer_lines_second = 2;
+
     // If registeration with the IRCd has completed
     this.registered = false;
 
@@ -43,11 +56,22 @@ var IrcConnection = function (hostname, port, ssl, nick, user, pass, state) {
     this.nick = nick;
     this.user = user;  // Contains users real hostname and address
     this.username = this.nick.replace(/[^0-9a-zA-Z\-_.\/]/, '');
-    this.password = pass;
+    this.password = options.password || '';
+
+    // Set the passed encoding. or the default if none giving or it fails
+    if (!options.encoding || !this.setEncoding(options.encoding)) {
+        this.setEncoding(global.config.default_encoding);
+    }
 
     // State object
     this.state = state;
 
+    // Connection ID in the state
+    this.con_num = con_num;
+
+    // IRC protocol handling
+    this.irc_commands = new IrcCommands(this);
+
     // IrcServer object
     this.server = new IrcServer(this, hostname, port);
 
@@ -86,15 +110,9 @@ var IrcConnection = function (hostname, port, ssl, nick, user, pass, state) {
 
     // Buffers for data sent from the IRCd
     this.hold_last = false;
-    this.held_data = '';
+    this.held_data = null;
 
     this.applyIrcEvents();
-
-    // Call any modules before making the connection
-    global.modules.emit('irc connecting', {connection: this})
-        .done(function () {
-            that.connect();
-        });
 };
 util.inherits(IrcConnection, EE);
 
@@ -125,63 +143,126 @@ IrcConnection.prototype.applyIrcEvents = function () {
  */
 IrcConnection.prototype.connect = function () {
     var that = this;
-    var socks;
 
     // The socket connect event to listener for
     var socket_connect_event_name = 'connect';
 
+    // The destination address
+    var dest_addr = this.socks ?
+        this.socks.host :
+        this.irc_host.hostname;
 
     // Make sure we don't already have an open connection
     this.disposeSocket();
 
-    // Are we connecting through a SOCKS proxy?
-    if (this.socks) {
-        this.socket = Socks.connect({
-            host: this.irc_host.hostname,
-            port: this.irc_host.port,
-            ssl: this.ssl,
-            rejectUnauthorized: global.config.reject_unauthorised_certificates
-        }, {host: this.socks.host,
-            port: this.socks.port,
-            user: this.socks.user,
-            pass: this.socks.pass
-        });
+    // Get the IP family for the dest_addr (either socks or IRCd destination)
+    getConnectionFamily(dest_addr, function getConnectionFamilyCb(err, family, host) {
+        var outgoing;
+
+        // Decide which net. interface to make the connection through
+        if (global.config.outgoing_address) {
+            if ((family === 'IPv6') && (global.config.outgoing_address.IPv6)) {
+                outgoing = global.config.outgoing_address.IPv6;
+            } else {
+                outgoing = global.config.outgoing_address.IPv4 || '0.0.0.0';
+
+                // We don't have an IPv6 interface but dest_addr may still resolve to
+                // an IPv4 address. Reset `host` and try connecting anyway, letting it
+                // fail if an IPv4 resolved address is not found
+                host = dest_addr;
+            }
 
-    } else if (this.ssl) {
-        this.socket = tls.connect({
-            host: this.irc_host.hostname,
-            port: this.irc_host.port,
-            rejectUnauthorized: global.config.reject_unauthorised_certificates
-        });
+            // If we have an array of interfaces, select a random one
+            if (typeof outgoing !== 'string' && outgoing.length) {
+                outgoing = outgoing[Math.floor(Math.random() * outgoing.length)];
+            }
 
-        socket_connect_event_name = 'secureConnect';
+            // Make sure we have a valid interface address
+            if (typeof outgoing !== 'string')
+                outgoing = '0.0.0.0';
 
-    } else {
-        this.socket = net.connect({
-            host: this.irc_host.hostname,
-            port: this.irc_host.port
+        } else {
+            // No config was found so use the default
+            outgoing = '0.0.0.0';
+        }
+
+        // Are we connecting through a SOCKS proxy?
+        if (this.socks) {
+            that.socket = Socks.connect({
+                host: host,
+                port: that.irc_host.port,
+                ssl: that.ssl,
+                rejectUnauthorized: global.config.reject_unauthorised_certificates
+            }, {host: that.socks.host,
+                port: that.socks.port,
+                user: that.socks.user,
+                pass: that.socks.pass,
+                localAddress: outgoing
+            });
+
+        } else {
+            // No socks connection, connect directly to the IRCd
+
+            if (that.ssl) {
+                that.socket = tls.connect({
+                    host: host,
+                    port: that.irc_host.port,
+                    rejectUnauthorized: global.config.reject_unauthorised_certificates,
+                    localAddress: outgoing
+                });
+
+                // We need the raw socket connect event
+                that.socket.socket.on('connect', function() { rawSocketConnect.call(that, this); });
+
+                socket_connect_event_name = 'secureConnect';
+
+            } else {
+                that.socket = net.connect({
+                    host: host,
+                    port: that.irc_host.port,
+                    localAddress: outgoing
+                });
+            }
+        }
+
+        // Apply the socket listeners
+        that.socket.on(socket_connect_event_name, function socketConnectCb() {
+
+            // TLS connections have the actual socket as a property
+            var is_tls = (typeof this.socket !== 'undefined') ?
+                true :
+                false;
+
+            // TLS sockets have already called this
+            if (!is_tls)
+                rawSocketConnect.call(that, this);
+
+            that.connected = true;
+
+            socketConnectHandler.call(that);
         });
-    }
 
-    this.socket.on(socket_connect_event_name, function () {
-        that.connected = true;
-        socketConnectHandler.call(that);
-    });
+        that.socket.on('error', function socketErrorCb(event) {
+            that.emit('error', event);
+        });
 
-    this.socket.on('error', function (event) {
-        that.emit('error', event);
-    });
+        that.socket.on('data', function () {
+            socketOnData.apply(that, arguments);
+        });
 
-    this.socket.on('data', function () {
-        parse.apply(that, arguments);
-    });
+        that.socket.on('close', function socketCloseCb(had_error) {
+            that.connected = false;
 
-    this.socket.on('close', function (had_error) {
-        that.connected = false;
-        that.emit('close');
+            // Remove this socket form the identd lookup
+            if (that.identd_port_pair) {
+                delete global.clients.port_pairs[that.identd_port_pair];
+            }
+
+            that.emit('close');
 
-        // Close the whole socket down
-        that.disposeSocket();
+            // Close the whole socket down
+            that.disposeSocket();
+        });
     });
 };
 
@@ -195,21 +276,83 @@ IrcConnection.prototype.clientEvent = function (event_name, data, callback) {
 
 /**
  * Write a line of data to the IRCd
+ * @param data The line of data to be sent
+ * @param force Write the data now, ignoring any write queue
  */
-IrcConnection.prototype.write = function (data, callback) {
+IrcConnection.prototype.write = function (data, force) {
     //ENCODE string to encoding of the server
-    encoded_buffer = iconv.encode(data + '\r\n', "windows-1252");
-    this.socket.write(encoded_buffer);
+    encoded_buffer = iconv.encode(data + '\r\n', this.encoding);
+
+    if (force) {
+        this.socket.write(encoded_buffer);
+        return;
+    }
+
+    this.write_buffer.push(encoded_buffer);
+
+    // Only flush if we're not writing already
+    if (!this.writing_buffer)
+        this.flushWriteBuffer();
+};
+
+
+
+/**
+ * Flush the write buffer to the server in a throttled fashion
+ */
+IrcConnection.prototype.flushWriteBuffer = function () {
+
+    // In case the socket closed between writing our queue.. clean up
+    if (!this.connected) {
+        this.write_buffer = [];
+        this.writing_buffer = false;
+        return;
+    }
+
+    this.writing_buffer = true;
+
+    // Disabled write buffer? Send everything we have
+    if (!this.write_buffer_lines_second) {
+        this.write_buffer.forEach(function(buffer, idx) {
+            this.socket.write(buffer);
+            this.write_buffer = null;
+        });
+
+        this.write_buffer = [];
+        this.writing_buffer = false;
+
+        return;
+    }
+
+    // Nothing to write? Stop writing and leave
+    if (this.write_buffer.length === 0) {
+        this.writing_buffer = false;
+        return;
+    }
+
+    this.socket.write(this.write_buffer[0]);
+    this.write_buffer = this.write_buffer.slice(1);
+
+    // Call this function again at some point if we still have data to write
+    if (this.write_buffer.length > 0) {
+        setTimeout(this.flushWriteBuffer.bind(this), 1000 / this.write_buffer_lines_second);
+    } else {
+        // No more buffers to write.. so we've finished
+        this.writing_buffer = false;
+    }
 };
 
 
 
 /**
- * Close the connection to the IRCd after sending one last line
+ * Close the connection to the IRCd after forcing one last line
  */
 IrcConnection.prototype.end = function (data, callback) {
+    if (!this.socket)
+        return;
+
     if (data)
-        this.write(data);
+        this.write(data, true);
 
     this.socket.end();
 };
@@ -220,6 +363,17 @@ IrcConnection.prototype.end = function (data, callback) {
  * Clean up this IrcConnection instance and any sockets
  */
 IrcConnection.prototype.dispose = function () {
+    // If we're still connected, wait until the socket is closed before disposing
+    // so that all the events are still correctly triggered
+    if (this.socket && this.connected) {
+        this.end();
+        return;
+    }
+
+    if (this.socket) {
+        this.disposeSocket();
+    }
+
     _.each(this.irc_users, function (user) {
         user.dispose();
     });
@@ -232,9 +386,10 @@ IrcConnection.prototype.dispose = function () {
     this.server.dispose();
     this.server = undefined;
 
+    this.irc_commands = undefined;
+
     EventBinder.unbindIrcEvents('', this.irc_events, this);
 
-    this.disposeSocket();
     this.removeAllListeners();
 };
 
@@ -251,6 +406,52 @@ IrcConnection.prototype.disposeSocket = function () {
     }
 };
 
+/**
+ * Set a new encoding for this connection
+ * Return true in case of success
+ */
+
+IrcConnection.prototype.setEncoding = function (encoding) {
+    var encoded_test;
+
+    try {
+        encoded_test = iconv.encode("TEST", encoding);
+        //This test is done to check if this encoding also supports
+        //the ASCII charset required by the IRC protocols
+        //(Avoid the use of base64 or incompatible encodings)
+        if (encoded_test == "TEST") {
+            this.encoding = encoding;
+            return true;
+        }
+        return false;
+    } catch (err) {
+        return false;
+    }
+};
+
+function getConnectionFamily(host, callback) {
+    if (net.isIP(host)) {
+        if (net.isIPv4(host)) {
+            callback(null, 'IPv4', host);
+        } else {
+            callback(null, 'IPv6', host);
+        }
+    } else {
+        dns.resolve6(host, function resolve6Cb(err, addresses) {
+            if (!err) {
+                callback(null, 'IPv6', addresses[0]);
+            } else {
+                dns.resolve4(host, function resolve4Cb(err, addresses) {
+                    if (!err) {
+                        callback(null, 'IPv4',addresses[0]);
+                    } else {
+                        callback(err);
+                    }
+                });
+            }
+        });
+    }
+}
 
 
 function onChannelJoin(event) {
@@ -328,6 +529,19 @@ function onUserKick(event){
 
 
 
+/**
+ * When a socket connects to an IRCd
+ * May be called before any socket handshake are complete (eg. TLS)
+ */
+var rawSocketConnect = function(socket) {
+    // Make note of the port numbers for any identd lookups
+    // Nodejs < 0.9.6 has no socket.localPort so check this first
+    if (typeof socket.localPort != 'undefined') {
+        this.identd_port_pair = socket.localPort.toString() + '_' + socket.remotePort.toString();
+        global.clients.port_pairs[this.identd_port_pair] = this;
+    }
+};
+
 
 /**
  * Handle the socket connect event, starting the IRCd registration
@@ -347,7 +561,7 @@ var socketConnectHandler = function () {
     // Let the webirc/etc detection modify any required parameters
     connect_data = findWebIrc.call(this, connect_data);
 
-    global.modules.emit('irc authorize', connect_data).done(function () {
+    global.modules.emit('irc authorize', connect_data).done(function ircAuthorizeCb() {
         // Send any initial data for webirc/etc
         if (connect_data.prepend_data) {
             _.each(connect_data.prepend_data, function(data) {
@@ -392,7 +606,7 @@ function findWebIrc(connect_data) {
     // Check if we need to pass the users IP as its username/ident
     if (ip_as_username && ip_as_username.indexOf(this.irc_host.hostname) > -1) {
         // Get a hex value of the clients IP
-        this.username = this.user.address.split('.').map(function(i, idx){
+        this.username = this.user.address.split('.').map(function ipSplitMapCb(i, idx){
             var hex = parseInt(i, 10).toString(16);
 
             // Pad out the hex value if it's a single char
@@ -408,6 +622,76 @@ function findWebIrc(connect_data) {
 }
 
 
+/**
+ * Buffer any data we get from the IRCd until we have complete lines.
+ */
+function socketOnData(data) {
+    var data_pos,               // Current position within the data Buffer
+        line_start = 0,
+        lines = [],
+        line = '',
+        max_buffer_size = 1024; // 1024 bytes is the maximum length of two RFC1459 IRC messages.
+                                // May need tweaking when IRCv3 message tags are more widespread
+
+    // Split data chunk into individual lines
+    for (data_pos = 0; data_pos < data.length; data_pos++) {
+        if (data[data_pos] === 0x0A) { // Check if byte is a line feed
+            lines.push(data.slice(line_start, data_pos));
+            line_start = data_pos + 1;
+        }
+    }
+
+    // No complete lines of data? Check to see if buffering the data would exceed the max buffer size
+    if (!lines[0]) {
+        if ((this.held_data ? this.held_data.length : 0 ) + data.length > max_buffer_size) {
+            // Buffering this data would exeed our max buffer size
+            this.emit('error', 'Message buffer too large');
+            this.socket.destroy();
+
+        } else {
+
+            // Append the incomplete line to our held_data and wait for more
+            if (this.held_data) {
+                this.held_data = Buffer.concat([this.held_data, data], this.held_data.length + data.length);
+            } else {
+                this.held_data = data;
+            }
+        }
+
+        // No complete lines to process..
+        return;
+    }
+
+    // If we have an incomplete line held from the previous chunk of data
+    // merge it with the first line from this chunk of data
+    if (this.hold_last && this.held_data !== null) {
+        lines[0] = Buffer.concat([this.held_data, lines[0]], this.held_data.length + lines[0].length);
+        this.hold_last = false;
+        this.held_data = null;
+    }
+
+    // If the last line of data in this chunk is not complete, hold it so
+    // it can be merged with the first line from the next chunk
+    if (line_start < data_pos) {
+        if ((data.length - line_start) > max_buffer_size) {
+            // Buffering this data would exeed our max buffer size
+            this.emit('error', 'Message buffer too large');
+            this.socket.destroy();
+            return;
+        }
+
+        this.hold_last = true;
+        this.held_data = new Buffer(data.length - line_start);
+        data.copy(this.held_data, 0, line_start);
+    }
+
+    // Process our data line by line
+    for (i = 0; i < lines.length; i++)
+        parseIrcLine.call(this, lines[i]);
+
+}
+
+
 
 /**
  * The regex that parses a line of data from the IRCd
@@ -416,66 +700,47 @@ function findWebIrc(connect_data) {
  */
 var parse_regex = /^(?:(?:(?:(@[^ ]+) )?):(?:([a-z0-9\x5B-\x60\x7B-\x7D\.\-*]+)|([a-z0-9\x5B-\x60\x7B-\x7D\.\-*]+)!([^\x00\r\n\ ]+?)@?([a-z0-9\.\-:\/_]+)?) )?(\S+)(?: (?!:)(.+?))?(?: :(.+))?$/i;
 
-var parse = function (data) {
-    var i,
-        msg,
-        msg2,
-        trm,
-        j,
+function parseIrcLine(buffer_line) {
+    var msg,
+        i, j,
         tags = [],
-        tag;
+        tag,
+        line = '';
 
-    //DECODE server encoding 
-    data = iconv.decode(data, 'windows-1252');
-    console.log(data);
-    if (this.hold_last && this.held_data !== '') {
-        data = this.held_data + data;
-        this.hold_last = false;
-        this.held_data = '';
-    }
+    // Decode server encoding
+    line = iconv.decode(buffer_line, this.encoding);
+    if (!line) return;
 
-    // If the last line is incomplete, hold it until we have more data
-    if (data.substr(-1) !== '\n') {
-        this.hold_last = true;
+    // Parse the complete line, removing any carriage returns
+    msg = parse_regex.exec(line.replace(/^\r+|\r+$/, ''));
+
+    if (!msg) {
+        // The line was not parsed correctly, must be malformed
+        console.log("Malformed IRC line: " + line.replace(/^\r+|\r+$/, ''));
+        return;
     }
 
-    // Process our data line by line
-    data = data.split("\n");
-    for (i = 0; i < data.length; i++) {
-        if (!data[i]) break;
-
-        // If flagged to hold the last line, store it and move on
-        if (this.hold_last && (i === data.length - 1)) {
-            this.held_data = data[i];
-            break;
-        }
-        
-        // Parse the complete line, removing any carriage returns
-        msg = parse_regex.exec(data[i].replace(/^\r+|\r+$/, ''));
-
-        if (msg) {
-            if (msg[1]) {
-                tags = msg[1].split(';');
-                for (j = 0; j < tags.length; j++) {
-                    tag = tags[j].split('=');
-                    tags[j] = {tag: tag[0], value: tag[1]};
-                }
-            }
-            msg = {
-                tags:       tags,
-                prefix:     msg[2],
-                nick:       msg[3],
-                ident:      msg[4],
-                hostname:   msg[5] || '',
-                command:    msg[6],
-                params:     msg[7] || '',
-                trailing:   (msg[8]) ? msg[8].trim() : ''
-            };
-            msg.params = msg.params.split(' ');
-            this.irc_commands.dispatch(msg.command.toUpperCase(), msg);
-        } else {
-            // The line was not parsed correctly, must be malformed
-            console.log("Malformed IRC line: " + data[i].replace(/^\r+|\r+$/, ''));
+    // Extract any tags (msg[1])
+    if (msg[1]) {
+        tags = msg[1].split(';');
+
+        for (j = 0; j < tags.length; j++) {
+            tag = tags[j].split('=');
+            tags[j] = {tag: tag[0], value: tag[1]};
         }
     }
-};
+
+    msg = {
+        tags:       tags,
+        prefix:     msg[2],
+        nick:       msg[3],
+        ident:      msg[4],
+        hostname:   msg[5] || '',
+        command:    msg[6],
+        params:     msg[7] || '',
+        trailing:   (msg[8]) ? msg[8].trim() : ''
+    };
+
+    msg.params = msg.params.split(' ');
+    this.irc_commands.dispatch(msg.command.toUpperCase(), msg);
+}