Coordinator.py 3.65 KB
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
#!/usr/bin/env python
from Deamon import Daemon
import sys, os, time, logging
import GanetiCluster, GstarCluster


class MyDaemon(Daemon):
    """Coordinator daemon: sets up logging, then brings up the cluster.

    Relies on ``self.utils`` and ``self.properties_file``, which are not
    defined in this file (presumably provided by the ``Daemon`` base class --
    confirm against ``Deamon.py``).
    """

    def run(self):
        """Install the rotating file logger and initialize the cluster.

        Sets ``self.my_logger``, ``self.eucacluster``, ``self.nosqlCluster``
        and ``self.masterNode`` as side effects.
        """
        # ``import logging`` alone does not load the ``logging.handlers``
        # submodule; import it explicitly so RotatingFileHandler is
        # guaranteed to be reachable.
        import logging.handlers

        ## Install logger
        log_filename = (self.utils.install_dir + '/' +
                        self.utils.logs_folder_name + '/Coordinator.log')
        self.my_logger = logging.getLogger('Coordinator')
        self.my_logger.setLevel(self.utils.logging_level)
        # Rotate at 2 MiB, keeping five old log files.
        handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=2 * 1024 * 1024, backupCount=5)
        formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - %(name)s - %(message)s")
        handler.setFormatter(formatter)
        self.my_logger.addHandler(handler)

        ## Log the environment with which the daemon is run
        self.my_logger.debug(os.environ)
        self.my_logger.debug(self.utils.bucket_name)
        (self.eucacluster,
         self.nosqlCluster,
         self.masterNode) = self.initializeGstarCluster()

        # Lazy %-formatting: also works if the address is not a plain str.
        self.my_logger.debug("OK! Cluster is ready! Master is: %s",
                             self.masterNode)
        self.my_logger.debug("datasets dir: \"/media/datasets/.\"")

    def reload(self):
        ''' Replaces the decision maker to reload the configuration properties '''
        # Currently a no-op placeholder.
        return None

    def initializeGstarCluster(self):
        """Create the cloud and NoSQL cluster objects and start the cluster.

        When ``self.utils.reconfigure`` is set, the instances recorded as
        "running" in the instance db are reused. Otherwise the previous
        cluster entry is deleted from the db, new instances are launched and
        the call blocks until they are running.

        Returns:
            A tuple ``(eucacluster, nosqlcluster, master_address)`` where
            ``master_address`` is the ``ip_address`` of the first instance.

        Raises:
            ValueError: if ``cloud_api_type`` is not "ganeti" or
                ``cluster_type`` is not "GSTAR" (the only values handled).
            RuntimeError: if no instances are available to form the cluster.
        """
        utils = self.utils

        # Assume running when eucarc sourced
        if utils.cloud_api_type != "ganeti":
            # Previously this fell through and failed later with an
            # UnboundLocalError on ``eucacluster``.
            raise ValueError(
                "Unsupported cloud_api_type: %r" % (utils.cloud_api_type,))
        eucacluster = GanetiCluster.GanetiCluster(self.properties_file)
        self.my_logger.debug("Created GanetiCluster")

        ## creates a new cluster
        if utils.cluster_type != "GSTAR":
            # Fail before launching any instances instead of crashing on a
            # ``None`` cluster object afterwards.
            raise ValueError(
                "Unsupported cluster_type: %r" % (utils.cluster_type,))
        nosqlcluster = GstarCluster.GstarCluster(
            self.properties_file, utils.cluster_name)

        if utils.reconfigure:
            instances = utils.query_instance_db("running")
        else:
            self.my_logger.debug("Removing previous instance of cluster from db")
            utils.delete_cluster_from_db(utils.cluster_name)
            self.my_logger.debug("Launching new instances")
            instances = eucacluster.run_instances(
                utils.initial_cluster_size, utils.bucket_name,
                utils.disk, utils.cpu, utils.ram)
            self.my_logger.debug("Launched new instances: %s", instances)
            instances = eucacluster.block_until_running(instances)
            self.my_logger.debug("Running instances: %s", instances)

        if not instances:
            # The first instance is used as the master below; without any
            # instance there is nothing to configure.
            raise RuntimeError("No instances available to build the cluster")

        self.my_logger.debug(
            nosqlcluster.configure_cluster(instances, utils.hostname_template))
        nosqlcluster.start_cluster()
        self.my_logger.debug("Cluster configured... sleep for 10secs")
        time.sleep(10)
        return eucacluster, nosqlcluster, instances[0].ip_address

if __name__ == "__main__":
    # Command-line entry point: exactly one argument selects the action.
    # Exit status is 0 on success and 2 on a usage error.
    if len(sys.argv) == 2:
        daemon = MyDaemon('/tmp/hadoop_coordinator', 'Coordinator.properties')
        command = sys.argv[1]
        if command == 'start':
            daemon.start()
        elif command == 'stop':
            daemon.stop()
        elif command == 'restart':
            daemon.restart()
        elif command == 'reload':
            daemon.reload()
        else:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("Unknown command")
            sys.exit(2)
        sys.exit(0)
    else:
        # The usage text now lists every command handled above, including
        # ``reload``.
        print("usage: %s start|stop|restart|reload" % sys.argv[0])
        sys.exit(2)