Blame view
Coordinator.py
3.65 KB
2382daaac first commit |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
#!/usr/bin/env python from Deamon import Daemon import sys, os, time, logging import GanetiCluster, GstarCluster class MyDaemon(Daemon): def run(self): ## Install logger LOG_FILENAME = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/Coordinator.log' self.my_logger = logging.getLogger('Coordinator') self.my_logger.setLevel(self.utils.logging_level) handler = logging.handlers.RotatingFileHandler( LOG_FILENAME, maxBytes=2*1024*1024, backupCount=5) formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") handler.setFormatter(formatter) self.my_logger.addHandler(handler) ## Return the environment with which the daemon is run self.my_logger.debug(os.environ) self.my_logger.debug(self.utils.bucket_name) self.eucacluster, self.nosqlCluster, self.masterNode = self.initializeGstarCluster() self.my_logger.debug("OK! Cluster is ready! Master is: " + self.masterNode) self.my_logger.debug("datasets dir: \"/media/datasets/.\"") def reload(self): ''' Replaces the decision maker to reload the configuration properties ''' return None def initializeGstarCluster(self): # Assume running when eucarc sourced if self.utils.cloud_api_type == "ganeti": eucacluster = GanetiCluster.GanetiCluster(self.properties_file) self.my_logger.debug("Created GanetiCluster") # if self.utils.reconfigure: # instances = eucacluster.describe_instances() # self.my_logger.debug("All user instances:" + str(instances)) ## creates a new cluster nosqlcluster=None if self.utils.cluster_type == "GSTAR": nosqlcluster = GstarCluster.GstarCluster(self.properties_file, self.utils.cluster_name) instances = [] if self.utils.reconfigure: instances = self.utils.query_instance_db("running") else: self.my_logger.debug("Removing previous instance of cluster from db") self.utils.delete_cluster_from_db(self.utils.cluster_name) self.my_logger.debug("Launching new instances") instances = eucacluster.run_instances(self.utils.initial_cluster_size, self.utils.bucket_name, self.utils.disk, self.utils.cpu, self.utils.ram) self.my_logger.debug("Launched new instances: " + str(instances)) instances = eucacluster.block_until_running(instances) self.my_logger.debug("Running instances: " + str(instances)) self.my_logger.debug(nosqlcluster.configure_cluster(instances, self.utils.hostname_template)) nosqlcluster.start_cluster() |
1f630f3b1 bug fixes |
69 70 |
self.my_logger.debug("Cluster configured... sleep for 10secs") time.sleep(10) |
2382daaac first commit |
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
return eucacluster, nosqlcluster, instances[0].ip_address if __name__ == "__main__": if len(sys.argv) == 2: daemon = MyDaemon('/tmp/hadoop_coordinator', 'Coordinator.properties') if 'start' == sys.argv[1]: daemon.start() elif 'stop' == sys.argv[1]: daemon.stop() elif 'restart' == sys.argv[1]: daemon.restart() elif 'reload' == sys.argv[1]: daemon.reload() else: print "Unknown command" sys.exit(2) sys.exit(0) else: print "usage: %s start|stop|restart" % sys.argv[0] sys.exit(2) |