channel:join + channel:leave plugin events
[KiwiIRC.git] / server / proxy.js
1 var stream = require('stream'),
2 util = require('util'),
3 events = require('events'),
4 net = require('net'),
5 tls = require('tls'),
6 fs = require('fs');
7
8
9 module.exports = {
10 ProxyServer: ProxyServer,
11 ProxySocket: ProxySocket
12 };
13
14 function debug() {
15 //console.log.apply(console, arguments);
16 }
17
18 // Socket connection responses
19 var RESPONSE_ERROR = '0';
20 var RESPONSE_OK = '1';
21 var RESPONSE_ECONNRESET = '2';
22 var RESPONSE_ECONNREFUSED = '3';
23 var RESPONSE_ENOTFOUND = '4';
24 var RESPONSE_ETIMEDOUT = '5';
25
26
27
28 /**
29 * ProxyServer
30 * Listens for connections from a kiwi server, dispatching ProxyPipe
31 * instances for each connection
32 */
33 function ProxyServer() {
34 events.EventEmitter.call(this);
35 }
36 util.inherits(ProxyServer, events.EventEmitter);
37
38
39 ProxyServer.prototype.listen = function(listen_port, listen_addr, opts) {
40 var that = this,
41 serv_opts = {},
42 connection_event = 'connection';
43
44 opts = opts || {};
45
46 // Listen using SSL?
47 if (opts.ssl) {
48 serv_opts = {
49 key: fs.readFileSync(opts.ssl_key),
50 cert: fs.readFileSync(opts.ssl_cert)
51 };
52
53 // Do we have an intermediate certificate?
54 if (typeof opts.ssl_ca !== 'undefined') {
55 // An array of them?
56 if (typeof opts.ssl_ca.map !== 'undefined') {
57 serv_opts.ca = opts.ssl_ca.map(function (f) { return fs.readFileSync(f); });
58
59 } else {
60 serv_opts.ca = fs.readFileSync(opts.ssl_ca);
61 }
62 }
63
64 this.server = tls.createServer(serv_opts);
65
66 connection_event = 'secureConnection';
67
68 }
69
70 // No SSL, start a simple clear text server
71 else {
72 this.server = new net.Server();
73 }
74
75 this.server.listen(listen_port, listen_addr, function() {
76 that.emit('listening');
77 });
78
79 this.server.on(connection_event, function(socket) {
80 new ProxyPipe(socket, that);
81 });
82 };
83
84
85 ProxyServer.prototype.close = function(callback) {
86 if (this.server) {
87 return this.server.close(callback);
88 }
89
90 if (typeof callback === 'function')
91 callback();
92 };
93
94
95
96
97 /**
98 * ProxyPipe
99 * Takes connections from a kiwi server, then:
100 * 1. Reads its meta data such as username for identd lookups
101 * 2. Make the connection to the IRC server
102 * 3. Reply to the kiwi server with connection status
103 * 4. If all ok, pipe data between the 2 sockets as a proxy
104 */
105 function ProxyPipe(kiwi_socket, proxy_server) {
106 debug('[KiwiProxy] New Kiwi connection');
107
108 this.kiwi_socket = kiwi_socket;
109 this.proxy_server = proxy_server;
110 this.irc_socket = null;
111 this.buffers = [];
112 this.meta = null;
113
114 kiwi_socket.on('readable', this.kiwiSocketOnReadable.bind(this));
115 }
116
117
118 ProxyPipe.prototype.destroy = function() {
119 this.buffers = null;
120 this.meta = null;
121
122 if (this.irc_socket) {
123 this.irc_socket.destroy();
124 this.irc_socket.removeAllListeners();
125 this.irc_socket = null;
126 }
127
128 if (this.kiwi_socket) {
129 this.kiwi_socket.destroy();
130 this.kiwi_socket.removeAllListeners();
131 this.kiwi_socket = null;
132 }
133 };
134
135
136 ProxyPipe.prototype.kiwiSocketOnReadable = function() {
137 var chunk, buffer, meta;
138
139 while ((chunk = this.kiwi_socket.read()) !== null) {
140 this.buffers.push(chunk);
141 }
142
143 // Not got a complete line yet? Wait some more
144 chunk = this.buffers[this.buffers.length-1];
145 if (!chunk || chunk[chunk.length-1] !== 0x0A)
146 return;
147
148 buffer = new Buffer.concat(this.buffers);
149 this.buffers = null;
150
151 try {
152 debug('[KiwiProxy] Found a complete line in the buffer');
153 meta = JSON.parse(buffer.toString('utf8'));
154 } catch (err) {
155 debug('[KiwiProxy] Error parsing meta');
156 this.destroy();
157 return;
158 }
159
160 if (!meta.username) {
161 debug('[KiwiProxy] Meta does not contain a username');
162 this.destroy();
163 return;
164 }
165
166 this.meta = meta;
167 this.kiwi_socket.removeAllListeners('readable');
168
169 this.makeIrcConnection();
170 };
171
172
173 ProxyPipe.prototype.makeIrcConnection = function() {
174 debug('[KiwiProxy] Opening proxied connection to: ' + this.meta.host + ':' + this.meta.port.toString());
175
176 var local_address = this.meta.interface ?
177 this.meta.interface :
178 '0.0.0.0';
179
180 if (this.meta.ssl) {
181 this.irc_socket = tls.connect({
182 port: parseInt(this.meta.port, 10),
183 host: this.meta.host,
184 rejectUnauthorized: global.config.reject_unauthorised_certificates,
185 localAddress: local_address
186 }, this._onSocketConnect.bind(this));
187
188 } else {
189 this.irc_socket = net.connect({
190 port: parseInt(this.meta.port, 10),
191 host: this.meta.host,
192 localAddress: local_address
193 }, this._onSocketConnect.bind(this));
194 }
195
196 this.irc_socket.setTimeout(10000);
197 this.irc_socket.on('error', this._onSocketError.bind(this));
198 this.irc_socket.on('timeout', this._onSocketTimeout.bind(this));
199
200 // We need the raw socket connect event, not after any SSL handshakes or anything
201 if (this.irc_socket.socket) {
202 this.irc_socket.socket.on('connect', this._onRawSocketConnect.bind(this));
203 } else {
204 this.irc_socket.on('connect', this._onRawSocketConnect.bind(this));
205 }
206 };
207
208
209 ProxyPipe.prototype._onRawSocketConnect = function() {
210 this.proxy_server.emit('socket_connected', this);
211 };
212
213
214 ProxyPipe.prototype._onSocketConnect = function() {
215 debug('[KiwiProxy] ProxyPipe::_onSocketConnect()');
216
217 this.proxy_server.emit('connection_open', this);
218
219 // Now that we're connected to the detination, return no
220 // error back to the kiwi server and start piping
221 this.kiwi_socket.write(new Buffer(RESPONSE_OK.toString()), this.startPiping.bind(this));
222 };
223
224
225 ProxyPipe.prototype._onSocketError = function(err) {
226 var replies = {
227 ECONNRESET: RESPONSE_ECONNRESET,
228 ECONNREFUSED: RESPONSE_ECONNREFUSED,
229 ENOTFOUND: RESPONSE_ENOTFOUND,
230 ETIMEDOUT: RESPONSE_ETIMEDOUT
231 };
232 debug('[KiwiProxy] IRC Error ' + err.code);
233 this.kiwi_socket.write(new Buffer((replies[err.code] || RESPONSE_ERROR).toString()), 'UTF8', this.destroy.bind(this));
234 };
235
236
237 ProxyPipe.prototype._onSocketTimeout = function() {
238 this.has_timed_out = true;
239 debug('[KiwiProxy] IRC Timeout');
240 this.irc_socket.destroy();
241 this.kiwi_socket.write(new Buffer(RESPONSE_ETIMEDOUT.toString()), 'UTF8', this.destroy.bind(this));
242 };
243
244
245 ProxyPipe.prototype._onSocketClose = function() {
246 debug('[KiwiProxy] IRC Socket closed');
247 this.proxy_server.emit('connection_close', this);
248 this.destroy();
249 };
250
251
252 ProxyPipe.prototype.startPiping = function() {
253 debug('[KiwiProxy] ProxyPipe::startPiping()');
254
255 // Let the piping handle socket closures
256 this.irc_socket.removeAllListeners('error');
257 this.irc_socket.removeAllListeners('timeout');
258
259 this.irc_socket.on('close', this._onSocketClose.bind(this));
260
261 this.kiwi_socket.pipe(this.irc_socket);
262 this.irc_socket.pipe(this.kiwi_socket);
263 };
264
265
266
267
268
269 /**
270 * ProxySocket
271 * Transparent socket interface to a kiwi proxy
272 */
273 function ProxySocket(proxy_port, proxy_addr, meta, proxy_opts) {
274 stream.Duplex.call(this);
275
276 this.connected_fn = null;
277 this.proxy_addr = proxy_addr;
278 this.proxy_port = proxy_port;
279 this.proxy_opts = proxy_opts || {};
280
281 this.setMeta(meta || {});
282
283 this.state = 'disconnected';
284 }
285
286 util.inherits(ProxySocket, stream.Duplex);
287
288
289 ProxySocket.prototype.setMeta = function(meta) {
290 this.meta = meta;
291 };
292
293
294 ProxySocket.prototype.connectTls = function() {
295 this.meta.ssl = true;
296 return this.connect.apply(this, arguments);
297 };
298
299
300 ProxySocket.prototype.connect = function(dest_port, dest_addr, connected_fn) {
301 this.meta.host = dest_addr;
302 this.meta.port = dest_port;
303 this.connected_fn = connected_fn;
304
305 if (!this.meta.host || !this.meta.port) {
306 debug('[KiwiProxy] Invalid destination addr/port', this.meta);
307 return false;
308 }
309
310 debug('[KiwiProxy] Connecting to proxy ' + this.proxy_addr + ':' + this.proxy_port.toString() + ' SSL: ' + (!!this.proxy_opts.ssl).toString());
311 if (this.proxy_opts.ssl) {
312 this.socket = tls.connect({
313 port: this.proxy_port,
314 host: this.proxy_addr,
315 rejectUnauthorized: !!global.config.reject_unauthorised_certificates,
316 }, this._onSocketConnect.bind(this));
317
318 } else {
319 this.socket = net.connect(this.proxy_port, this.proxy_addr, this._onSocketConnect.bind(this));
320 }
321
322 this.socket.setTimeout(10000);
323 this.socket.on('data', this._onSocketData.bind(this));
324 this.socket.on('close', this._onSocketClose.bind(this));
325 this.socket.on('error', this._onSocketError.bind(this));
326
327 return this;
328 };
329
330
331 ProxySocket.prototype.destroySocket = function() {
332 if (!this.socket)
333 return;
334
335 this.socket.removeAllListeners();
336 this.socket.destroy();
337 delete this.socket;
338
339 debug('[KiwiProxy] Destroying socket');
340 };
341
342
343 ProxySocket.prototype._read = function() {
344 var data;
345
346 if (this.state === 'connected' && this.socket) {
347 while ((data = this.socket.read()) !== null) {
348 if (this.push(data) === false) {
349 break;
350 }
351 }
352 } else {
353 this.push('');
354 }
355 };
356
357
358 ProxySocket.prototype._write = function(chunk, encoding, callback) {
359 if (this.state === 'connected' && this.socket) {
360 return this.socket.write(chunk, encoding, callback);
361 } else {
362 debug('[KiwiProxy] Trying to write to an unfinished socket. State=' + this.state);
363 callback('Not connected');
364 }
365 };
366
367
368 ProxySocket.prototype._onSocketConnect = function() {
369 var meta = this.meta || {};
370
371 this.state = 'handshaking';
372
373 debug('[KiwiProxy] Connected to proxy, sending meta');
374 this.socket.write(JSON.stringify(meta) + '\n');
375 };
376
377
378 ProxySocket.prototype._onSocketData = function(data) {
379 if (this.state === 'connected') {
380 this.emit('data', data);
381 return;
382 }
383
384 var buffer_str = data.toString(),
385 status = buffer_str[0],
386 error_code,
387 error_codes = {};
388
389 error_codes[RESPONSE_ERROR] = 'ERROR';
390 error_codes[RESPONSE_ECONNRESET] = 'ECONNRESET';
391 error_codes[RESPONSE_ECONNREFUSED] = 'ECONNREFUSED';
392 error_codes[RESPONSE_ENOTFOUND] = 'ENOTFOUND';
393 error_codes[RESPONSE_ETIMEDOUT] = 'ETIMEDOUT';
394
395 debug('[KiwiProxy] Recieved socket status: ' + data.toString());
396 if (status === RESPONSE_OK) {
397 debug('[KiwiProxy] Remote socket connected OK');
398 this.state = 'connected';
399
400 if (typeof this.connected_fn === 'function')
401 connected_fn();
402
403 this.emit('connect');
404
405 } else {
406 this.destroySocket();
407
408 error_code = error_codes[status] || error_codes[RESPONSE_ERROR];
409 debug('[KiwiProxy] Error: ' + error_code);
410 this.emit('error', new Error(error_code));
411 }
412 };
413
414
415 ProxySocket.prototype._onSocketClose = function(had_error) {
416 debug('[KiwiProxy] _onSocketClose() had_error=' + had_error.toString());
417 if (this.state === 'connected') {
418 this.emit('close', had_error);
419 return;
420 }
421
422 if (!this.ignore_close)
423 this.emit('error', new Error(RESPONSE_ERROR));
424 };
425
426
427 ProxySocket.prototype._onSocketError = function(err) {
428 debug('[KiwiProxy] _onSocketError() err=' + err.toString());
429 this.ignore_close = true;
430 this.emit('error', err);
431 };