Blame view

Coordinator.py 3.97 KB
f795df3ae   Thanasis Naskos   initial commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
  #!/usr/bin/env python
   
  from Deamon import Daemon
  import sys, os, time, logging
  import GanetiCluster, HadoopCluster
  
  
  class MyDaemon(Daemon):
      """Coordinator daemon that brings up a Hadoop cluster when started.

      Relies on the ``Daemon`` base class (imported from ``Deamon``, not shown
      here) for ``self.utils`` and ``self.properties_file`` -- TODO confirm
      against the base class.
      """

      def run(self):
          """Daemon body: install the file logger, then provision the cluster.

          On success stores the cloud handle, the Hadoop cluster handle and the
          first instance's ``ip_address`` on ``self`` as ``eucacluster``,
          ``nosqlCluster`` and ``masterNode``.
          """
          # ``import logging`` does not load the ``logging.handlers`` submodule,
          # so ``logging.handlers.RotatingFileHandler`` may raise AttributeError;
          # import the handler class explicitly.
          from logging.handlers import RotatingFileHandler

          ## Install logger
          log_filename = (self.utils.install_dir + '/' +
                          self.utils.logs_folder_name + '/Coordinator.log')
          self.my_logger = logging.getLogger('Coordinator')
          self.my_logger.setLevel(self.utils.logging_level)

          # Rotate at 2 MiB, keeping 5 old log files.
          handler = RotatingFileHandler(
              log_filename, maxBytes=2 * 1024 * 1024, backupCount=5)
          handler.setFormatter(logging.Formatter(
              "%(asctime)s - %(levelname)s - %(name)s - %(message)s"))
          self.my_logger.addHandler(handler)

          ## Record the environment with which the daemon is run.
          # NOTE(review): the environment may hold cloud credentials (e.g. from
          # a sourced eucarc); consider redacting before logging.
          self.my_logger.debug(os.environ)
          self.my_logger.debug(self.utils.bucket_name)

          self.eucacluster, self.nosqlCluster, self.masterNode = \
              self.initializeHadoopCluster()

          self.my_logger.debug("OK! Cluster is ready! Master is: %s", self.masterNode)
          # Manual follow-up hints for the operator.
          self.my_logger.debug("make directory datasets: \"hadoop dfs -mkdir /home/hduser/datasets\"")
          self.my_logger.debug("upload dataset to hdfs: \"hadoop dfs -put /media/datasets/btj_experiments/cloudSolarAltitude19 /home/hduser/datasets/.\"")
          self.my_logger.debug("run the command \"w3m http://localhost:50030\" to monitor the cluster using terminal browser")

      def reload(self):
          """Placeholder for reloading the configuration properties.

          Intended to replace the decision maker so the configuration is
          re-read; currently does nothing and returns None.
          """
          return None

      def initializeHadoopCluster(self):
          """Acquire cloud instances and configure/start Hadoop on them.

          If ``self.utils.reconfigure`` is true, the instances returned by
          ``self.utils.query_instance_db("running")`` are reused. Otherwise the
          cluster's previous DB entries are deleted, ``initial_cluster_size``
          new instances are launched, and the call blocks until they run.

          Returns:
              tuple: ``(ganeti_cluster, hadoop_cluster, master_ip)`` where
              ``master_ip`` is ``ip_address`` of the first instance.

          Raises:
              ValueError: ``cloud_api_type`` is not "ganeti" or ``cluster_type``
                  is not "HADOOP" (checked before any DB change or launch).
              RuntimeError: no instances are available to configure.
          """
          # Validate configuration up front so an unsupported setup fails before
          # deleting DB rows or launching VMs that would then be orphaned.
          if self.utils.cloud_api_type != "ganeti":
              raise ValueError("Unsupported cloud_api_type %r; only 'ganeti' is supported"
                               % (self.utils.cloud_api_type,))
          if self.utils.cluster_type != "HADOOP":
              raise ValueError("Unsupported cluster_type %r; only 'HADOOP' is supported"
                               % (self.utils.cluster_type,))

          eucacluster = GanetiCluster.GanetiCluster(self.properties_file)
          self.my_logger.debug("Created GanetiCluster")

          ## creates a new Hadoop cluster
          nosqlcluster = HadoopCluster.HadoopCluster(self.properties_file,
                                                     self.utils.cluster_name)

          if self.utils.reconfigure:
              instances = self.utils.query_instance_db("running")
          else:
              self.my_logger.debug("Removing previous instance of cluster from db")
              self.utils.delete_cluster_from_db(self.utils.cluster_name)
              self.my_logger.debug("Launching new instances")
              instances = eucacluster.run_instances(
                  self.utils.initial_cluster_size, self.utils.bucket_name,
                  self.utils.disk, self.utils.cpu, self.utils.ram)
              self.my_logger.debug("Launched new instances: %s", instances)
              instances = eucacluster.block_until_running(instances)
          self.my_logger.debug("Running instances: %s", instances)

          # The first instance becomes the master; fail before configuring
          # anything if there is none.
          if not instances:
              raise RuntimeError("No running instances available for cluster %r"
                                 % (self.utils.cluster_name,))

          self.my_logger.debug(nosqlcluster.configure_cluster(instances, self.utils.hostname_template))
          nosqlcluster.start_cluster()
          self.my_logger.debug("Cluster configured... sleep for 30secs")
          time.sleep(30)
          return eucacluster, nosqlcluster, instances[0].ip_address
  
  		
  if __name__ == "__main__":
      # Command-line entry: exactly one command argument selects the action.
      # Exit codes: 0 on success, 2 on bad usage or unknown command.
      usage = "usage: %s start|stop|restart|reload" % sys.argv[0]
      if len(sys.argv) != 2:
          print(usage)
          sys.exit(2)

      daemon = MyDaemon('/tmp/hadoop_coordinator', 'Coordinator.properties')
      commands = {
          'start': daemon.start,
          'stop': daemon.stop,
          'restart': daemon.restart,
          'reload': daemon.reload,
      }
      action = commands.get(sys.argv[1])
      if action is None:
          print("Unknown command")
          print(usage)
          sys.exit(2)
      action()
      sys.exit(0)