#!/usr/bin/env python
from Deamon import Daemon
import sys, os, time, logging
import LoadGenerator_emu, CassandraCluster223, CassandraCluster39, CassandraCluster310, GanetiCluster, HBase94Cluster, MonitorVms,ClientsController
from MDP import MDPDecisionMaker
from MDP import MDPDecisionMaker_sim
import datetime, My_Logger;
from threading import Thread
from pysqlite2 import dbapi2 as sqlite
class MyDaemon(Daemon):
    """Coordinator daemon that drives an elastic NoSQL cluster experiment.

    In "real" mode (``self.utils.simulation`` is false) it brings up a cloud
    cluster and a NoSQL cluster, monitors the VMs and periodically asks the
    decision maker what to do.  In simulation mode the measurements come from
    a load generator instead of a live cluster.

    ``self.utils`` and ``self.properties_file`` are not set in this class;
    presumably the ``Daemon`` base class provides them -- TODO confirm.
    """

    def run(self):
        """Run one complete experiment: set up, loop over decisions, log totals.

        NOTE(review): ``self.decisionMaker`` is only created when
        ``self.utils.mdp`` is true, and ``self.load_generator`` only when
        ``self.utils.training_set_name == 'okeanos'``; any other
        configuration fails later with AttributeError.
        """
        self._install_logger()
        ## Return the environment with which the daemon is run
        self.my_logger.debug(os.environ)
        self.my_logger.debug(self.utils.bucket_name)

        self.simulationTimeSecs = 0
        self.begin_of_simulation = 0

        # Every run starts from a fresh, empty experiment table.
        self._execute_sql([
            ('drop table if exists exp', ()),
            ('create table exp(id int, simDate text, simCount int, startDate text, removedVMs int, cost_before_remove real, inlambda real, latency real, throughput real, cpu real)', ()),
        ])

        if not self.utils.simulation:
            ## Initialize the nosql cluster: launches (or re-attaches to) the
            ## instances, configures and starts the cluster and returns the
            ## cloud cluster object together with the nosql cluster object.
            self.eucacluster, self.nosqlCluster = self.initializeNosqlCluster()
            ## Initialize monitoring for all nodes in the cluster
            self.vmMonitor = MonitorVms.MonitorVms(self.properties_file, self.nosqlCluster.cluster)
            ## Collect a first round of metrics for the cluster.
            allmetrics = self.vmMonitor.refreshMetrics()
            if self.utils.mdp:
                self.decisionMaker = MDPDecisionMaker.MDPDecisionMaker(self.properties_file, self.eucacluster, self.nosqlCluster, self.vmMonitor)
            self.my_logger.debug("Initializing Clients")
            self.clientsController = ClientsController.ClientsController(self.properties_file, self.decisionMaker)
            self.clientsController.initialize()
            self.clientsController.load()
            self.my_logger.debug("DB Loaded! Start running experiments")
            # Comment this call out for the stabilization test.
            self.clientsController.run(1)
        else:
            if self.utils.mdp:
                self.decisionMaker = MDPDecisionMaker_sim.MDPDecisionMaker_sim(self.properties_file)
            if self.utils.training_set_name == 'okeanos':
                # NOTE(security): eval() on a configuration value; only safe
                # while the properties file is trusted.
                self.load_generator = LoadGenerator_emu.LoadGenerator(int(self.utils.meas_count), self.utils.inlambda_file_path, self.utils.load_meas_log, eval(self.utils.new_inlambda), True)

        self.my_logger.debug("OK, start taking decisions!")
        i = 0
        self.custom_action = "no_op"
        self.begin_of_simulation = datetime.datetime.now()
        # For the stabilization test, run the interactive prompt in a thread:
        # workerThread = Thread(target = self.cmdPrompt)
        # workerThread.start()

        # Single bookkeeping row (id=1) that is updated as the run progresses.
        self._execute_sql([
            (""" insert into exp(id,simDate,simCount,startDate,removedVMs,cost_before_remove,inlambda,latency,throughput) values(?,?,?,?,?,?,?,?,?) """,
             (1, "", 0, self.begin_of_simulation.strftime("%b %d %Y %I:%M:%S%p"), 0, 0.0, 0.0, 0.0, 0.0)),
        ])

        while True:
            if not self.utils.simulation:
                # The sleep sets the interval between two rounds of
                # monitoring stats and decisions.
                time.sleep(30)
                allmetrics = self.vmMonitor.refreshMetrics()
                if self.clientsController.finished:
                    break
            else:
                if i >= int(self.utils.meas_count):
                    break
                allmetrics = self.load_generator.get_meas(self.decisionMaker.currentState)
                # Each simulated step advances the simulated clock by 30 seconds.
                self.simulationTimeSecs = self.simulationTimeSecs + 30
                self.simDatetime = self.begin_of_simulation + datetime.timedelta(0, self.simulationTimeSecs)
                # NOTE(review): the original indentation was lost; this
                # progress update is kept inside the simulation branch because
                # simDatetime is only consumed by simulation-mode code.
                self._execute_sql([
                    (""" update exp set simDate=?,simCount=?,inlambda=?,latency=?,throughput=? where id=? """,
                     (self.simDatetime.strftime("%b %d %Y %I:%M:%S%p"), i, allmetrics['inlambda'], allmetrics['latency'], allmetrics['throughput'], 1)),
                ])

            start = time.time()
            if not self.utils.simulation:
                if self.custom_action != 'no_op':
                    # An action set through cmdPrompt is applied exactly once.
                    self.decisionMaker.takeDecision(allmetrics, self.custom_action)
                    self.custom_action = "no_op"
                else:
                    self.decisionMaker.takeDecision(allmetrics)
            else:
                self.decisionMaker.takeDecision(allmetrics, self.load_generator, self.simDatetime, i)
            end = time.time()
            self.my_logger.debug("Time to decide: " + str(end - start))
            i = i + 1

        if not self.utils.simulation:
            self._log_experiment_totals(self.decisionMaker.polManager, datetime.datetime.now())
        else:
            self._log_experiment_totals(self.decisionMaker, self.simDatetime)

    def _install_logger(self):
        """Attach a rotating file handler to the 'Coordinator' logger."""
        # ``import logging`` alone does not load the ``handlers`` submodule.
        import logging.handlers

        log_filename = self.utils.install_dir + '/' + self.utils.logs_folder_name + '/Coordinator.log'
        self.my_logger = logging.getLogger('Coordinator')
        self.my_logger.setLevel(self.utils.logging_level)
        handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=2 * 1024 * 1024, backupCount=5)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s"))
        self.my_logger.addHandler(handler)

    def _execute_sql(self, statements):
        """Execute (sql, params) pairs on ``self.utils.db_file``, committing each.

        The cursor and the connection are closed even when a statement fails,
        so repeated calls do not leak connections.
        """
        con = sqlite.connect(self.utils.db_file)
        try:
            cur = con.cursor()
            try:
                for sql, params in statements:
                    cur.execute(sql, params)
                    con.commit()
            finally:
                cur.close()
        finally:
            con.close()

    def _log_experiment_totals(self, stats, end_time):
        """Compute and log total cost, spawned VMs and running time.

        ``stats`` is the object exposing ``cost_before_remove`` and
        ``removed_vms_cnt``; ``end_time`` is the datetime at which the
        experiment is considered finished.
        """
        self.total_cost = stats.cost_before_remove + self.utils.get_the_cost_of_all(end_time)
        self.total_spawned_vms = stats.removed_vms_cnt + self.utils.get_total_num_vms()
        running_time = self.utils.get_running_time(
            time.mktime(self.begin_of_simulation.timetuple()),
            time.mktime(end_time.timetuple()))
        log_cost = self.decisionMaker.experiment_log.log_cost
        log_cost("The total cost of your experiment is: " + str(self.total_cost) + " euros")
        log_cost("Total spawned VMs: " + str(self.total_spawned_vms))
        log_cost("Total running time: " + str(running_time))

    def cmdPrompt(self):
        """Interactive prompt for manual actions and client control.

        Loops forever reading commands; 'exit' forwards the command to the
        clients controller and then raises SystemExit.  Commands are matched
        by substring, in the order of the branches below.
        """
        while True:
            command = raw_input("Give command:\n")
            if 'add' in command or 'rem' in command or 'no_op' in command:
                # Picked up by the decision loop in run() on its next round.
                self.custom_action = command
            elif 'clients_list' in command:
                for client in self.utils.getClients():
                    print(str(client.ip_address))
            elif 'stop_all' in command:
                self.clientsController.run_command("", command)
            elif 'start' in command:
                fields = command.split(" ")
                if len(fields) < 3:
                    # Avoid an IndexError that would kill the prompt.
                    print('malformed command: expected at least 3 space-separated fields')
                else:
                    self.clientsController.run_command(fields[0], fields[1], fields[2])
            elif 'stop_client' in command:
                fields = command.split(" ")
                if len(fields) < 2:
                    print('malformed command: expected at least 2 space-separated fields')
                else:
                    self.clientsController.run_command(fields[0], fields[1])
            elif 'exit' in command:
                self.clientsController.run_command("", command)
                sys.exit(0)
            else:
                print('unknown command')

    def reload(self):
        """Placeholder for reloading the configuration; currently does nothing.

        Intended to replace the decision maker so that configuration
        properties are re-read, but that is not implemented.  Returns None.
        """
        return None

    def run_experiment(self, clients):
        """Delegate running an experiment with ``clients`` to the clients controller."""
        self.clientsController.run_experiment(clients)

    def initializeNosqlCluster(self):
        """Create the cloud and NoSQL cluster objects and start the cluster.

        Depending on ``self.utils.reconfigure`` either launches fresh
        instances or re-attaches to the ones recorded in the NoSQL cluster
        object, waits for them to run, then configures and starts the
        cluster.

        Returns:
            (eucacluster, nosqlcluster): the cloud cluster object and the
            NoSQL cluster object.

        Raises:
            ValueError: if ``cloud_api_type`` or ``cluster_type`` is not one
                of the supported values.  Raised before any instance is
                launched.
        """
        api_type = self.utils.cloud_api_type
        if api_type == "okeanos":
            # NOTE(review): OkeanosCluster is not imported in the visible part
            # of this file, so this branch raises NameError unless the name
            # is provided elsewhere.
            eucacluster = OkeanosCluster.OkeanosCluster()
            self.my_logger.debug("Created OkeanosCluster")
        elif api_type == "ganeti":
            eucacluster = GanetiCluster.GanetiCluster(self.properties_file)
            self.my_logger.debug("Created GanetiCluster")
        else:
            raise ValueError("Unsupported cloud_api_type: " + str(api_type))

        instances = eucacluster.describe_instances()
        self.my_logger.debug("All user instances:" + str(instances))

        ## Create the NoSQL cluster object matching the configured type.
        cluster_type = self.utils.cluster_type
        if cluster_type == "HBASE94":
            nosqlcluster = HBase94Cluster.HBase94Cluster(self.properties_file, self.utils.cluster_name)
        elif cluster_type == "CASSANDRA223":
            nosqlcluster = CassandraCluster223.CassandraCluster223(self.properties_file, self.utils.cluster_name)
        elif cluster_type == "CASSANDRA39":
            nosqlcluster = CassandraCluster39.CassandraCluster39(self.properties_file, self.utils.cluster_name)
        elif cluster_type == "CASSANDRA310":
            nosqlcluster = CassandraCluster310.CassandraCluster310(self.properties_file, self.utils.cluster_name)
        else:
            # Fail here rather than after VMs have already been launched.
            raise ValueError("Unsupported cluster_type: " + str(cluster_type))

        # NOTE(security): eval() on a configuration value; only safe while
        # the properties file is trusted.  Evaluated once and reused below.
        reconfigure = eval(self.utils.reconfigure)

        instances = []
        if not reconfigure:
            self.my_logger.debug("Removing previous instance of cluster from db")
            self.utils.delete_cluster_from_db(self.utils.cluster_name)
            instances = eucacluster.run_instances(self.utils.initial_cluster_size, self.utils.bucket_name, self.utils.disk, self.utils.cpu, self.utils.ram, tagging=False)
            self.my_logger.debug("Launched new instances: " + str(instances))
            if api_type == "okeanos":
                instances = eucacluster.block_until_running(instances, 'BUILD')
                instances = eucacluster.connect_to_private_network(instances, True)
                instances = eucacluster.block_until_running(instances, 'REBOOT')
            else:
                instances = eucacluster.block_until_running(instances)
            self.my_logger.debug("Running instances: " + str(instances))
        else:
            # Re-attach to the known nodes: the master first, then the
            # numbered nodes 1 .. len(cluster) - 1.
            instances.append(nosqlcluster.cluster[nosqlcluster.host_template + "master"])
            for node_index in range(1, len(nosqlcluster.cluster)):
                instances.append(nosqlcluster.cluster[nosqlcluster.host_template + str(node_index)])
            self.my_logger.debug("Found old instances: " + str(instances))
            self.my_logger.debug("WARNING: Will block forever if they are not running.")
            if api_type == "okeanos":
                instances = eucacluster.block_until_running(instances, 'BUILD')
            else:
                instances = eucacluster.block_until_running(instances)
            self.my_logger.debug("Running instances: " + str(instances))

        # Fixed 30-second pauses between the setup stages.
        time.sleep(30)
        self.my_logger.debug(nosqlcluster.configure_cluster(instances, self.utils.hostname_template, reconfigure))
        self.my_logger.debug("Cluster configured... sleep for a minute")
        time.sleep(30)
        nosqlcluster.start_cluster(instances)
        self.my_logger.debug("Cluster is running... sleep for 1/2 minute")
        time.sleep(30)
        return eucacluster, nosqlcluster
if __name__ == "__main__":
    # Exactly three arguments are required: the command, a file name that is
    # placed under /tmp/, and a second value passed straight to MyDaemon
    # (presumably the pid file name and the properties file -- TODO confirm).
    if len(sys.argv) != 4:
        print("usage: %s start|stop|restart|reload <pidfile_name> <properties_file>" % sys.argv[0])
        sys.exit(2)

    daemon = MyDaemon('/tmp/' + sys.argv[2], sys.argv[3])
    # Map each supported command to the daemon method that implements it.
    actions = {
        'start': daemon.start,
        'stop': daemon.stop,
        'restart': daemon.restart,
        'reload': daemon.reload,
    }
    action = actions.get(sys.argv[1])
    if action is None:
        print("Unknown command")
        sys.exit(2)
    action()
    sys.exit(0)