Replace calls to console.log() with Winston.info() etc.
[KiwiIRC.git] / server / proxy.js
1 var stream = require('stream'),
2 util = require('util'),
3 events = require('events'),
4 net = require('net'),
5 tls = require('tls'),
6 fs = require('fs');
7
8
9 module.exports = {
10 ProxyServer: ProxyServer,
11 ProxySocket: ProxySocket
12 };
13
14 function debug() {
15 //console.log.apply(console, arguments);
16 }
17
18 // Socket connection responses
19 var RESPONSE_ERROR = '0';
20 var RESPONSE_OK = '1';
21 var RESPONSE_ECONNRESET = '2';
22 var RESPONSE_ECONNREFUSED = '3';
23 var RESPONSE_ENOTFOUND = '4';
24 var RESPONSE_ETIMEDOUT = '5';
25
26
27
28 /**
29 * ProxyServer
30 * Listens for connections from a kiwi server, dispatching ProxyPipe
31 * instances for each connection
32 */
33 function ProxyServer() {
34 events.EventEmitter.call(this);
35 }
36 util.inherits(ProxyServer, events.EventEmitter);
37
38
39 ProxyServer.prototype.listen = function(listen_port, listen_addr, opts) {
40 var that = this,
41 serv_opts = {},
42 connection_event = 'connection';
43
44 opts = opts || {};
45
46 // Listen using SSL?
47 if (opts.ssl) {
48 serv_opts = {
49 key: fs.readFileSync(opts.ssl_key),
50 cert: fs.readFileSync(opts.ssl_cert)
51 };
52
53 // Do we have an intermediate certificate?
54 if (typeof opts.ssl_ca !== 'undefined') {
55 // An array of them?
56 if (typeof opts.ssl_ca.map !== 'undefined') {
57 serv_opts.ca = opts.ssl_ca.map(function (f) { return fs.readFileSync(f); });
58
59 } else {
60 serv_opts.ca = fs.readFileSync(opts.ssl_ca);
61 }
62 }
63
64 this.server = tls.createServer(serv_opts);
65
66 connection_event = 'secureConnection';
67
68 }
69
70 // No SSL, start a simple clear text server
71 else {
72 this.server = new net.Server();
73 }
74
75 this.server.listen(listen_port, listen_addr, function() {
76 that.emit('listening');
77 });
78
79 this.server.on(connection_event, function(socket) {
80 new ProxyPipe(socket, that);
81 });
82 };
83
84
85 ProxyServer.prototype.close = function(callback) {
86 if (this.server) {
87 return this.server.close(callback);
88 }
89
90 if (typeof callback === 'function')
91 callback();
92 };
93
94
95
96
97 /**
98 * ProxyPipe
99 * Takes connections from a kiwi server, then:
100 * 1. Reads its meta data such as username for identd lookups
101 * 2. Make the connection to the IRC server
102 * 3. Reply to the kiwi server with connection status
103 * 4. If all ok, pipe data between the 2 sockets as a proxy
104 */
105 function ProxyPipe(kiwi_socket, proxy_server) {
106 debug('[KiwiProxy] New Kiwi connection');
107
108 this.kiwi_socket = kiwi_socket;
109 this.proxy_server = proxy_server;
110 this.irc_socket = null;
111 this.buffer = '';
112 this.meta = null;
113
114 debug('[KiwiProxy] Setting encoding to utf8');
115 kiwi_socket.setEncoding('utf8');
116 kiwi_socket.on('readable', this.kiwiSocketOnReadable.bind(this));
117 }
118
119
120 ProxyPipe.prototype.destroy = function() {
121 this.buffer = null;
122 this.meta = null;
123
124 if (this.irc_socket) {
125 this.irc_socket.destroy();
126 this.irc_socket.removeAllListeners();
127 this.irc_socket = null;
128 }
129
130 if (this.kiwi_socket) {
131 this.kiwi_socket.destroy();
132 this.kiwi_socket.removeAllListeners();
133 this.kiwi_socket = null;
134 }
135 };
136
137
138 ProxyPipe.prototype.kiwiSocketOnReadable = function() {
139 var chunk, meta;
140
141 while ((chunk = this.kiwi_socket.read()) !== null) {
142 this.buffer += chunk;
143 }
144
145 // Not got a complete line yet? Wait some more
146 if (this.buffer.indexOf('\n') === -1)
147 return;
148
149 try {
150 debug('[KiwiProxy] Found a complete line in the buffer');
151 meta = JSON.parse(this.buffer.substr(0, this.buffer.indexOf('\n')));
152 } catch (err) {
153 debug('[KiwiProxy] Error parsing meta');
154 this.destroy();
155 return;
156 }
157
158 if (!meta.username) {
159 debug('[KiwiProxy] Meta does not contain a username');
160 this.destroy();
161 return;
162 }
163
164 this.buffer = '';
165 this.meta = meta;
166 this.kiwi_socket.removeAllListeners('readable');
167
168 this.makeIrcConnection();
169 };
170
171
172 ProxyPipe.prototype.makeIrcConnection = function() {
173 debug('[KiwiProxy] Opening proxied connection to: ' + this.meta.host + ':' + this.meta.port.toString());
174
175 var local_address = this.meta.interface ?
176 this.meta.interface :
177 '0.0.0.0';
178
179 if (this.meta.ssl) {
180 this.irc_socket = tls.connect({
181 port: parseInt(this.meta.port, 10),
182 host: this.meta.host,
183 rejectUnauthorized: global.config.reject_unauthorised_certificates,
184 localAddress: local_address
185 }, this._onSocketConnect.bind(this));
186
187 } else {
188 this.irc_socket = net.connect({
189 port: parseInt(this.meta.port, 10),
190 host: this.meta.host,
191 localAddress: local_address
192 }, this._onSocketConnect.bind(this));
193 }
194
195 this.irc_socket.setTimeout(10000);
196 this.irc_socket.on('error', this._onSocketError.bind(this));
197 this.irc_socket.on('timeout', this._onSocketTimeout.bind(this));
198
199 // We need the raw socket connect event, not after any SSL handshakes or anything
200 if (this.irc_socket.socket) {
201 this.irc_socket.socket.on('connect', this._onRawSocketConnect.bind(this));
202 } else {
203 this.irc_socket.on('connect', this._onRawSocketConnect.bind(this));
204 }
205 };
206
207
208 ProxyPipe.prototype._onRawSocketConnect = function() {
209 this.proxy_server.emit('socket_connected', this);
210 };
211
212
213 ProxyPipe.prototype._onSocketConnect = function() {
214 debug('[KiwiProxy] ProxyPipe::_onSocketConnect()');
215
216 this.proxy_server.emit('connection_open', this);
217
218 // Now that we're connected to the detination, return no
219 // error back to the kiwi server and start piping
220 this.kiwi_socket.write(new Buffer(RESPONSE_OK.toString()), this.startPiping.bind(this));
221 };
222
223
224 ProxyPipe.prototype._onSocketError = function(err) {
225 var replies = {
226 ECONNRESET: RESPONSE_ECONNRESET,
227 ECONNREFUSED: RESPONSE_ECONNREFUSED,
228 ENOTFOUND: RESPONSE_ENOTFOUND,
229 ETIMEDOUT: RESPONSE_ETIMEDOUT
230 };
231 debug('[KiwiProxy] IRC Error ' + err.code);
232 this.kiwi_socket.write(new Buffer((replies[err.code] || RESPONSE_ERROR).toString()), 'UTF8', this.destroy.bind(this));
233 };
234
235
236 ProxyPipe.prototype._onSocketTimeout = function() {
237 this.has_timed_out = true;
238 debug('[KiwiProxy] IRC Timeout');
239 this.irc_socket.destroy();
240 this.kiwi_socket.write(new Buffer(RESPONSE_ETIMEDOUT.toString()), 'UTF8', this.destroy.bind(this));
241 };
242
243
244 ProxyPipe.prototype._onSocketClose = function() {
245 debug('[KiwiProxy] IRC Socket closed');
246 this.proxy_server.emit('connection_close', this);
247 this.destroy();
248 };
249
250
251 ProxyPipe.prototype.startPiping = function() {
252 debug('[KiwiProxy] ProxyPipe::startPiping()');
253
254 // Let the piping handle socket closures
255 this.irc_socket.removeAllListeners('error');
256 this.irc_socket.removeAllListeners('timeout');
257
258 this.irc_socket.on('close', this._onSocketClose.bind(this));
259
260 this.kiwi_socket.setEncoding('binary');
261
262 this.kiwi_socket.pipe(this.irc_socket);
263 this.irc_socket.pipe(this.kiwi_socket);
264 };
265
266
267
268
269
270 /**
271 * ProxySocket
272 * Transparent socket interface to a kiwi proxy
273 */
274 function ProxySocket(proxy_port, proxy_addr, meta, proxy_opts) {
275 stream.Duplex.call(this);
276
277 this.connected_fn = null;
278 this.proxy_addr = proxy_addr;
279 this.proxy_port = proxy_port;
280 this.proxy_opts = proxy_opts || {};
281
282 this.setMeta(meta || {});
283
284 this.state = 'disconnected';
285 }
286
287 util.inherits(ProxySocket, stream.Duplex);
288
289
290 ProxySocket.prototype.setMeta = function(meta) {
291 this.meta = meta;
292 };
293
294
295 ProxySocket.prototype.connectTls = function() {
296 this.meta.ssl = true;
297 return this.connect.apply(this, arguments);
298 };
299
300
301 ProxySocket.prototype.connect = function(dest_port, dest_addr, connected_fn) {
302 this.meta.host = dest_addr;
303 this.meta.port = dest_port;
304 this.connected_fn = connected_fn;
305
306 if (!this.meta.host || !this.meta.port) {
307 debug('[KiwiProxy] Invalid destination addr/port', this.meta);
308 return false;
309 }
310
311 debug('[KiwiProxy] Connecting to proxy ' + this.proxy_addr + ':' + this.proxy_port.toString() + ' SSL: ' + (!!this.proxy_opts.ssl).toString());
312 if (this.proxy_opts.ssl) {
313 this.socket = tls.connect({
314 port: this.proxy_port,
315 host: this.proxy_addr,
316 rejectUnauthorized: !!global.config.reject_unauthorised_certificates,
317 }, this._onSocketConnect.bind(this));
318
319 } else {
320 this.socket = net.connect(this.proxy_port, this.proxy_addr, this._onSocketConnect.bind(this));
321 }
322
323 this.socket.setTimeout(10000);
324 this.socket.on('data', this._onSocketData.bind(this));
325 this.socket.on('close', this._onSocketClose.bind(this));
326 this.socket.on('error', this._onSocketError.bind(this));
327
328 return this;
329 };
330
331
332 ProxySocket.prototype.destroySocket = function() {
333 if (!this.socket)
334 return;
335
336 this.socket.removeAllListeners();
337 this.socket.destroy();
338 delete this.socket;
339
340 debug('[KiwiProxy] Destroying socket');
341 };
342
343
344 ProxySocket.prototype._read = function() {
345 var data;
346
347 if (this.state === 'connected' && this.socket) {
348 while ((data = this.socket.read()) !== null) {
349 if (this.push(data) === false) {
350 break;
351 }
352 }
353 } else {
354 this.push('');
355 }
356 };
357
358
359 ProxySocket.prototype._write = function(chunk, encoding, callback) {
360 if (this.state === 'connected' && this.socket) {
361 return this.socket.write(chunk, encoding, callback);
362 } else {
363 debug('[KiwiProxy] Trying to write to an unfinished socket. State=' + this.state);
364 callback('Not connected');
365 }
366 };
367
368
369 ProxySocket.prototype._onSocketConnect = function() {
370 var meta = this.meta || {};
371
372 this.state = 'handshaking';
373
374 debug('[KiwiProxy] Connected to proxy, sending meta');
375 this.socket.write(JSON.stringify(meta) + '\n');
376 };
377
378
379 ProxySocket.prototype._onSocketData = function(data) {
380 if (this.state === 'connected') {
381 this.emit('data', data);
382 return;
383 }
384
385 var buffer_str = data.toString(),
386 status = buffer_str[0],
387 error_code,
388 error_codes = {};
389
390 error_codes[RESPONSE_ERROR] = 'ERROR';
391 error_codes[RESPONSE_ECONNRESET] = 'ECONNRESET';
392 error_codes[RESPONSE_ECONNREFUSED] = 'ECONNREFUSED';
393 error_codes[RESPONSE_ENOTFOUND] = 'ENOTFOUND';
394 error_codes[RESPONSE_ETIMEDOUT] = 'ETIMEDOUT';
395
396 debug('[KiwiProxy] Recieved socket status: ' + data.toString());
397 if (status === RESPONSE_OK) {
398 debug('[KiwiProxy] Remote socket connected OK');
399 this.state = 'connected';
400
401 if (typeof this.connected_fn === 'function')
402 connected_fn();
403
404 this.emit('connect');
405
406 } else {
407 this.destroySocket();
408
409 error_code = error_codes[status] || error_codes[RESPONSE_ERROR];
410 debug('[KiwiProxy] Error: ' + error_code);
411 this.emit('error', new Error(error_code));
412 }
413 };
414
415
416 ProxySocket.prototype._onSocketClose = function(had_error) {
417 debug('[KiwiProxy] _onSocketClose() had_error=' + had_error.toString());
418 if (this.state === 'connected') {
419 this.emit('close', had_error);
420 return;
421 }
422
423 if (!this.ignore_close)
424 this.emit('error', new Error(RESPONSE_ERROR));
425 };
426
427
428 ProxySocket.prototype._onSocketError = function(err) {
429 debug('[KiwiProxy] _onSocketError() err=' + err.toString());
430 this.ignore_close = true;
431 this.emit('error', err);
432 };