HadoopCluster.py 17.4 KB
f795df3ae   Thanasis Naskos   initial commit
  import paramiko, time
  import Utils
  from pysqlite2 import dbapi2 as sqlite
  import pexpect, os, shutil, fileinput, sys, logging
  import logging.handlers
  
  class HadoopCluster(object):
      '''
      This class holds all the nodes of the Hadoop cluster running in the virtual
      cluster. It can start/stop individual daemons as needed, thus adding/removing
      nodes at will, and it also writes out the required configuration files.
      '''
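      # Typical lifecycle (an illustrative sketch, not executed code; the properties
      # file name and host_template value are hypothetical, and `nodes` must be a
      # list of objects exposing an `ip_address` attribute, as assumed throughout
      # this class):
      #
      #     cluster = HadoopCluster("hadoop.properties", initial_cluster_id="default")
      #     cluster.configure_cluster(nodes, host_template="hadoopnode")
      #     cluster.start_cluster()
      #     ...
      #     cluster.stop_cluster()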
  
  
      def __init__(self, properties_file, initial_cluster_id = "default"):
          '''
          Constructor
          '''
          ## Necessary variables
          self.cluster = {}
          self.host_template = ""
          self.cluster_id = initial_cluster_id
          self.utils = Utils.Utils(properties_file)
          
          # Make sure the sqlite file exists. If not, create it and add the table we need.
          con = sqlite.connect(self.utils.db_file)
          cur = con.cursor()
          try:
              clusters = cur.execute('select * from clusters').fetchall()
              if len(clusters) > 0:
                  print """Already discovered cluster id from previous database file. Will select the defined one to work with (if it exists)."""
  #                print "Found records:\n", clusters

                  clustersfromcid = cur.execute('select * from clusters where cluster_id=?', (self.cluster_id,)).fetchall()
                  if len(clustersfromcid) > 0:
                      self.cluster = self.utils.get_cluster_from_db(self.cluster_id)
      #                print self.cluster
                      for clusterkey in self.cluster.keys():
                          if not (clusterkey.find("master") == -1):
                              self.host_template = clusterkey.replace("master", "")
                      # Add self to db (eliminates existing records of same id)
                      self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
                  else:
                      print "No known cluster with this id - run configure before you proceed"

          except sqlite.DatabaseError:
              # First run: the clusters table does not exist yet, so create it.
              cur.execute('create table clusters(cluster_id text, hostname text, ip_address text)')
              con.commit()
              
          cur.close()
          con.close()
          
          ## Install logger
          LOG_FILENAME = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/Coordinator.log'
          self.my_logger = logging.getLogger('HadoopCluster')
          self.my_logger.setLevel(self.utils.logging_level)
          
          handler = logging.handlers.RotatingFileHandler(
                        LOG_FILENAME, maxBytes=2*1024*1024, backupCount=5)
          formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
          handler.setFormatter(formatter)
          self.my_logger.addHandler(handler)
          
          
  #     def describe_nodes(self)
  
      def configure_cluster(self, nodes=None, host_template=""):
          
          ## Check the installation and print errors for nodes that do not exist,
          ## cannot connect, or have incorrect installed versions and/or paths.
          #nodes = self.block_until_ping(nodes)
  
          hosts = open('/tmp/hosts', 'w')
          masters = open('/tmp/masters', 'w')
          slaves = open('/tmp/slaves', 'w')
          
          # copy necessary templates to /tmp to alter them
          shutil.copy("./templates/hadoop121/core-site.xml", "/tmp/core-site.xml")
          shutil.copy("./templates/hadoop121/mapred-site.xml", "/tmp/mapred-site.xml")
          shutil.copy("./templates/hadoop121/hdfs-site.xml", "/tmp/hdfs-site.xml")
          shutil.copy("./templates/hadoop121/hadoop-env.sh", "/tmp/hadoop-env.sh")
          shutil.copy("./templates/hadoop121/hadoop-metrics2.properties","/tmp/hadoop-metrics2.properties")
          
          core_site = '/tmp/core-site.xml'
          mapred_site = '/tmp/mapred-site.xml'
          hdfs_site = '/tmp/hdfs-site.xml'
          hadoop_env = '/tmp/hadoop-env.sh'
          hadoop_properties = "/tmp/hadoop-metrics2.properties"
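          # These /tmp copies are edited in place below (placeholders such as
          # NAMENODE_IP and MAPTASKSPERTASKTRACKER are substituted) and then pushed
          # to every node's conf directory over SFTP once the cluster layout is known.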
          
          i = 0
          hosts.write("127.0.0.1\tlocalhost\n")
  
  
          # Connect to client2 in order to update the /etc/hosts file so that new nodes will be sent requests
          #ssh_client2= paramiko.SSHClient()
          #ssh_client2.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          #self.my_logger.debug("Connecting to client2") 
          #ssh_client2.connect('2001:648:2000:3:16:3eff:fe01:269a', username='root', password='secretpw')
          for node in nodes:
              ssh = paramiko.SSHClient()
              ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
              self.my_logger.debug("Starting config for node: " + node.ip_address) 
              ssh.connect(node.ip_address, username='root', password='secretpw')
              
              ## Check for installation dirs, otherwise exit with error message
              stderr_all = []
              stdin, stdout, stderr = ssh.exec_command('ls /opt/hadoop-1.2.1/')
              stderr_all.append(stderr.readlines())
              stdin, stdout, stderr = ssh.exec_command('echo "root    -       nofile  200000" >> /etc/security/limits.conf')
              stderr_all.append(stderr.readlines())
              #stdin, stdout, stderr = ssh.exec_command('swapoff -a -v')
              for stderr in stderr_all:
                  if len(stderr) > 0 :
                      self.my_logger.debug("ERROR - some installation files are missing")
                      return
              
              for line in fileinput.FileInput(mapred_site, inplace=1):
                  line = line.replace("MAPTASKSPERTASKTRACKER", self.utils.map_tasks).strip()
                  line = line.replace("REDUCETASKSPERTASKTRACKER", self.utils.reduce_tasks).strip()
                  line = line.replace("TOTALMAPTASKS", self.utils.total_map_tasks).strip()
                  line = line.replace("TOTALREDUCETASK", self.utils.total_reduce_tasks).strip()
                  line = line.replace("MAPMEMORY", self.utils.map_memory).strip()
                  line = line.replace("REDUCEMEMORY", self.utils.reduce_memory).strip()
                  print line

              stdin, stdout, stderr = ssh.exec_command('mkdir /root/.ssh')

              for line in fileinput.FileInput(hdfs_site, inplace=1):
                  line = line.replace("DFSREPLICATION", self.utils.dfs_replication).strip()
                  print line

              for line in fileinput.FileInput(hadoop_env, inplace=1):
                  line = line.replace("HADOOPHEAPSIZE", self.utils.hadoop_heap).strip()
                  print line
              if i==0:
                  # Add the master to the /etc/hosts file
                  hosts.write(node.ip_address + "\t" + host_template + "master\n")
                  # Add the master to the masters file
                  masters.write(host_template + "master\n")
                  # Set hostname on the machine
                  stdin, stdout, stderr = ssh.exec_command('echo \"'+host_template+"master\" > /etc/hostname")
                  stdin, stdout, stderr = ssh.exec_command('hostname \"'+host_template+"master\"")
                  
                  for line in fileinput.FileInput(core_site,inplace=1):
                      line = line.replace("NAMENODE_IP",host_template+"master").strip()
                      print line
                  for line in fileinput.FileInput(mapred_site,inplace=1):
                      line = line.replace("JOBTRACKER_IP",host_template+"master").strip()
                      print line
                  for line in fileinput.FileInput(hadoop_properties,inplace=1):
                      line = line.replace("GMETADHOST_IP",host_template+"master").strip()
                      print line
                  
                  ## create namenode/datanode dirs
                  stdin, stdout, stderr = ssh.exec_command('mkdir /opt/hdfsnames/')
                  
                  # Add node to cluster
                  self.cluster[host_template+"master"] = node
  
                  # Add master to the slaves file
  #                slaves.write(host_template+"master"+"\n")

                  # Mount datasets dir
                  stdin, stdout, stderr = ssh.exec_command('mkdir /media/datasets')
                  stdin, stdout, stderr = ssh.exec_command("echo '192.168.1.4:/media/CloudImages/datasets  /media/datasets   nfs      rw,sync,hard,intr  0     0' >> /etc/fstab")
                  stdin, stdout, stderr = ssh.exec_command('mount /media/datasets')

                  # add bin dir to PATH
                  stdin, stdout, stderr = ssh.exec_command("echo 'export PATH=/opt/hadoop-1.2.1/bin/:$PATH' >> /root/.bashrc")
                  stdin, stdout, stderr = ssh.exec_command("source /root/.bashrc")

                  # sending experiment execution files
                  stdin, stdout, stderr = ssh.exec_command('mkdir /root/btj')
                  transport = paramiko.Transport((node.ip_address, 22))
                  transport.connect(username='root', password='secretpw')
                  transport.open_channel("session", node.ip_address, "localhost")
                  sftp = paramiko.SFTPClient.from_transport(transport)
                  experiment_assets = [os.path.join(root, name) for root, dirs, files in os.walk("./templates/btj") for name in files]
                  for expasset in experiment_assets:
                      sftp.put(expasset, expasset.replace("./templates", "/root"))
                  sftp.close()

                  # install terminal browser w3m
                  stdin, stdout, stderr = ssh.exec_command("apt-get install -y w3m w3m-img")
  
              else:
                  # Make a /etc/hosts file as you go
                  hosts.write(node.ip_address + "\t" + host_template + str(i) + "\n")

                  # Add all to the slaves file
                  slaves.write(host_template + str(i) + "\n")
                  
                  # Set hostname on the machine
                  stdin, stdout, stderr = ssh.exec_command('echo \"'+host_template+str(i)+"\" > /etc/hostname")
                  stdin, stdout, stderr = ssh.exec_command('hostname \"'+host_template+str(i)+"\"")
                  
                  ## create namenode/datanode dirs
                  stdin, stdout, stderr = ssh.exec_command('mkdir /opt/hdfsdata/')
                  
                  # Add node to cluster
                  self.cluster[host_template+ str(i)] = node
                  
                  
              ssh.close()
              time.sleep(1)
  
              
              # Save all collected known keys
              ssh.save_host_keys("/tmp/known_hosts_"+str(i))
              
              # Increase i
              i = i+1
          
          # Copy /etc/hosts on all clients
          
          # Decrease i so that it holds the index of the last node
          i = i - 1

          # Add the last node to the masters file (secondary namenode)
          masters.write(host_template + str(i) + "\n")
          
              
          hosts.close()
          masters.close()
          slaves.close()
          
          key_template_path="./templates/ssh_keys"
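          # The same pre-generated key pair from the templates dir is installed on
          # every node and appended to authorized_keys below, so the master has
          # passwordless ssh access to all slaves (needed by start-all/stop-all).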
  
          
          ## Copy standard templates and name each node accordingly
          for node in nodes:
              ssh = paramiko.SSHClient()
              ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
              ssh.connect(node.ip_address, username='root', password='secretpw')
              
              ## Enlarge the user limit on open file descriptors 
              ## (workaround for HDFS-127:http://wiki.apache.org/hadoop/Hbase/Troubleshooting#A7) 
              stdin, stdout, stderr = ssh.exec_command('ulimit -HSn 32768')
              
              ## Sync clocks
              stdin, stdout, stderr = ssh.exec_command('service ntp stop')
              stdin, stdout, stderr = ssh.exec_command('ntpd -gq')
              stdin, stdout, stderr = ssh.exec_command('service ntp start')
              
              transport = paramiko.Transport((node.ip_address, 22))
              transport.connect(username = 'root', password = 'secretpw')
              transport.open_channel("session", node.ip_address, "localhost")
              sftp = paramiko.SFTPClient.from_transport(transport)
  #           Copy private and public key
              sftp.put( key_template_path+"/id_rsa","/root/.ssh/id_rsa")
              sftp.put( key_template_path+"/id_rsa.pub", "/root/.ssh/id_rsa.pub")
              sftp.put( key_template_path+"/config", "/root/.ssh/config")
              
              ## Change permissions for private key
              stdin, stdout, stderr = ssh.exec_command('chmod 0600 /root/.ssh/id_rsa')
              
              # Add public key to authorized_keys
              stdin, stdout, stderr = ssh.exec_command('cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys')
              with open('./templates/ssh_keys/authorized_keys') as f:
                  stdin, stdout, stderr = ssh.exec_command('echo \'' + f.readline().strip() + '\' >> /root/.ssh/authorized_keys')
  
  #            print stdout.readlines()
              
              # Copy files (/etc/hosts, masters, slaves and conf templates) removing empty lines
              sftp.put( "/tmp/hosts", "/etc/hosts")
              os.system("sed -i '/^$/d' /tmp/core-site.xml")
              sftp.put( "/tmp/core-site.xml","/opt/hadoop-1.2.1/conf/core-site.xml")
              os.system("sed -i '/^$/d' /tmp/mapred-site.xml")
              sftp.put( "/tmp/mapred-site.xml","/opt/hadoop-1.2.1/conf/mapred-site.xml")
              os.system("sed -i '/^$/d' /tmp/hdfs-site.xml")
              sftp.put( "/tmp/hdfs-site.xml","/opt/hadoop-1.2.1/conf/hdfs-site.xml")
              sftp.put( "/tmp/masters", "/opt/hadoop-1.2.1/conf/masters")
              sftp.put( "/tmp/slaves", "/opt/hadoop-1.2.1/conf/slaves")
              sftp.put( "/tmp/hadoop-env.sh", "/opt/hadoop-1.2.1/conf/hadoop-env.sh")
              sftp.put( "/tmp/hadoop-metrics2.properties", "/opt/hadoop-1.2.1/conf/hadoop-metrics2.properties")
              sftp.close()
              
              ssh.close()
              
          self.host_template = host_template
          
          ## Manipulate known hosts to make a good file
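          ## The per-node known_hosts_<i> files collected above are merged into one
          ## file; the loop further below then swaps each IP for its hostname.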
          known_hosts_name = '/tmp/known_hosts'
          known_hosts = open(known_hosts_name, 'w')
          j = 0
          while j <= i:
              loop = open('/tmp/known_hosts_'+str(j), 'r')
              for fileLine in loop.readlines():
                  known_hosts.write(fileLine.strip())
              loop.close()
              os.system("sed -i '/^$/d' /tmp/known_hosts_"+str(j))
              j = j + 1 
          known_hosts.close()
              
          for (clusterkey, clusternode) in self.cluster.items():
              for line in fileinput.FileInput(known_hosts_name,inplace=1):
                  line = line.replace(clusternode.ip_address, clusterkey).strip()
                  print line
  #            print clusterkey, clusternode.public_dns_name
          
          
          ## Upload the final known_hosts file to every node
          for node in nodes:
              transport = paramiko.Transport((node.ip_address, 22))
              transport.connect(username = 'root', password = 'secretpw')
              transport.open_channel("session", node.ip_address, "localhost")
              sftp = paramiko.SFTPClient.from_transport(transport)
              os.system("sed -i '/^$/d' /tmp/known_hosts")
              sftp.put( "/tmp/known_hosts", "/root/.ssh/known_hosts")
              sftp.close()
          
          ## Format namenode on the master
          ssh = paramiko.SSHClient()
          ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          ssh.connect(self.cluster[host_template+"master"].ip_address, username='root', password='secretpw')
          ## Format the namenode (all previous data will be lost!)
          stdin, stdout, stderr = ssh.exec_command('echo "Y" | /opt/hadoop-1.2.1/bin/hadoop namenode -format')
          self.my_logger.debug("Namenode formatted:" + str(stderr.readlines()))
          ssh.close()
          
          ## Save to database
          self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
          ## Now you should be ok, so return the nodes with hostnames
          return self.cluster
                  
              
  #    def block_until_ping(self, nodes):
  #        ''' Blocks until all defined instances have reached running state and an ip has been assigned'''
  #        ## Run describe instances until everyone is running
  #        tmpnodes = nodes
  #        nodes = []
  #        while len(tmpnodes) > 0 :
  #	    print "Waiting for", len(tmpnodes), "nodes."
  #            response = os.system("ping -c 1 -W 4 " + tmpnodes[0].private_dns_name)
  #	    if response == 0:
  #		nodes.append(tmpnodes.pop(0));
  #	    else:
  #		time.sleep(1);
  #        return nodes
  
      def start_cluster (self):
          ssh = paramiko.SSHClient()
          ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          ssh.connect(self.cluster[self.host_template+"master"].ip_address, username='root', password='secretpw')
          stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/start-all.sh')
          self.my_logger.debug(str(stdout.readlines()))
          ssh.close()
              
      def stop_cluster (self):
          ssh = paramiko.SSHClient()
          ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          ssh.connect(self.cluster[self.host_template+"master"].ip_address, username='root', password='secretpw')
          stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/stop-all.sh')
          self.my_logger.debug(str(stdout.readlines()))
          ssh.close()
  
  
  
      def switch_hadoop_balancer(self, state=False):
          master_node = self.cluster[self.host_template+"master"]
          try:
              ssh = paramiko.SSHClient()
              ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
              ssh.connect(master_node.ip_address, username='root', password='secretpw')
              #stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/start-balancer.sh')
              if state:
                  stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/hadoop balancer')
                  self.my_logger.debug(str(stdout.readlines()))
                  self.my_logger.debug(str(stderr.readlines()))
              else:
                  ssh.exec_command('/opt/hadoop-1.2.1/bin/stop-balancer.sh > /dev/null 2>&1 &')
              ssh.close()
          except paramiko.SSHException:
              self.my_logger.debug("Failed to invoke shell!")