var stream = require('stream'),
- util = require('util'),
- net = require("net"),
- tls = require("tls"),
+ util = require('util'),
+ net = require("net"),
+ tls = require("tls"),
Identd = require('./identd');
}
// Socket connection responses
-var RESPONSE_ERROR = '0';
-var RESPONSE_OK = '1';
-var RESPONSE_ECONNRESET = '2';
-var RESPONSE_ECONNREFUSED = '3';
-var RESPONSE_ENOTFOUND = '4';
-var RESPONSE_ETIMEDOUT = '5';
+var RESPONSE_ERROR = '0';
+var RESPONSE_OK = '1';
+var RESPONSE_ECONNRESET = '2';
+var RESPONSE_ECONNREFUSED = '3';
+var RESPONSE_ENOTFOUND = '4';
+var RESPONSE_ETIMEDOUT = '5';
*/
function ProxyPipe(kiwi_socket) {
this.kiwi_socket = kiwi_socket;
- this.irc_socket = null;
- this.buffer = '';
- this.meta = null;
+ this.irc_socket = null;
+ this.buffer = '';
+ this.meta = null;
- kiwi_socket.on('data', this.kiwiSocketOnData.bind(this));
+ kiwi_socket.setEncoding('utf8');
+ kiwi_socket.on('readable', this.kiwiSocketOnReadable.bind(this));
}
};
-ProxyPipe.prototype.kiwiSocketOnData = function(data) {
- var meta;
+ProxyPipe.prototype.kiwiSocketOnReadable = function() {
+ var chunk, meta;
- this.buffer += data.toString();
+ while ((chunk = this.kiwi_socket.read()) !== null) {
+ this.buffer += chunk;
+ }
// Not got a complete line yet? Wait some more
- if (this.buffer.substr(-1) !== '\n')
+ if (this.buffer.indexOf('\n') === -1)
return;
try {
- meta = JSON.parse(this.buffer);
+ meta = JSON.parse(this.buffer.substr(0, this.buffer.indexOf('\n')));
} catch (err) {
this.destroy();
return;
this.buffer = '';
this.meta = meta;
- this.kiwi_socket.removeAllListeners('data');
+ this.kiwi_socket.removeAllListeners('readable');
this.makeIrcConnection();
};
ProxyPipe.prototype._onSocketConnect = function() {
- // Socket has connected, return no error to the kiwi server
debug('[KiwiProxy] ProxyPipe::_onSocketConnect()');
+
+ // Now that we're connected to the detination, return no
+ // error back to the kiwi server and start piping
this.kiwi_socket.write(new Buffer(RESPONSE_OK.toString()), this.startPiping.bind(this));
};
};
+ProxyPipe.prototype._onSocketClose = function() {
+ debug('[KiwiProxy] IRC Socket closed');
+ this.destroy();
+};
+
+
ProxyPipe.prototype.startPiping = function() {
debug('[KiwiProxy] ProxyPipe::startPiping()');
+
+ // Let the piping handle socket closures
+ this.irc_socket.removeAllListeners('error');
+ this.irc_socket.removeAllListeners('timeout');
+
+ this.irc_socket.on('close', this._onSocketClose.bind(this));
+
+ this.kiwi_socket.setEncoding('binary');
+
this.kiwi_socket.pipe(this.irc_socket);
this.irc_socket.pipe(this.kiwi_socket);
};
stream.Duplex.call(this);
this.connected_fn = null;
- this.proxy_addr = proxy_addr;
- this.proxy_port = proxy_port;
- this.meta = meta || {};
+ this.proxy_addr = proxy_addr;
+ this.proxy_port = proxy_port;
+ this.meta = meta || {};
this.state = 'disconnected';
}
this.socket.removeAllListeners();
this.socket.destroy();
delete this.socket;
+
+ debug('[KiwiProxy] Destroying socket');
};
if (this.state === 'connected' && this.socket) {
while ((data = this.socket.read()) !== null) {
- if ((this.push(data)) === false) {
+ if (this.push(data) === false) {
break;
}
}
if (this.state === 'connected' && this.socket) {
return this.socket.write(chunk, encoding, callback);
} else {
- callback("Not connected");
+ callback('Not connected');
}
};
error_code,
error_codes = {};
- error_codes[RESPONSE_ERROR] = 'ERROR';
- error_codes[RESPONSE_ECONNRESET] = 'ECONNRESET';
+ error_codes[RESPONSE_ERROR] = 'ERROR';
+ error_codes[RESPONSE_ECONNRESET] = 'ECONNRESET';
error_codes[RESPONSE_ECONNREFUSED] = 'ECONNREFUSED';
- error_codes[RESPONSE_ENOTFOUND] = 'ENOTFOUND';
- error_codes[RESPONSE_ETIMEDOUT] = 'ETIMEDOUT';
+ error_codes[RESPONSE_ENOTFOUND] = 'ENOTFOUND';
+ error_codes[RESPONSE_ETIMEDOUT] = 'ETIMEDOUT';
debug('[KiwiProxy] Recieved socket status: ' + data.toString());
if (status === RESPONSE_OK) {
ProxySocket.prototype._onSocketError = function(err) {
+ this.ignore_close = true;
this.emit('error', err);
};
\ No newline at end of file