Blame view

Coordinator.py 3.73 KB
2382daaac   Thanasis Naskos   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
  #!/usr/bin/env python
   
  from Deamon import Daemon
  import sys, os, time, logging
  import GanetiCluster, GstarCluster
  
  
  class MyDaemon(Daemon):
      """Coordinator daemon that brings up and configures a GSTAR cluster.

      The instance relies on attributes that are not set in this class and are
      presumably provided by the ``Daemon`` base class (TODO confirm):
      ``self.utils`` (configuration values and instance-db helpers) and
      ``self.properties_file`` (passed on to the cluster wrappers).
      """

      # Pause between the instances being reported as running and the start of
      # their configuration (the log message below calls this "5mins").
      _PRE_CONFIGURE_WAIT_SECS = 300
      # Pause after the cluster has been started (logged as "10secs").
      _POST_START_WAIT_SECS = 10

      def run(self):
          """Install the rotating file logger, then initialise the cluster.

          On success ``self.eucacluster``, ``self.nosqlCluster`` and
          ``self.masterNode`` hold the values returned by
          ``initializeGstarCluster``.
          """
          # ``logging.handlers`` is a submodule: a plain ``import logging`` does
          # not guarantee it is loaded, so import it explicitly before use.
          import logging.handlers

          ## Install logger
          log_filename = '/'.join([
              self.utils.install_dir,
              self.utils.logs_folder_name,
              'Coordinator.log',
          ])
          self.my_logger = logging.getLogger('Coordinator')
          self.my_logger.setLevel(self.utils.logging_level)

          # Keep at most five 2 MiB backups next to the active log file.
          handler = logging.handlers.RotatingFileHandler(
              log_filename, maxBytes=2 * 1024 * 1024, backupCount=5)
          handler.setFormatter(logging.Formatter(
              "%(asctime)s - %(levelname)s - %(name)s - %(message)s"))
          self.my_logger.addHandler(handler)

          ## Record the environment with which the daemon is run
          self.my_logger.debug(os.environ)
          self.my_logger.debug(self.utils.bucket_name)

          (self.eucacluster,
           self.nosqlCluster,
           self.masterNode) = self.initializeGstarCluster()

          # %-style arguments avoid a TypeError if the address is not a string.
          self.my_logger.debug("OK! Cluster is ready! Master is: %s",
                               self.masterNode)
          self.my_logger.debug('datasets dir: "/media/datasets/."')

      def reload(self):
          """Placeholder for reloading the configuration properties.

          Currently a no-op that always returns ``None``.
          """
          return None

      def initializeGstarCluster(self):
          """Create (or re-attach to) the cluster and configure it.

          When ``self.utils.reconfigure`` is true, the instances recorded as
          "running" in the instance db are reused; otherwise the previous
          cluster entry is deleted from the db and new instances are launched.

          Returns:
              A tuple ``(eucacluster, nosqlcluster, master_ip)`` where
              ``master_ip`` is the ``ip_address`` of the first instance.

          Raises:
              ValueError: if ``cloud_api_type`` or ``cluster_type`` is not one
                  of the values handled here ("ganeti" / "GSTAR").
              RuntimeError: if no instances are available to configure.
          """
          # Fail fast on unsupported settings.  Previously these cases only
          # surfaced later as NameError/AttributeError, after instances had
          # possibly been launched and the long wait had elapsed.
          # Assume running when eucarc sourced
          if self.utils.cloud_api_type != "ganeti":
              self.my_logger.error("Unsupported cloud_api_type: %r",
                                   self.utils.cloud_api_type)
              raise ValueError("Unsupported cloud_api_type: %r"
                               % (self.utils.cloud_api_type,))
          if self.utils.cluster_type != "GSTAR":
              self.my_logger.error("Unsupported cluster_type: %r",
                                   self.utils.cluster_type)
              raise ValueError("Unsupported cluster_type: %r"
                               % (self.utils.cluster_type,))

          eucacluster = GanetiCluster.GanetiCluster(self.properties_file)
          self.my_logger.debug("Created GanetiCluster")

          ## creates a new cluster
          nosqlcluster = GstarCluster.GstarCluster(
              self.properties_file, self.utils.cluster_name)

          if self.utils.reconfigure:
              instances = self.utils.query_instance_db("running")
          else:
              self.my_logger.debug(
                  "Removing previous instance of cluster from db")
              self.utils.delete_cluster_from_db(self.utils.cluster_name)
              self.my_logger.debug("Launching new instances")
              instances = eucacluster.run_instances(
                  self.utils.initial_cluster_size,
                  self.utils.bucket_name,
                  self.utils.disk,
                  self.utils.cpu,
                  self.utils.ram)
              self.my_logger.debug("Launched new instances: %s", instances)
              instances = eucacluster.block_until_running(instances)
          self.my_logger.debug("Running instances: %s", instances)

          # The first instance supplies the master address below; without any
          # instance there is nothing to configure.
          if not instances:
              self.my_logger.error("No running instances available")
              raise RuntimeError("No running instances available")

          self.my_logger.debug("Waiting 5mins before configuring")
          time.sleep(self._PRE_CONFIGURE_WAIT_SECS)

          self.my_logger.debug(nosqlcluster.configure_cluster(
              instances, self.utils.hostname_template))
          nosqlcluster.start_cluster()

          self.my_logger.debug("Cluster configured... sleep for 10secs")
          time.sleep(self._POST_START_WAIT_SECS)

          return eucacluster, nosqlcluster, instances[0].ip_address
  
  		
  if __name__ == "__main__":
      # Command-line entry point: expects exactly one argument naming the
      # daemon command.  Exit status is 0 on success and 2 on a usage error.
      # print(...) with a single argument behaves the same on Python 2 and 3.
      if len(sys.argv) != 2:
          # The usage text lists every command handled below, including reload.
          print("usage: %s start|stop|restart|reload" % sys.argv[0])
          sys.exit(2)

      daemon = MyDaemon('/tmp/hadoop_coordinator', 'Coordinator.properties')
      command = sys.argv[1]
      if command not in ('start', 'stop', 'restart', 'reload'):
          print("Unknown command")
          sys.exit(2)

      # Each accepted command maps to the daemon method of the same name; the
      # lookup is done only for the requested command.
      getattr(daemon, command)()
      sys.exit(0)