# GstarCluster.py (11.7 KB)
# Blame: 2382daaac  Thanasis Naskos  "first commit"
  import paramiko,time
  import Utils
  from pysqlite2 import dbapi2 as sqlite
  import pexpect, os, shutil, fileinput, sys, logging
  
  class GstarCluster(object):
      '''
      Registry and provisioner for the nodes of one virtual cluster.

      ``self.cluster`` maps a hostname (``<host_template>master`` for the first
      configured node, ``<host_template><index>`` for every other node) to the
      node object supplied by the caller.  The mapping is handed to
      ``utils.add_to_cluster_db`` after configuration and is read back with
      ``utils.get_cluster_from_db`` by the constructor when the sqlite file
      already holds records for the requested cluster id.

      ``start_cluster`` and ``stop_cluster`` are placeholders that only print
      a message.
      '''

      # NOTE(review): the credentials below are hard-coded in source, exactly
      # as in the original code.  They are only gathered here so that every
      # connection uses the same values; consider loading them from the
      # properties file instead.
      SSH_USER = 'root'
      SSH_PASSWORD = 'secretpw'
      SSH_PORT = 22

      def __init__(self, properties_file, initial_cluster_id = "default"):
          '''
          Load the configuration, restore a known cluster and set up logging.

          properties_file    -- passed unchanged to ``Utils.Utils``.
          initial_cluster_id -- id used to look the cluster up in the
                                ``clusters`` table and to store it later.

          If the ``clusters`` table cannot be queried (sqlite raises
          ``DatabaseError``), the table is created.
          '''
          # ``import logging`` alone does not guarantee that the ``handlers``
          # submodule is loaded, so import it explicitly before it is used.
          import logging.handlers

          ## Necessary variables
          self.cluster = {}
          self.host_template = ""
          self.cluster_id = initial_cluster_id
          self.utils = Utils.Utils(properties_file)

          # Make sure the sqlite file exists. If not, create it and add the
          # table we need.
          con = sqlite.connect(self.utils.db_file)
          try:
              cur = con.cursor()
              try:
                  clusters = cur.execute('select * from clusters').fetchall()
                  if clusters:
                      print("Already discovered cluster id from previous database file. "
                            "Will select the defined one to work with (if it exists).")
                      # Bound parameter instead of string concatenation: the
                      # id can no longer be misread as SQL or as a column name.
                      known = cur.execute(
                          'select * from clusters where cluster_id=?',
                          (self.cluster_id,)).fetchall()
                      if known:
                          self.cluster = self.utils.get_cluster_from_db(self.cluster_id)
                          # The master's key is "<host_template>master", so
                          # stripping "master" recovers the template.
                          for clusterkey in self.cluster:
                              if "master" in clusterkey:
                                  self.host_template = clusterkey.replace("master", "")
                          # Add self to db (eliminates existing records of same id)
                          self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
                      else:
                          print("No known cluster with this id - run configure before you proceed")
              except sqlite.DatabaseError:
                  cur.execute('create table clusters(cluster_id text, hostname text, ip_address text)')
                  con.commit()
              finally:
                  cur.close()
          finally:
              con.close()

          ## Install logger
          log_filename = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/Coordinator.log'
          self.my_logger = logging.getLogger('GstarCluster')
          self.my_logger.setLevel(self.utils.logging_level)

          handler = logging.handlers.RotatingFileHandler(
              log_filename, maxBytes=2*1024*1024, backupCount=5)
          handler.setFormatter(
              logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s"))
          self.my_logger.addHandler(handler)

  #     def describe_nodes(self)

      def _connect_ssh(self, ip_address):
          '''Return a connected ``paramiko.SSHClient`` that auto-accepts host keys.'''
          ssh = paramiko.SSHClient()
          ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          ssh.connect(ip_address, username=self.SSH_USER, password=self.SSH_PASSWORD)
          return ssh

      def _open_sftp(self, ip_address):
          '''
          Open a dedicated transport to ``ip_address`` and an SFTP client on it.

          Returns ``(transport, sftp)``; the caller must close both.
          '''
          transport = paramiko.Transport((ip_address, self.SSH_PORT))
          try:
              transport.connect(username=self.SSH_USER, password=self.SSH_PASSWORD)
              # NOTE(review): kept from the original code; this extra session
              # channel is not used by the SFTP client below.
              transport.open_channel("session", ip_address, "localhost")
              sftp = paramiko.SFTPClient.from_transport(transport)
          except Exception:
              transport.close()
              raise
          return transport, sftp

      def _apply_identity(self, ssh, hostname, ram):
          '''Set the node's hostname and append JAVA_OPTS (90% of ``ram``) to .bashrc.'''
          ssh.exec_command('echo "' + hostname + '" > /etc/hostname')
          ssh.exec_command('hostname "' + hostname + '"')
          heap = int(int(ram) * 0.9)
          # NOTE(review): the inner double quotes are not escaped for the remote
          # shell, so the line appended to .bashrc appears to end up unquoted
          # (JAVA_OPTS=-Xms..m -Xmx..m).  The command is kept byte-identical to
          # the original; confirm the intended quoting before changing it.
          ssh.exec_command(
              'echo "JAVA_OPTS="-Xms%dm -Xmx%dm"" >> /root/.bashrc' % (heap, heap))
          ssh.exec_command('source /root/.bashrc')

      def _provision_master(self, ssh, node):
          '''Mount the datasets share, upload ./templates/gstar and install tools.'''
          # Mount datasets dir
          ssh.exec_command('mkdir /media/datasets')
          ssh.exec_command("echo '192.168.1.4:/media/CloudImages/datasets  /media/datasets   nfs      rw,sync,hard,intr  0     0' >> /etc/fstab")
          ssh.exec_command('mount /media/datasets')

          # Send the experiment execution files: ./templates/gstar/... is
          # copied to /root/gstar/... on the node.
          ssh.exec_command('mkdir /root/gstar')
          transport, sftp = self._open_sftp(node.ip_address)
          try:
              for root, dirs, files in os.walk("./templates/gstar"):
                  for name in files:
                      local_path = os.path.join(root, name)
                      sftp.put(local_path, local_path.replace("./templates", "/root"))
          finally:
              sftp.close()
              transport.close()

          # Install terminal browser w3m (and htop)
          ssh.exec_command("apt-get install -y w3m w3m-img htop")

      def _install_keys_and_hosts(self, node, key_template_path):
          '''Sync the clock, install the shared SSH key pair and upload /etc/hosts.'''
          ssh = self._connect_ssh(node.ip_address)
          try:
              ## Enlarge the user limit on open file descriptors
              ## (workaround for HDFS-127:http://wiki.apache.org/hadoop/Hbase/Troubleshooting#A7)
              ssh.exec_command('ulimit -HSn 32768')

              ## Sync clocks
              ssh.exec_command('service ntp stop')
              ssh.exec_command('ntpd -gq')
              ssh.exec_command('service ntp start')

              transport, sftp = self._open_sftp(node.ip_address)
              try:
                  # Copy private and public key
                  sftp.put(key_template_path + "/id_rsa", "/root/.ssh/id_rsa")
                  sftp.put(key_template_path + "/id_rsa.pub", "/root/.ssh/id_rsa.pub")
                  sftp.put(key_template_path + "/config", "/root/.ssh/config")

                  ## Change permissions for private key
                  ssh.exec_command('chmod 0600 /root/.ssh/id_rsa')

                  # Add public key to authorized_keys, followed by the first
                  # line of the authorized_keys template.
                  ssh.exec_command('cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys')
                  with open(key_template_path + "/authorized_keys") as f:
                      ssh.exec_command(
                          "echo '" + f.readline().strip() + "' >> /root/.ssh/authorized_keys")

                  # Copy the generated hosts file
                  sftp.put("/tmp/hosts", "/etc/hosts")
              finally:
                  sftp.close()
                  transport.close()
          finally:
              ssh.close()

      @staticmethod
      def _merge_known_hosts(lines, names_by_ip):
          '''
          Turn raw known_hosts lines into clean entries keyed by hostname.

          lines       -- iterable of known_hosts lines (any surrounding
                         whitespace and blank lines are discarded).
          names_by_ip -- dict mapping an ip address to the hostname that
                         should replace it.

          Only the host field (text before the first space, possibly a
          comma-separated list) is rewritten, and only on an exact match, so
          "10.0.0.1" never rewrites part of "10.0.0.10".  Returns a list of
          entries without trailing newlines.
          '''
          merged = []
          for raw in lines:
              entry = raw.strip()
              if not entry:
                  continue
              hosts_field, sep, rest = entry.partition(" ")
              renamed = ",".join(
                  names_by_ip.get(host, host) for host in hosts_field.split(","))
              merged.append(renamed + sep + rest)
          return merged

      def configure_cluster(self, nodes=None, host_template=""):
          '''
          Configure every node over SSH and register it in ``self.cluster``.

          nodes         -- iterable of node objects exposing ``ip_address``;
                           the first one becomes the master.  ``None`` is
                           treated as an empty list.
          host_template -- prefix for the generated hostnames.

          Writes /tmp/hosts, /tmp/known_hosts_<index> and /tmp/known_hosts
          locally and uploads hosts, SSH keys and known_hosts to each node.

          Returns ``self.cluster`` (hostname -> node), or ``None`` when writing
          to /etc/security/limits.conf on a node produced output on stderr.

          Note: ``exec_command`` does not wait for the remote command to
          finish; like the original code, commands are fired in order without
          checking their exit status.
          '''
          ## Check installation and print errors for nodes that do not exist/
          ## can not connect/have incorrect installed versions and/or paths
          #nodes = self.block_until_ping(nodes)
          nodes = list(nodes) if nodes is not None else []

          # Build /etc/hosts while walking the nodes; the ``with`` block also
          # closes the file on the early-return path.
          with open('/tmp/hosts', 'w') as hosts:
              hosts.write("127.0.0.1\tlocalhost\n")

              for i, node in enumerate(nodes):
                  self.my_logger.debug("Starting config for node: " + node.ip_address)
                  ssh = self._connect_ssh(node.ip_address)
                  try:
                      ## Check for installation dirs, otherwise exit with error message
                      stdin, stdout, stderr = ssh.exec_command('echo "root    -       nofile  200000" >> /etc/security/limits.conf')
                      if stderr.readlines():
                          self.my_logger.debug("ERROR - some installation files are missing")
                          return None

                      ssh.exec_command('mkdir /root/.ssh')

                      if i == 0:
                          hostname = host_template + "master"
                          ram = self.utils.m_ram
                      else:
                          hostname = host_template + str(i)
                          ram = self.utils.ram

                      # Make the /etc/hosts file as you go
                      hosts.write(node.ip_address + "\t" + hostname + "\n")
                      # Set hostname and JVM options on the machine
                      self._apply_identity(ssh, hostname, ram)
                      # Add node to cluster
                      self.cluster[hostname] = node

                      if i == 0:
                          self._provision_master(ssh, node)
                  finally:
                      ssh.close()

                  time.sleep(1)

                  # Save the host key collected for this node
                  ssh.save_host_keys("/tmp/known_hosts_" + str(i))

          ## Copy standard templates and name each node accordingly
          key_template_path = "./templates/ssh_keys"
          for node in nodes:
              self._install_keys_and_hosts(node, key_template_path)

          self.host_template = host_template

          ## Merge the per-node known hosts into one file, one entry per line,
          ## with ip addresses replaced by the cluster hostnames.
          collected = []
          for j in range(len(nodes)):
              with open('/tmp/known_hosts_' + str(j), 'r') as part:
                  collected.extend(part.readlines())
          names_by_ip = dict(
              (clusternode.ip_address, clusterkey)
              for (clusterkey, clusternode) in self.cluster.items())
          with open('/tmp/known_hosts', 'w') as known_hosts:
              for entry in self._merge_known_hosts(collected, names_by_ip):
                  known_hosts.write(entry + "\n")

          ## Upload the merged file
          for node in nodes:
              transport, sftp = self._open_sftp(node.ip_address)
              try:
                  sftp.put("/tmp/known_hosts", "/root/.ssh/known_hosts")
              finally:
                  sftp.close()
                  transport.close()

          ## Save to database
          self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
          ## Now you should be ok, so return the nodes with hostnames
          return self.cluster

  #    def block_until_ping(self, nodes):
  #        ''' Blocks until all defined instances have reached running state and an ip has been assigned'''
  #        ## Run describe instances until everyone is running
  #        tmpnodes = nodes
  #        nodes = []
  #        while len(tmpnodes) > 0 :
  #            print "Waiting for", len(tmpnodes), "nodes."
  #            response = os.system("ping -c 1 -W 4 " + tmpnodes[0].private_dns_name)
  #            if response == 0:
  #                nodes.append(tmpnodes.pop(0));
  #            else:
  #                time.sleep(1);
  #        return nodes

      def start_cluster (self):
          '''Placeholder: there are no daemons to start; only prints a message.'''
          print("nothing to start")

      def stop_cluster (self):
          '''Placeholder: there are no daemons to stop; only prints a message.'''
          print("nothing to stop")