4 * Copyright(c) 2011 LearnBoost <dev@learnboost.com>
12 var parser
= require('./parser');
15 * Expose the constructor.
18 exports
= module
.exports
= Transport
;
21 * Transport constructor.
26 function Transport (mng
, data
, req
) {
29 this.disconnected
= false;
31 this.handleRequest(req
);
40 Transport
.prototype.__defineGetter__('log', function () {
41 return this.manager
.log
;
50 Transport
.prototype.__defineGetter__('store', function () {
51 return this.manager
.store
;
55 * Handles a request when it's set.
60 Transport
.prototype.handleRequest = function (req
) {
61 this.log
.debug('setting request', req
.method
, req
.url
);
64 if (req
.method
== 'GET') {
65 this.socket
= req
.socket
;
68 this.setHeartbeatInterval();
71 this.onSocketConnect();
76 * Called when a connection is first set.
81 Transport
.prototype.onSocketConnect = function () { };
84 * Sets transport handlers
89 Transport
.prototype.setHandlers = function () {
92 // we need to do this in a pub/sub way since the client can POST the message
93 // over a different socket (ie: different Transport instance)
94 this.store
.subscribe('heartbeat-clear:' + this.id
, function () {
95 self
.onHeartbeatClear();
98 this.store
.subscribe('disconnect-force:' + this.id
, function () {
99 self
.onForcedDisconnect();
102 this.store
.subscribe('dispatch:' + this.id
, function (packet
, volatile
) {
103 self
.onDispatch(packet
, volatile
);
107 end
: this.onSocketEnd
.bind(this)
108 , close
: this.onSocketClose
.bind(this)
109 , error
: this.onSocketError
.bind(this)
110 , drain
: this.onSocketDrain
.bind(this)
113 this.socket
.on('end', this.bound
.end
);
114 this.socket
.on('close', this.bound
.close
);
115 this.socket
.on('error', this.bound
.error
);
116 this.socket
.on('drain', this.bound
.drain
);
118 this.handlersSet
= true;
122 * Removes transport handlers
127 Transport
.prototype.clearHandlers = function () {
128 if (this.handlersSet
) {
129 this.store
.unsubscribe('disconnect-force:' + this.id
);
130 this.store
.unsubscribe('heartbeat-clear:' + this.id
);
131 this.store
.unsubscribe('dispatch:' + this.id
);
133 this.socket
.removeListener('end', this.bound
.end
);
134 this.socket
.removeListener('close', this.bound
.close
);
135 this.socket
.removeListener('error', this.bound
.error
);
136 this.socket
.removeListener('drain', this.bound
.drain
);
141 * Called when the connection dies
146 Transport
.prototype.onSocketEnd = function () {
147 this.end('socket end');
151 * Called when the connection dies
156 Transport
.prototype.onSocketClose = function (error
) {
157 this.end(error
? 'socket error' : 'socket close');
161 * Called when the connection has an error.
166 Transport
.prototype.onSocketError = function (err
) {
168 this.socket
.destroy();
172 this.log
.info('socket error ' + err
.stack
);
176 * Called when the connection is drained.
181 Transport
.prototype.onSocketDrain = function () {
186 * Called upon receiving a heartbeat packet.
191 Transport
.prototype.onHeartbeatClear = function () {
192 this.clearHeartbeatTimeout();
193 this.setHeartbeatInterval();
197 * Called upon a forced disconnection.
202 Transport
.prototype.onForcedDisconnect = function () {
203 if (!this.disconnected
) {
204 this.log
.info('transport end by forced client disconnection');
206 this.packet({ type
: 'disconnect' });
213 * Dispatches a packet.
218 Transport
.prototype.onDispatch = function (packet
, volatile
) {
220 this.writeVolatile(packet
);
227 * Sets the close timeout.
230 Transport
.prototype.setCloseTimeout = function () {
231 if (!this.closeTimeout
) {
234 this.closeTimeout
= setTimeout(function () {
235 self
.log
.debug('fired close timeout for client', self
.id
);
236 self
.closeTimeout
= null;
237 self
.end('close timeout');
238 }, this.manager
.get('close timeout') * 1000);
240 this.log
.debug('set close timeout for client', this.id
);
245 * Clears the close timeout.
248 Transport
.prototype.clearCloseTimeout = function () {
249 if (this.closeTimeout
) {
250 clearTimeout(this.closeTimeout
);
251 this.closeTimeout
= null;
253 this.log
.debug('cleared close timeout for client', this.id
);
258 * Sets the heartbeat timeout
261 Transport
.prototype.setHeartbeatTimeout = function () {
262 if (!this.heartbeatTimeout
) {
265 this.heartbeatTimeout
= setTimeout(function () {
266 self
.log
.debug('fired heartbeat timeout for client', self
.id
);
267 self
.heartbeatTimeout
= null;
268 self
.end('heartbeat timeout');
269 }, this.manager
.get('heartbeat timeout') * 1000);
271 this.log
.debug('set heartbeat timeout for client', this.id
);
276 * Clears the heartbeat timeout
281 Transport
.prototype.clearHeartbeatTimeout = function () {
282 if (this.heartbeatTimeout
) {
283 clearTimeout(this.heartbeatTimeout
);
284 this.heartbeatTimeout
= null;
285 this.log
.debug('cleared heartbeat timeout for client', this.id
);
290 * Sets the heartbeat interval. To be called when a connection opens and when
291 * a heartbeat is received.
296 Transport
.prototype.setHeartbeatInterval = function () {
297 if (!this.heartbeatInterval
) {
300 this.heartbeatInterval
= setTimeout(function () {
302 self
.heartbeatInterval
= null;
303 }, this.manager
.get('heartbeat interval') * 1000);
305 this.log
.debug('set heartbeat interval for client', this.id
);
310 * Clears all timeouts.
315 Transport
.prototype.clearTimeouts = function () {
316 this.clearCloseTimeout();
317 this.clearHeartbeatTimeout();
318 this.clearHeartbeatInterval();
327 Transport
.prototype.heartbeat = function () {
329 this.log
.debug('emitting heartbeat for client', this.id
);
330 this.packet({ type
: 'heartbeat' });
331 this.setHeartbeatTimeout();
340 * @param {Object} packet object
344 Transport
.prototype.onMessage = function (packet
) {
345 var current
= this.manager
.transports
[this.id
];
347 if ('heartbeat' == packet
.type
) {
348 this.log
.debug('got heartbeat packet');
350 if (current
&& current
.open
) {
351 current
.onHeartbeatClear();
353 this.store
.publish('heartbeat-clear:' + this.id
);
356 if ('disconnect' == packet
.type
&& packet
.endpoint
== '') {
357 this.log
.debug('got disconnection packet');
360 current
.onForcedDisconnect();
362 this.store
.publish('disconnect-force:' + this.id
);
368 if (packet
.id
&& packet
.ack
!= 'data') {
369 this.log
.debug('acknowledging packet automatically');
371 var ack
= parser
.encodePacket({
374 , endpoint
: packet
.endpoint
|| ''
377 if (current
&& current
.open
) {
378 current
.onDispatch(ack
);
380 this.manager
.onClientDispatch(this.id
, ack
);
381 this.store
.publish('dispatch:' + this.id
, ack
);
385 // handle packet locally or publish it
387 this.manager
.onClientMessage(this.id
, packet
);
389 this.store
.publish('message:' + this.id
, packet
);
395 * Clears the heartbeat interval
400 Transport
.prototype.clearHeartbeatInterval = function () {
401 if (this.heartbeatInterval
) {
402 clearTimeout(this.heartbeatInterval
);
403 this.heartbeatInterval
= null;
404 this.log
.debug('cleared heartbeat interval for client', this.id
);
409 * Finishes the connection and makes sure client doesn't reopen
414 Transport
.prototype.disconnect = function (reason
) {
415 this.packet({ type
: 'disconnect' });
422 * Closes the connection.
427 Transport
.prototype.close = function () {
435 * Called upon a connection close.
440 Transport
.prototype.onClose = function () {
442 this.setCloseTimeout();
443 this.clearHandlers();
445 this.manager
.onClose(this.id
);
446 this.store
.publish('close', this.id
);
451 * Cleans up the connection, considers the client disconnected.
456 Transport
.prototype.end = function (reason
) {
457 if (!this.disconnected
) {
458 this.log
.info('transport end');
460 var local
= this.manager
.transports
[this.id
];
463 this.clearTimeouts();
464 this.disconnected
= true;
467 this.manager
.onClientDisconnect(this.id
, reason
, true);
469 this.store
.publish('disconnect:' + this.id
, reason
);
475 * Signals that the transport should pause and buffer data.
480 Transport
.prototype.discard = function () {
481 this.log
.debug('discarding transport');
482 this.discarded
= true;
483 this.clearTimeouts();
484 this.clearHandlers();
490 * Writes an error packet with the specified reason and advice.
492 * @param {Number} advice
493 * @param {Number} reason
497 Transport
.prototype.error = function (reason
, advice
) {
504 this.log
.warn(reason
, advice
? ('client should ' + advice
) : '');
514 Transport
.prototype.packet = function (obj
) {
515 return this.write(parser
.encodePacket(obj
));
519 * Writes a volatile message.
524 Transport
.prototype.writeVolatile = function (msg
) {
529 this.log
.debug('ignoring volatile packet, buffer not drained');
532 this.log
.debug('ignoring volatile packet, transport not open');