Source: socketcomm.js

var io = require('socket.io')();
var config=require('../config/config.js');
var CommunicationEngine=require('./communicationengine.js');
var ExecutionEngine=require('./executionengine.js');
var _=require('lodash');
var async=require('async');
var ipaddr = require('ipaddr.js');
var winston = require('winston');
var portastic = require('portastic');
var eventEmitter = require('events').EventEmitter;
var util = require('util');
var socketClient=require('socket.io-client');
/**
 * Default Constructor that initializes the Communicator
 * @class
 * @classdesc
 * @extends EventEmitter
 * A global singleton object that handles all network communication with the clients
 */
function SocketCommunicator() {
	eventEmitter.call(this);
	SocketCommunicator.socketCommunicator = this;
	this.connectedClients = [];
	this.serversConnectedTo = [];
	this.server = null;
	this.initalizeServer();
}
util.inherits(SocketCommunicator, eventEmitter);
/**
 * Function that initializes the server to listen for connection on the specified port
 * and starts listening for connections
 * @function
 */
SocketCommunicator.prototype.initalizeServer=function(){
	var that = this;
	io.on('connection', function(socket) {
		var socketAddress = ipaddr.process(socket.handshake.address).octets.join('.');
		winston.info('a client has connected');
		winston.info(socketAddress);
		var communicationEngine = new CommunicationEngine(socket, ExecutionEngine.getExecutionEngine());
		var oldIndex = _.findIndex(that.connectedClients, {address:socketAddress});
		if(oldIndex > -1)
			that.connectedClients[oldIndex] = {address:socketAddress, engine:communicationEngine, socket:socket};
		else
			that.connectedClients.push({address:socketAddress, engine:communicationEngine, socket:socket});
		socket.on('disconnect', function() {
			winston.info('a user is disconnected');
			var index = _.findIndex(that.connectedClients, {address:socketAddress});
			if(index&&index > -1)
				that.connectedClients.splice(index, 1);
		});
	});
	var that = this;
	portastic.test(config.port).then(function(isOpen) {
		if (isOpen) {
			that.server=io.listen(config.port);
			winston.info('server listening at '+config.port);
			that.emit('server initialized');
		}
		else {
			winston.warn('WARNING: port occupied. Did not start server. Either choose a different port, or ignore if port is being ' +
				'used by node-parallel-task run as a daemon')
			that.emit('server initialized');
		}
	});
};
/**
 * Function that connects to the remote servers found on the local area network
 * @function
 * @param  {Array}   hostArray  Clients to connect to.
 * @param  {Function} finalCallback Function to be called after all connections are done.
 */
SocketCommunicator.prototype.connectTo = function(hostArray, finalCallback) {
	var comm = this;
	async.each(hostArray, function(host, innerCallback) {
		var currentSocket = socketClient('http://'+host+':'+config.port);
		currentSocket.on('connect', function() {
			winston.info('connected to server at '+host);
			var communicationEngine = new CommunicationEngine(currentSocket, ExecutionEngine.getExecutionEngine());
			comm.serversConnectedTo.push({address:host, engine:communicationEngine, socket:currentSocket});
			innerCallback();
		});
		currentSocket.on('disconnect', function() {
			var index = _.findIndex(comm.serversConnectedTo, {address:currentSocket.io.host});
			if(index&&index > -1)
				comm.serversConnectedTo.splice(index, 1);
		});
	},finalCallback);
};

/**
 * Function that emits the specified event and data to all sockets. If an array of sockets are
 * given, then the event is emitted to only those clients
 * @function
 * @param  {String}   event    The name of the event
 * @param  {JSON}   data     Data to be send
 * @param  {Array}   sockets  Clients to emit event to (if applicable)
 * @param  {Function} callback Called after all events are emitted
 */
SocketCommunicator.prototype.emitEvent = function(event, data, sockets, callback) {
	if(!sockets) {
		var completeArray = _.flatten([this.connectedClients, this.serversConnectedTo]);
		completeArray = _.uniq(completeArray, 'address');
		async.each(completeArray, function(clientObj, innerCallback) {
			clientObj.socket.emit(event, data);
			innerCallback();
		}, callback);
	}
	else {
		async.each(sockets, function(sock, innerCallback) {
			clientObj.socket.emit(event, data);
			innerCallback();
		}, callback);
	}
};
/**
 * Static Function to get the singleton SocketCommunicator
 * @function
 */
SocketCommunicator.getSocketCommunicator = function() {
	if(this.socketCommunicator)
		return this.socketCommunicator;
	else return new SocketCommunicator();
};

module.exports=SocketCommunicator;