1 var stream
= require('stream'),
2 util
= require('util'),
3 events
= require('events'),
10 ProxyServer
: ProxyServer
,
11 ProxySocket
: ProxySocket
15 //console.log.apply(console, arguments);
18 // Socket connection responses
19 var RESPONSE_ERROR
= '0';
20 var RESPONSE_OK
= '1';
21 var RESPONSE_ECONNRESET
= '2';
22 var RESPONSE_ECONNREFUSED
= '3';
23 var RESPONSE_ENOTFOUND
= '4';
24 var RESPONSE_ETIMEDOUT
= '5';
30 * Listens for connections from a kiwi server, dispatching ProxyPipe
31 * instances for each connection
33 function ProxyServer() {
34 events
.EventEmitter
.call(this);
36 util
.inherits(ProxyServer
, events
.EventEmitter
);
39 ProxyServer
.prototype.listen = function(listen_port
, listen_addr
, opts
) {
42 connection_event
= 'connection';
49 key
: fs
.readFileSync(opts
.ssl_key
),
50 cert
: fs
.readFileSync(opts
.ssl_cert
)
53 // Do we have an intermediate certificate?
54 if (typeof opts
.ssl_ca
!== 'undefined') {
56 if (typeof opts
.ssl_ca
.map
!== 'undefined') {
57 serv_opts
.ca
= opts
.ssl_ca
.map(function (f
) { return fs
.readFileSync(f
); });
60 serv_opts
.ca
= fs
.readFileSync(opts
.ssl_ca
);
64 this.server
= tls
.createServer(serv_opts
);
66 connection_event
= 'secureConnection';
70 // No SSL, start a simple clear text server
72 this.server
= new net
.Server();
75 this.server
.listen(listen_port
, listen_addr
, function() {
76 that
.emit('listening');
79 this.server
.on(connection_event
, function(socket
) {
80 new ProxyPipe(socket
, that
);
85 ProxyServer
.prototype.close = function(callback
) {
87 return this.server
.close(callback
);
90 if (typeof callback
=== 'function')
99 * Takes connections from a kiwi server, then:
100 * 1. Reads its meta data such as username for identd lookups
101 * 2. Make the connection to the IRC server
102 * 3. Reply to the kiwi server with connection status
103 * 4. If all ok, pipe data between the 2 sockets as a proxy
105 function ProxyPipe(kiwi_socket
, proxy_server
) {
106 debug('[KiwiProxy] New Kiwi connection');
108 this.kiwi_socket
= kiwi_socket
;
109 this.proxy_server
= proxy_server
;
110 this.irc_socket
= null;
114 debug('[KiwiProxy] Setting encoding to utf8');
115 kiwi_socket
.setEncoding('utf8');
116 kiwi_socket
.on('readable', this.kiwiSocketOnReadable
.bind(this));
120 ProxyPipe
.prototype.destroy = function() {
124 if (this.irc_socket
) {
125 this.irc_socket
.destroy();
126 this.irc_socket
.removeAllListeners();
127 this.irc_socket
= null;
130 if (this.kiwi_socket
) {
131 this.kiwi_socket
.destroy();
132 this.kiwi_socket
.removeAllListeners();
133 this.kiwi_socket
= null;
138 ProxyPipe
.prototype.kiwiSocketOnReadable = function() {
141 while ((chunk
= this.kiwi_socket
.read()) !== null) {
142 this.buffer
+= chunk
;
145 // Not got a complete line yet? Wait some more
146 if (this.buffer
.indexOf('\n') === -1)
150 debug('[KiwiProxy] Found a complete line in the buffer');
151 meta
= JSON
.parse(this.buffer
.substr(0, this.buffer
.indexOf('\n')));
153 debug('[KiwiProxy] Error parsing meta');
158 if (!meta
.username
) {
159 debug('[KiwiProxy] Meta does not contain a username');
166 this.kiwi_socket
.removeAllListeners('readable');
168 this.makeIrcConnection();
172 ProxyPipe
.prototype.makeIrcConnection = function() {
173 debug('[KiwiProxy] Opening proxied connection to: ' + this.meta
.host
+ ':' + this.meta
.port
.toString());
175 var local_address
= this.meta
.interface ?
176 this.meta
.interface :
180 this.irc_socket
= tls
.connect({
181 port
: parseInt(this.meta
.port
, 10),
182 host
: this.meta
.host
,
183 rejectUnauthorized
: global
.config
.reject_unauthorised_certificates
,
184 localAddress
: local_address
185 }, this._onSocketConnect
.bind(this));
188 this.irc_socket
= net
.connect({
189 port
: parseInt(this.meta
.port
, 10),
190 host
: this.meta
.host
,
191 localAddress
: local_address
192 }, this._onSocketConnect
.bind(this));
195 this.irc_socket
.setTimeout(10000);
196 this.irc_socket
.on('error', this._onSocketError
.bind(this));
197 this.irc_socket
.on('timeout', this._onSocketTimeout
.bind(this));
199 // We need the raw socket connect event, not after any SSL handshakes or anything
200 if (this.irc_socket
.socket
) {
201 this.irc_socket
.socket
.on('connect', this._onRawSocketConnect
.bind(this));
203 this.irc_socket
.on('connect', this._onRawSocketConnect
.bind(this));
208 ProxyPipe
.prototype._onRawSocketConnect = function() {
209 this.proxy_server
.emit('socket_connected', this);
213 ProxyPipe
.prototype._onSocketConnect = function() {
214 debug('[KiwiProxy] ProxyPipe::_onSocketConnect()');
216 this.proxy_server
.emit('connection_open', this);
218 // Now that we're connected to the detination, return no
219 // error back to the kiwi server and start piping
220 this.kiwi_socket
.write(new Buffer(RESPONSE_OK
.toString()), this.startPiping
.bind(this));
224 ProxyPipe
.prototype._onSocketError = function(err
) {
226 ECONNRESET
: RESPONSE_ECONNRESET
,
227 ECONNREFUSED
: RESPONSE_ECONNREFUSED
,
228 ENOTFOUND
: RESPONSE_ENOTFOUND
,
229 ETIMEDOUT
: RESPONSE_ETIMEDOUT
231 debug('[KiwiProxy] IRC Error ' + err
.code
);
232 this.kiwi_socket
.write(new Buffer((replies
[err
.code
] || RESPONSE_ERROR
).toString()), 'UTF8', this.destroy
.bind(this));
236 ProxyPipe
.prototype._onSocketTimeout = function() {
237 this.has_timed_out
= true;
238 debug('[KiwiProxy] IRC Timeout');
239 this.irc_socket
.destroy();
240 this.kiwi_socket
.write(new Buffer(RESPONSE_ETIMEDOUT
.toString()), 'UTF8', this.destroy
.bind(this));
244 ProxyPipe
.prototype._onSocketClose = function() {
245 debug('[KiwiProxy] IRC Socket closed');
246 this.proxy_server
.emit('connection_close', this);
251 ProxyPipe
.prototype.startPiping = function() {
252 debug('[KiwiProxy] ProxyPipe::startPiping()');
254 // Let the piping handle socket closures
255 this.irc_socket
.removeAllListeners('error');
256 this.irc_socket
.removeAllListeners('timeout');
258 this.irc_socket
.on('close', this._onSocketClose
.bind(this));
260 this.kiwi_socket
.setEncoding('binary');
262 this.kiwi_socket
.pipe(this.irc_socket
);
263 this.irc_socket
.pipe(this.kiwi_socket
);
272 * Transparent socket interface to a kiwi proxy
274 function ProxySocket(proxy_port
, proxy_addr
, meta
, proxy_opts
) {
275 stream
.Duplex
.call(this);
277 this.connected_fn
= null;
278 this.proxy_addr
= proxy_addr
;
279 this.proxy_port
= proxy_port
;
280 this.proxy_opts
= proxy_opts
|| {};
282 this.setMeta(meta
|| {});
284 this.state
= 'disconnected';
287 util
.inherits(ProxySocket
, stream
.Duplex
);
290 ProxySocket
.prototype.setMeta = function(meta
) {
295 ProxySocket
.prototype.connectTls = function() {
296 this.meta
.ssl
= true;
297 return this.connect
.apply(this, arguments
);
301 ProxySocket
.prototype.connect = function(dest_port
, dest_addr
, connected_fn
) {
302 this.meta
.host
= dest_addr
;
303 this.meta
.port
= dest_port
;
304 this.connected_fn
= connected_fn
;
306 if (!this.meta
.host
|| !this.meta
.port
) {
307 debug('[KiwiProxy] Invalid destination addr/port', this.meta
);
311 debug('[KiwiProxy] Connecting to proxy ' + this.proxy_addr
+ ':' + this.proxy_port
.toString() + ' SSL: ' + (!!this.proxy_opts
.ssl
).toString());
312 if (this.proxy_opts
.ssl
) {
313 this.socket
= tls
.connect({
314 port
: this.proxy_port
,
315 host
: this.proxy_addr
,
316 rejectUnauthorized
: !!global
.config
.reject_unauthorised_certificates
,
317 }, this._onSocketConnect
.bind(this));
320 this.socket
= net
.connect(this.proxy_port
, this.proxy_addr
, this._onSocketConnect
.bind(this));
323 this.socket
.setTimeout(10000);
324 this.socket
.on('data', this._onSocketData
.bind(this));
325 this.socket
.on('close', this._onSocketClose
.bind(this));
326 this.socket
.on('error', this._onSocketError
.bind(this));
332 ProxySocket
.prototype.destroySocket = function() {
336 this.socket
.removeAllListeners();
337 this.socket
.destroy();
340 debug('[KiwiProxy] Destroying socket');
344 ProxySocket
.prototype._read = function() {
347 if (this.state
=== 'connected' && this.socket
) {
348 while ((data
= this.socket
.read()) !== null) {
349 if (this.push(data
) === false) {
359 ProxySocket
.prototype._write = function(chunk
, encoding
, callback
) {
360 if (this.state
=== 'connected' && this.socket
) {
361 return this.socket
.write(chunk
, encoding
, callback
);
363 debug('[KiwiProxy] Trying to write to an unfinished socket. State=' + this.state
);
364 callback('Not connected');
369 ProxySocket
.prototype._onSocketConnect = function() {
370 var meta
= this.meta
|| {};
372 this.state
= 'handshaking';
374 debug('[KiwiProxy] Connected to proxy, sending meta');
375 this.socket
.write(JSON
.stringify(meta
) + '\n');
379 ProxySocket
.prototype._onSocketData = function(data
) {
380 if (this.state
=== 'connected') {
381 this.emit('data', data
);
385 var buffer_str
= data
.toString(),
386 status
= buffer_str
[0],
390 error_codes
[RESPONSE_ERROR
] = 'ERROR';
391 error_codes
[RESPONSE_ECONNRESET
] = 'ECONNRESET';
392 error_codes
[RESPONSE_ECONNREFUSED
] = 'ECONNREFUSED';
393 error_codes
[RESPONSE_ENOTFOUND
] = 'ENOTFOUND';
394 error_codes
[RESPONSE_ETIMEDOUT
] = 'ETIMEDOUT';
396 debug('[KiwiProxy] Recieved socket status: ' + data
.toString());
397 if (status
=== RESPONSE_OK
) {
398 debug('[KiwiProxy] Remote socket connected OK');
399 this.state
= 'connected';
401 if (typeof this.connected_fn
=== 'function')
404 this.emit('connect');
407 this.destroySocket();
409 error_code
= error_codes
[status
] || error_codes
[RESPONSE_ERROR
];
410 debug('[KiwiProxy] Error: ' + error_code
);
411 this.emit('error', new Error(error_code
));
416 ProxySocket
.prototype._onSocketClose = function(had_error
) {
417 debug('[KiwiProxy] _onSocketClose() had_error=' + had_error
.toString());
418 if (this.state
=== 'connected') {
419 this.emit('close', had_error
);
423 if (!this.ignore_close
)
424 this.emit('error', new Error(RESPONSE_ERROR
));
428 ProxySocket
.prototype._onSocketError = function(err
) {
429 debug('[KiwiProxy] _onSocketError() err=' + err
.toString());
430 this.ignore_close
= true;
431 this.emit('error', err
);