Source: executionengine.js

var Sandbox=require('./sandbox.js');
var config=require('../config/config.js');
var eventEmitter=require('events').EventEmitter;
var async=require('async');
var _=require('lodash');
var winston = require('winston');
var StreamLogger = require('./streamlogger.js')


// var statuses=['idle','awaitingOrders','initializingEnvironment','processing','aggregatingResults'];
// var clientEvents=['requestResource','readyEnvironment','startExecution'];
// var serverEvents=['resourceAvailable','environmentReady','executionDone'];
// var statuses=['idle','awaitingReplies',]
/**
 * Constructor that sets up the Execution engine and initializes it
 * @class
 * @requires Sandbox
 * @classdesc
 * ExecutionEngine is a Singleton that manages the clients executing the functions.
 * It makes sure that the environment/stubs are created and ready for the required functions to be executed
 * @constructor
 * @param {SocketCommunicator} socketCommunicator The Global Singleton SocketCommunicator for this instance
 */

function ExecutionEngine(socketCommunicator) {
	ExecutionEngine.executionEngine = this;
	this.sandbox = new Sandbox(new StreamLogger(socketCommunicator));
	this.status = 'idle';
	this.resources = [];
	this.resourcesEnvironmentInitialized = [];
	this.resourcesExecutionDone = [];
	this.startTime = null;
	this.array = [];
	this.err = null;
	this.results = [];
	this.defaultCallback = this.currentCallback = function() {
		winston.info('default callback called. WARNING.');
	};
	this.socketCommunicator = socketCommunicator;

}
/**
 * @function
 * @inner
 * Function that runs after a timeout. Is used to collect the resources that are available and are willing to
 * ready to execute functions on them. This function waits for a timeout and only the machines which respond
 * are picked. All these arguments are required because this function runs on a setTimeout
 * @param  {Array} array              The whole array that needs to be shared
 * @param  {Array} resources          The current set of resources
 * @param  {Sandbox} sandbox            The current Sandbox that is being used
 * @param  {SocketCommunicator} socketCommunicator The global singleton SocketCommunicator of this instance
 * @param  {ExecutionEngine} currentEngine      The self ExecutionEngine for making changes
 */
var onResourceAcquisitionTimeout=function(array, resources, sandbox, socketCommunicator, currentEngine) {
	//we gotta move on. The current resources are all the resources available.
	if (resources.length != 0) {
		var step = Math.floor(array.length/resources.length);
		var lastStep = 0;
		var commonData = sandbox.getSandboxEmitData();
		resources.forEach(function(resource) {
			var data = _.clone(commonData);
			data.array = array.slice(lastStep, lastStep+step);
			lastStep+=step;
			data['arrn'] = []
			if (typeof(data.array[0]) == 'function') {
				var i = 0;
				for (i = 0; i < data.array.length; i++){
					var func = "func" + i.toString();
					var f = data.array[i].toString();
					data['arrn'].push(f);
				}
			}
			resource.emitEnvironmentData(data);
		});
	}
	winston.info('got the resources. There are ' + resources.length);
	if (resources.length == 0) {
		winston.info('there are no resources. Performing function locally');
		ExecutionEngine.executeFunctionLocally(array, sandbox, currentEngine);
	}
}
/**
 * This does the mapReduce capabilities of the system. Usually, these are the functions within the async callbacks.
 * @param  {JSON} array The array on which the map has to be executed
 * @param  {function} map  Function to map the stuff that needs to be done in each client
 * @param  {function} reduce  The reduce function to collate the results
 * @param  {object} options Contains options like the scope of the closure example {scope:{testVariable:1}} default scope is {}
 */
ExecutionEngine.prototype.executeMapReduce = function(array, map, reduce, options, callback) {
	if (!options) {
		options = {
			scope: {}
		};
	}
	if(this.status == 'idle') {
		this.status = 'running';
		this.resetExecutionEngine();
		this.sandbox.initializeSandbox(array, map, reduce, options.scope);
		this.array = array;
		this.options = options;
		this.startTime = new Date().getTime();
		this.socketCommunicator.emitEvent('requestResource');
		this.currentCallback = callback;
		var that = this;
		setTimeout(function() {
			onResourceAcquisitionTimeout(that.array, that.resources, that.sandbox, that.socketCommunicator, that);
		}, config.resourceAcquisitionTimeout);
	}
};
/**
 * Executes the Spawn functionality
 * @param  {function} map  Function to map the stuff that needs to be done in each client
 * @param  {function} reduce  The reduce function to collate the results
 * @param  {JSON} options Contains options like the scope of the closure example {scope:{testVariable:1}} default scope is {}
 * @param  {function} array The function to be called after completion of execution
 */
ExecutionEngine.prototype.executeBroadcast = function(map, reduce, options, callback) {
	if (!options) {
		options = {
			scope: {}
		};
	}
	if (this.status == 'idle') {
		this.status = 'running';
		this.resetExecutionEngine();
		this.sandbox.initializeParallelSandbox(map, reduce, options.scope);
		this.options = options;
		this.startTime = new Date().getTime();
		this.socketCommunicator.emitEvent('requestResource');
		this.currentCallback = callback;
		var that = this;
		setTimeout(function() {
			onResourceAcquisitionTimeout([], that.resources, that.sandbox, that.socketCommunicator, that);
		}, config.resourceAcquisitionTimeout);
	}
};
/**
 * Executes the Parallel functionality
 * @param  {Array} array The array of functions to be executed in parallel
 * @param  {function} map  Function to map the stuff that needs to be done in each client
 * @param  {function} reduce  The reduce function to collate the results
 * @param  {JSON} options Contains options like the scope of the closure example {scope:{testVariable:1}} default scope is {}
 * @param  {function} array The function to be called after completion of execution
 */
ExecutionEngine.prototype.executeParallel = function(array, map, reduce, options, callback) {
	if (!options) {
		options = {
			scope: {}
		};
	}
	if(this.status == 'idle') {
		this.status = 'running';
		this.resetExecutionEngine();
		this.sandbox.initializeSandbox(array, map, reduce, options.scope);
		this.array = array;
		this.options = options;
		this.startTime = new Date().getTime();
		this.socketCommunicator.emitEvent('requestResource');
		this.currentCallback = callback;
		var that = this;

		setTimeout(function() {
			onResourceAcquisitionTimeout(that.array, that.resources, that.sandbox, that.socketCommunicator, that);
		}, config.resourceAcquisitionTimeout);
	}
};
/**
 * Function that adds to available resources if within the resrouce timeout
 * @function
 * @param {CommunicationEngine} socketCommEngine The CommunicationEngine of the socket to add to resources
 */
ExecutionEngine.prototype.addToResources = function(socketCommEngine) {
	if (new Date().getTime()-this.startTime <= config.resourceAcquisitionTimeout)
		this.resources.push(socketCommEngine);
};
/**
 * Function that initializes environment for execution
 * @function
 * @param  {JSON} data          The data used to initialize the Sandbox
 * @param  {Function} finalCallback Callback called once environment is initialized
 */
ExecutionEngine.prototype.initializeEnvironment = function(data, finalCallback) {
	this.sandbox.initializeEnvironment(data, finalCallback);
};
/**
 * Function to trigger the map function execution
 * @function
 * @param  {Function} finalCallback Callback called after map function executes
 */
ExecutionEngine.prototype.executeMapFunction = function(finalCallback) {
	this.sandbox.executeMapFunction(finalCallback);
};
/**
 * Function that is called when a client initializes its Environment. Used to keep track of resources and start execution once they're done
 * @function
 * @param  {CommunicationEngine} socketCommEngine The CommunicationEngine of the socket
 */
ExecutionEngine.prototype.onSocketEnvironmentInitialized = function(socketCommEngine) {
	//check if everyone has initialized their env
	winston.info('one person has initialized environment');
	var ind = _.findIndex(this.resourcesEnvironmentInitialized, {id: socketCommEngine.socket.id});
	if (ind == -1) {
		this.resourcesEnvironmentInitialized.push(socketCommEngine);
	}
	if (this.resources.length == this.resourcesEnvironmentInitialized.length) {
		winston.info('everyone has initialized their environment');
		this.socketCommunicator.emitEvent('startExecution');
	}
};
/**
 * Function that aggregates the results from all the clients
 * @function
 * @param  {CommunicationEngine} socketCommEngine
 * @param  {Error} err              Error returned by client
 * @param  {JSON} results          Results returned by client
 * @param  {Function} finalCallback    Called after results are aggregated
 */
ExecutionEngine.prototype.aggregateResults = function(socketCommEngine,err, results, finalCallback) {
	winston.info('one person has executed map');
	var ind = _.findIndex(this.resourcesExecutionDone, {id: socketCommEngine.socket.id});
	if (ind == -1) {
		this.resourcesExecutionDone.push(socketCommEngine);
		if (err) {
			this.err = err;
		}
		if (results) {
			this.results = this.results.concat(results);
		}
	}
	if(this.resources.length == this.resourcesExecutionDone.length) {
		var that = this;
		//execution is done by all resources and data received. The sandbox will now
		//execute the reduce function, and then call 2 callbacks (in parallel, no dependency)-
		// 1. The callback for the communication engine to do its comm cleanup like set itself to idle
		// 2. The callback to the original function to let it know that mapreduce is done
		this.sandbox.executeReduceFunction(this.err, this.results, function(err, results) {
			finalCallback(err, results);
			var oldCallback = that.currentCallback;
			//resetting the currentCallback to the default one, so that warnings can be logged on wrong call of callback
			winston.info('resetting the callback. MapReduce is done');
			that.currentCallback = that.defaultCallback;
			oldCallback(err);
		});
	}
};
/**
 * Function to reset the Engine
 * @function
 */
ExecutionEngine.prototype.resetExecutionEngine = function() {
	this.status = 'idle';
	this.resources = [];
	this.resourcesEnvironmentInitialized = [];
	this.resourcesExecutionDone = [];
	this.startTime = null;
	this.array = [];
	this.err = null;
	this.results = [];
	this.sandbox.cleanSandbox();
};
/**
 * Function to Execute the function locally if no resources are available
 * @function
 */
ExecutionEngine.executeFunctionLocally = function(array, sandbox, execEngine) {
	async.map(array, sandbox.mapFunction, function(err, results) {
		sandbox.reduceFunction(err, results);
		execEngine.status='idle';
	});
};
/**
 * Getter for the current singleton ExecutionEngine
 * @function
 */
ExecutionEngine.getExecutionEngine = function (socketCommunicator) {
	if (this.executionEngine) {
		return this.executionEngine;
	}
	else {
		return new ExecutionEngine(socketCommunicator);
	}
};


module.exports = ExecutionEngine;