#!/usr/bin/env python
from Deamon import Daemon
import sys, os, time, logging
import GanetiCluster, GstarCluster
class MyDaemon(Daemon):
def run(self):
## Install logger
LOG_FILENAME = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/Coordinator.log'
self.my_logger = logging.getLogger('Coordinator')
self.my_logger.setLevel(self.utils.logging_level)
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=2*1024*1024, backupCount=5)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
handler.setFormatter(formatter)
self.my_logger.addHandler(handler)
## Return the environment with which the daemon is run
self.my_logger.debug(os.environ)
self.my_logger.debug(self.utils.bucket_name)
self.eucacluster, self.nosqlCluster, self.masterNode = self.initializeGstarCluster()
self.my_logger.debug("OK! Cluster is ready! Master is: " + self.masterNode)
self.my_logger.debug("datasets dir: \"/media/datasets/.\"")
def reload(self):
''' Replaces the decision maker to reload the configuration properties '''
return None
def initializeGstarCluster(self):
# Assume running when eucarc sourced
if self.utils.cloud_api_type == "ganeti":
eucacluster = GanetiCluster.GanetiCluster(self.properties_file)
self.my_logger.debug("Created GanetiCluster")
# if self.utils.reconfigure:
# instances = eucacluster.describe_instances()
# self.my_logger.debug("All user instances:" + str(instances))
## creates a new cluster
nosqlcluster=None
if self.utils.cluster_type == "GSTAR":
nosqlcluster = GstarCluster.GstarCluster(self.properties_file, self.utils.cluster_name)
instances = []
if self.utils.reconfigure:
instances = self.utils.query_instance_db("running")
else:
self.my_logger.debug("Removing previous instance of cluster from db")
self.utils.delete_cluster_from_db(self.utils.cluster_name)
self.my_logger.debug("Launching new instances")
instances = eucacluster.run_instances(self.utils.initial_cluster_size, self.utils.bucket_name, self.utils.disk, self.utils.cpu, self.utils.ram)
self.my_logger.debug("Launched new instances: " + str(instances))
instances = eucacluster.block_until_running(instances)
self.my_logger.debug("Running instances: " + str(instances))
self.my_logger.debug(nosqlcluster.configure_cluster(instances, self.utils.hostname_template))
nosqlcluster.start_cluster()
self.my_logger.debug("Cluster configured... sleep for 30secs")
time.sleep(30)
return eucacluster, nosqlcluster, instances[0].ip_address
if __name__ == "__main__":
if len(sys.argv) == 2:
daemon = MyDaemon('/tmp/hadoop_coordinator', 'Coordinator.properties')
if 'start' == sys.argv[1]:
daemon.start()
elif 'stop' == sys.argv[1]:
daemon.stop()
elif 'restart' == sys.argv[1]:
daemon.restart()
elif 'reload' == sys.argv[1]:
daemon.reload()
else:
print "Unknown command"
sys.exit(2)
sys.exit(0)
else:
print "usage: %s start|stop|restart" % sys.argv[0]
sys.exit(2)