From 34f9f1515085930d61279402092d23f3d5844305 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 5 Feb 2013 23:33:44 +0400 Subject: [PATCH] socket.io rooms instead of own session management Replaces homegrown pad session management with socket.io build-in rooms --- src/node/handler/PadMessageHandler.js | 432 ++++++++++---------------- 1 file changed, 163 insertions(+), 269 deletions(-) diff --git a/src/node/handler/PadMessageHandler.js b/src/node/handler/PadMessageHandler.js index 6781cd88..69bb568d 100644 --- a/src/node/handler/PadMessageHandler.js +++ b/src/node/handler/PadMessageHandler.js @@ -35,11 +35,6 @@ var messageLogger = log4js.getLogger("message"); var _ = require('underscore'); var hooks = require("ep_etherpad-lite/static/js/pluginfw/hooks.js"); -/** - * A associative array that saves which sessions belong to a pad - */ -var pad2sessions = {}; - /** * A associative array that saves informations about a session * key = sessionId @@ -83,14 +78,11 @@ exports.handleConnect = function(client) exports.kickSessionsFromPad = function(padID) { //skip if there is nobody on this pad - if(!pad2sessions[padID]) + if(socketio.sockets.clients(padID).length == 0) return; //disconnect everyone from this pad - for(var i in pad2sessions[padID]) - { - socketio.sockets.sockets[pad2sessions[padID][i]].json.send({disconnect:"deleted"}); - } + socketio.sockets.in(padID).json.send({disconnect:"deleted"}); } /** @@ -100,15 +92,13 @@ exports.kickSessionsFromPad = function(padID) exports.handleDisconnect = function(client) { //save the padname of this session - var sessionPad=sessioninfos[client.id].padId; + var session = sessioninfos[client.id]; //if this connection was already etablished with a handshake, send a disconnect message to the others - if(sessioninfos[client.id] && sessioninfos[client.id].author) + if(session && session.author) { - var author = sessioninfos[client.id].author; - //get the author color out of the db - authorManager.getAuthorColorId(author, function(err, color) + authorManager.getAuthorColorId(session.author, function(err, color) { ERR(err); @@ -121,33 +111,16 @@ exports.handleDisconnect = function(client) "ip": "127.0.0.1", "colorId": color, "userAgent": "Anonymous", - "userId": author + "userId": session.author } } }; //Go trough all user that are still on the pad, and send them the USER_LEAVE message - for(i in pad2sessions[sessionPad]) - { - var socket = socketio.sockets.sockets[pad2sessions[sessionPad][i]]; - if(socket !== undefined){ - socket.json.send(messageToTheOtherUsers); - } - - } + client.broadcast.to(session.padId).json.send(messageToTheOtherUsers); }); } - //Go trough all sessions of this pad, search and destroy the entry of this client - for(i in pad2sessions[sessionPad]) - { - if(pad2sessions[sessionPad][i] == client.id) - { - pad2sessions[sessionPad].splice(i, 1); - break; - } - } - //Delete the sessioninfos entrys of this session delete sessioninfos[client.id]; } @@ -228,11 +201,10 @@ exports.handleMessage = function(client, message) function(callback) { - if(!message.padId){ - // If the message has a padId we assume the client is already known to the server and needs no re-authorization - callback(); - return; - } + // If the message has a padId we assume the client is already known to the server and needs no re-authorization + if(!message.padId) + return callback(); + // Note: message.sessionID is an entirely different kind of // session from the sessions we use here! Beware! FIXME: Call // our "sessions" "connections". @@ -292,9 +264,7 @@ exports.handleCustomMessage = function (padID, msg, cb) { time: time } }; - for (var i in pad2sessions[padID]) { - socketio.sockets.sockets[pad2sessions[padID][i]].json.send(msg); - } + socketio.sockets.in(padID).json.send(msg); cb(null, {}); } @@ -352,10 +322,7 @@ function handleChatMessage(client, message) }; //broadcast the chat message to everyone on the pad - for(var i in pad2sessions[padId]) - { - socketio.sockets.sockets[pad2sessions[padId][i]].json.send(msg); - } + socketio.sockets.in(padId).json.send(msg); callback(); } @@ -413,23 +380,16 @@ function handleGetChatMessages(client, message) { if(ERR(err, callback)) return; - var infoMsg = { - type: "COLLABROOM", - data: { - type: "CHAT_MESSAGES", - messages: chatMessages - } - }; - - // send the messages back to the client - for(var i in pad2sessions[padId]) - { - if(pad2sessions[padId][i] == client.id) - { - socketio.sockets.sockets[pad2sessions[padId][i]].json.send(infoMsg); - break; + var infoMsg = { + type: "COLLABROOM", + data: { + type: "CHAT_MESSAGES", + messages: chatMessages } - } + }; + + // send the messages back to the client + client.json.send(infoMsg); }); }]); } @@ -453,14 +413,14 @@ function handleSuggestUserName(client, message) return; } - var padId = sessioninfos[client.id].padId; + var padId = sessioninfos[client.id].padId, + clients = socketio.sockets.clients(padId); //search the author and send him this message - for(var i in pad2sessions[padId]) - { - if(sessioninfos[pad2sessions[padId][i]].author == message.data.payload.unnamedId) - { - socketio.sockets.sockets[pad2sessions[padId][i]].send(message); + for(var i = 0; i < clients.length; i++) { + var session = sessioninfos[clients[i].id]; + if(session && session.author == message.data.payload.unnamedId) { + clients[i].json.send(message); break; } } @@ -501,7 +461,8 @@ function handleUserInfoUpdate(client, message) type: "USER_NEWINFO", userInfo: { userId: author, - name: message.data.userInfo.name, + //set a null name, when there is no name set. cause the client wants it null + name: message.data.userInfo.name || null, colorId: message.data.userInfo.colorId, userAgent: "Anonymous", ip: "127.0.0.1", @@ -509,20 +470,8 @@ function handleUserInfoUpdate(client, message) } }; - //set a null name, when there is no name set. cause the client wants it null - if(infoMsg.data.userInfo.name == null) - { - infoMsg.data.userInfo.name = null; - } - //Send the other clients on the pad the update message - for(var i in pad2sessions[padId]) - { - if(pad2sessions[padId][i] != client.id) - { - socketio.sockets.sockets[pad2sessions[padId][i]].json.send(infoMsg); - } - } + client.broadcast.to(padId).json.send(infoMsg); } /** @@ -682,90 +631,76 @@ function handleUserChanges(client, message) exports.updatePadClients = function(pad, callback) { //skip this step if noone is on this pad - if(!pad2sessions[pad.id]) - { - callback(); - return; - } + var roomClients = socketio.sockets.clients(pad.id); + if(roomClients.length==0) + return callback(); + // since all clients usually get the same set of changesets, store them in local cache + // to remove unnecessary roundtrip to the datalayer + // TODO: in REAL world, if we're working without datalayer cache, all requests to revisions will be fired + // BEFORE first result will be landed to our cache object. The solution is to replace parallel processing + // via async.forEach with sequential for() loop. There is no real benefits of running this in parallel, + // but benefit of reusing cached revision object is HUGE + var revCache = {}; + //go trough all sessions on this pad - async.forEach(pad2sessions[pad.id], function(session, callback) + async.forEach(roomClients, function(client, callback) { + var sid = client.id; //https://github.com/caolan/async#whilst //send them all new changesets async.whilst( - function (){ return sessioninfos[session] && sessioninfos[session].rev < pad.getHeadRevisionNumber()}, + function (){ return sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()}, function(callback) { - var author, revChangeset, currentTime; - var r = sessioninfos[session].rev + 1; - - async.parallel([ - function (callback) - { - pad.getRevisionAuthor(r, function(err, value) - { - if(ERR(err, callback)) return; - author = value; - callback(); - }); - }, - function (callback) - { - pad.getRevisionChangeset(r, function(err, value) - { - if(ERR(err, callback)) return; - revChangeset = value; - callback(); - }); - }, - function (callback) - { - pad.getRevisionDate(r, function(err, date) - { - if(ERR(err, callback)) return; - currentTime = date; - callback(); - }); - } - ], function(err) - { - if(ERR(err, callback)) return; - // next if session has not been deleted - if(sessioninfos[session] == null) - { - callback(null); - return; - } - if(author == sessioninfos[session].author) - { - socketio.sockets.sockets[session].json.send({"type":"COLLABROOM","data":{type:"ACCEPT_COMMIT", newRev:r}}); - } - else - { - var forWire = Changeset.prepareForWire(revChangeset, pad.pool); - var wireMsg = {"type":"COLLABROOM", - "data":{type:"NEW_CHANGES", - newRev:r, - changeset: forWire.translated, - apool: forWire.pool, - author: author, - currentTime: currentTime, - timeDelta: currentTime - sessioninfos[session].time - }}; - - socketio.sockets.sockets[session].json.send(wireMsg); - } + var r = sessioninfos[sid].rev + 1; - if(sessioninfos[session] != null) - { - sessioninfos[session].time = currentTime; - sessioninfos[session].rev = r; - } - - callback(null); - }); + async.waterfall([ + function(callback) { + if(revCache[r]) + callback(null, revCache[r]); + else + pad.getRevision(r, callback); + }, + function(revision, callback) + { + revCache[r] = revision; + + var author = revision.meta.author, + revChangeset = revision.changeset, + currentTime = revision.meta.timestamp; + + // next if session has not been deleted + if(sessioninfos[sid] == null) + return callback(null); + + if(author == sessioninfos[sid].author) + { + client.json.send({"type":"COLLABROOM","data":{type:"ACCEPT_COMMIT", newRev:r}}); + } + else + { + var forWire = Changeset.prepareForWire(revChangeset, pad.pool); + var wireMsg = {"type":"COLLABROOM", + "data":{type:"NEW_CHANGES", + newRev:r, + changeset: forWire.translated, + apool: forWire.pool, + author: author, + currentTime: currentTime, + timeDelta: currentTime - sessioninfos[sid].time + }}; + + client.json.send(wireMsg); + } + + sessioninfos[sid].time = currentTime; + sessioninfos[sid].rev = r; + + callback(null); + } + ], callback); }, callback ); @@ -895,23 +830,14 @@ function handleClientReady(client, message) function(callback) { async.parallel([ - //get colorId + //get colorId and name function(callback) { - authorManager.getAuthorColorId(author, function(err, value) + authorManager.getAuthor(author, function(err, value) { if(ERR(err, callback)) return; - authorColorId = value; - callback(); - }); - }, - //get author name - function(callback) - { - authorManager.getAuthorName(author, function(err, value) - { - if(ERR(err, callback)) return; - authorName = value; + authorColorId = value.colorId; + authorName = value.name; callback(); }); }, @@ -965,21 +891,17 @@ function handleClientReady(client, message) { //Check that the client is still here. It might have disconnected between callbacks. if(sessioninfos[client.id] === undefined) - { - callback(); - return; - } + return callback(); //Check if this author is already on the pad, if yes, kick the other sessions! - if(pad2sessions[padIds.padId]) - { - for(var i in pad2sessions[padIds.padId]) - { - if(sessioninfos[pad2sessions[padIds.padId][i]] && sessioninfos[pad2sessions[padIds.padId][i]].author == author) - { - var socket = socketio.sockets.sockets[pad2sessions[padIds.padId][i]]; - if(socket) socket.json.send({disconnect:"userdup"}); - } + var roomClients = socketio.sockets.clients(padIds.padId); + for(var i = 0; i < roomClients.length; i++) { + var sinfo = sessioninfos[roomClients[i].id]; + if(sinfo && sinfo.author == author) { + // fix user's counter, works on page refresh or if user closes browser window and then rejoins + sessioninfos[roomClients[i].id] = {}; + roomClients[i].leave(padIds.padId); + roomClients[i].json.send({disconnect:"userdup"}); } } @@ -988,15 +910,6 @@ function handleClientReady(client, message) sessioninfos[client.id].readOnlyPadId = padIds.readOnlyPadId; sessioninfos[client.id].readonly = padIds.readonly; - //check if there is already a pad2sessions entry, if not, create one - if(!pad2sessions[padIds.padId]) - { - pad2sessions[padIds.padId] = []; - } - - //Saves in pad2sessions that this session belongs to this pad - pad2sessions[padIds.padId].push(client.id); - //If this is a reconnect, we don't have to send the client the ClientVars again if(message.reconnect == true) { @@ -1044,7 +957,7 @@ function handleClientReady(client, message) // tell the client the number of the latest chat-message, which will be // used to request the latest 100 chat-messages later (GET_CHAT_MESSAGES) "chatHead": pad.chatHead, - "numConnectedUsers": pad2sessions[padIds.padId].length, + "numConnectedUsers": roomClients.length, "isProPad": false, "readOnlyId": padIds.readOnlyPadId, "readonly": padIds.readonly, @@ -1080,6 +993,8 @@ function handleClientReady(client, message) } }); + //Join the pad and start receiving updates + client.join(padIds.padId); //Send the clientVars to the Client client.json.send({type: "CLIENT_VARS", data: clientVars}); //Save the current revision in sessioninfos, should be the same as in clientVars @@ -1108,74 +1023,56 @@ function handleClientReady(client, message) { messageToTheOtherUsers.data.userInfo.name = authorName; } + + // notify all existing users about new user + client.broadcast.to(padIds.padIds).json.send(messageToTheOtherUsers); //Run trough all sessions of this pad - async.forEach(pad2sessions[padIds.padId], function(sessionID, callback) + async.forEach(socketio.sockets.clients(padIds.padId), function(roomClient, callback) { - var author, socket, sessionAuthorName, sessionAuthorColorId; + var author; + + //Jump over, if this session is the connection session + if(roomClient.id == client.id) + return callback(); + //Since sessioninfos might change while being enumerated, check if the //sessionID is still assigned to a valid session - if(sessioninfos[sessionID] !== undefined && - socketio.sockets.sockets[sessionID] !== undefined){ - author = sessioninfos[sessionID].author; - socket = socketio.sockets.sockets[sessionID]; - }else { - // If the sessionID is not valid, callback(); - callback(); - return; - } - async.series([ + if(sessioninfos[roomClient.id] !== undefined) + author = sessioninfos[roomClient.id].author; + else // If the client id is not valid, callback(); + return callback(); + + async.waterfall([ //get the authorname & colorId function(callback) { - async.parallel([ - function(callback) - { - authorManager.getAuthorColorId(author, function(err, value) - { - if(ERR(err, callback)) return; - sessionAuthorColorId = value; - callback(); - }) - }, - function(callback) - { - authorManager.getAuthorName(author, function(err, value) - { - if(ERR(err, callback)) return; - sessionAuthorName = value; - callback(); - }) - } - ],callback); + // reuse previously created cache of author's data + if(historicalAuthorData[author]) + callback(null, historicalAuthorData[author]); + else + authorManager.getAuthor(author, callback); }, - function (callback) + function (authorInfo, callback) { - //Jump over, if this session is the connection session - if(sessionID != client.id) - { - //Send this Session the Notification about the new user - socket.json.send(messageToTheOtherUsers); - - //Send the new User a Notification about this other user - var messageToNotifyTheClientAboutTheOthers = { - "type": "COLLABROOM", - "data": { - type: "USER_NEWINFO", - userInfo: { - "ip": "127.0.0.1", - "colorId": sessionAuthorColorId, - "name": sessionAuthorName, - "userAgent": "Anonymous", - "userId": author - } + //Send the new User a Notification about this other user + var msg = { + "type": "COLLABROOM", + "data": { + type: "USER_NEWINFO", + userInfo: { + "ip": "127.0.0.1", + "colorId": authorInfo.colorId, + "name": authorInfo.name, + "userAgent": "Anonymous", + "userId": author } - }; - client.json.send(messageToNotifyTheClientAboutTheOthers); - } + } + }; + client.json.send(msg); } - ], callback); + ], callback); }, callback); } ],function(err) @@ -1521,33 +1418,30 @@ function composePadChangesets(padId, startNum, endNum, callback) * Get the number of users in a pad */ exports.padUsersCount = function (padID, callback) { - if (!pad2sessions[padID] || typeof pad2sessions[padID] != typeof []) { - callback(null, {padUsersCount: 0}); - } else { - callback(null, {padUsersCount: pad2sessions[padID].length}); - } + callback(null, { + padUsersCount: socketio.sockets.clients(padId).length + }); } /** * Get the list of users in a pad */ exports.padUsers = function (padID, callback) { - if (!pad2sessions[padID] || typeof pad2sessions[padID] != typeof []) { - callback(null, {padUsers: []}); - } else { - var authors = []; - for ( var ix in sessioninfos ) { - if ( sessioninfos[ix].padId !== padID ) { - continue; - } - var aid = sessioninfos[ix].author; - authorManager.getAuthor( aid, function ( err, author ) { - author.id = aid; - authors.push( author ); - if ( authors.length === pad2sessions[padID].length ) { - callback(null, {padUsers: authors}); - } - } ); + var result = []; + + async.forEach(socketio.sockets.clients(padId), function(roomClient, callback) { + var s = sessioninfos[roomClient.id]; + if(s) { + authorManager.getAuthor(s.author, function(err, author) { + if(ERR(err, callback)) return; + + author.id = s.author; + result.push(author); + }); } - } + }, function(err) { + if(ERR(err, callback)) return; + + callback(null, {padUsers: result}); + }); }