import time
import paramiko
import Utils
from pysqlite2 import dbapi2 as sqlite
import pexpect, os, shutil, fileinput, sys, logging
import logging.handlers
class HadoopCluster(object):
    '''
    Holds all the nodes of the Hadoop cluster running inside the virtual cluster. It can
    start/stop individual daemons as needed, thus adding/removing nodes at will, and it
    generates the configuration files each node requires.
    '''
def __init__(self, properties_file, initial_cluster_id = "default"):
        '''
        Constructor. Loads the cluster for `initial_cluster_id` from the sqlite database
        (creating the clusters table if it does not exist yet) and sets up a rotating
        file logger.
        '''
## Necessary variables
self.cluster = {}
self.host_template = ""
self.cluster_id = initial_cluster_id
self.utils = Utils.Utils(properties_file)
        # Make sure the sqlite file exists; if not, create it and add the table we need
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
try:
clusters = cur.execute('select * from clusters',
).fetchall()
            if len(clusters) > 0:
                print """Found clusters from a previous run in the database file. Will use the one matching the requested cluster id (if it exists)."""
                # print "Found records:\n", clusters
                clustersfromcid = cur.execute('select * from clusters where cluster_id = ?',
                                              (self.cluster_id,)).fetchall()
                if len(clustersfromcid) > 0:
self.cluster = self.utils.get_cluster_from_db(self.cluster_id)
# print self.cluster
for clusterkey in self.cluster.keys():
                        if "master" in clusterkey:
                            self.host_template = clusterkey.replace("master", "")
# Add self to db (eliminates existing records of same id)
self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
else:
print "No known cluster with this id - run configure before you proceed"
except sqlite.DatabaseError:
cur.execute('create table clusters(cluster_id text, hostname text, ip_address text)')
con.commit()
cur.close()
con.close()
## Install logger
        LOG_FILENAME = os.path.join(self.utils.install_dir, self.utils.logs_folder_name, 'Coordinator.log')
self.my_logger = logging.getLogger('HadoopCluster')
self.my_logger.setLevel(self.utils.logging_level)
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=2*1024*1024, backupCount=5)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
handler.setFormatter(formatter)
self.my_logger.addHandler(handler)
# def describe_nodes(self)
def configure_cluster(self, nodes=None, host_template=""):
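        '''
        Pushes the Hadoop 1.2.1 configuration to every node in `nodes`: the first node is
        named "<host_template>master", the rest "<host_template>1", "<host_template>2", ...
        It also distributes ssh keys and /etc/hosts, formats the namenode and stores the
        cluster in the database. Returns the cluster dict (hostname -> node).
        '''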
## Check installation and print errors for nodes that do not exist/
## can not connect/have incorrect installed versions and/or paths
#nodes = self.block_until_ping(nodes)
hosts = open('/tmp/hosts', 'w')
masters = open('/tmp/masters', 'w')
slaves = open('/tmp/slaves', 'w')
# copy necessary templates to /tmp to alter them
shutil.copy("./templates/hadoop121/core-site.xml", "/tmp/core-site.xml")
shutil.copy("./templates/hadoop121/mapred-site.xml", "/tmp/mapred-site.xml")
shutil.copy("./templates/hadoop121/hdfs-site.xml", "/tmp/hdfs-site.xml")
shutil.copy("./templates/hadoop121/hadoop-env.sh", "/tmp/hadoop-env.sh")
shutil.copy("./templates/hadoop121/hadoop-metrics2.properties","/tmp/hadoop-metrics2.properties")
core_site = '/tmp/core-site.xml'
mapred_site = '/tmp/mapred-site.xml'
hdfs_site = '/tmp/hdfs-site.xml'
hadoop_env = '/tmp/hadoop-env.sh'
hadoop_properties = "/tmp/hadoop-metrics2.properties"
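        ## The template files above contain placeholder tokens (e.g. MAPTASKSPERTASKTRACKER,
        ## DFSREPLICATION, HADOOPHEAPSIZE, NAMENODE_IP, GMETADHOST_IP) that are substituted
        ## in place below with the values held by self.utils (read from the properties file).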
i = 0
hosts.write("127.0.0.1\tlocalhost\n")
# Connect to client2 in order to update the /etc/hosts file so that new nodes will be sent requests
#ssh_client2= paramiko.SSHClient()
#ssh_client2.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#self.my_logger.debug("Connecting to client2")
#ssh_client2.connect('2001:648:2000:3:16:3eff:fe01:269a', username='root', password='secretpw')
for node in nodes:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.my_logger.debug("Starting config for node: " + node.ip_address)
ssh.connect(node.ip_address, username='root', password='secretpw')
## Check for installation dirs, otherwise exit with error message
stderr_all = []
stdin, stdout, stderr = ssh.exec_command('ls /opt/hadoop-1.2.1/')
stderr_all.append(stderr.readlines())
stdin, stdout, stderr = ssh.exec_command('echo "root - nofile 200000" >> /etc/security/limits.conf')
stderr_all.append(stderr.readlines())
#stdin, stdout, stderr = ssh.exec_command('swapoff -a -v')
            for err_lines in stderr_all:
                if len(err_lines) > 0:
                    self.my_logger.debug("ERROR - some installation files are missing")
                    return
for line in fileinput.FileInput(mapred_site,inplace=1):
line = line.replace("MAPTASKSPERTASKTRACKER",self.utils.map_tasks).strip()
line = line.replace("REDUCETASKSPERTASKTRACKER",self.utils.reduce_tasks).strip()
line = line.replace("TOTALMAPTASKS",self.utils.total_map_tasks).strip()
line = line.replace("TOTALREDUCETASK",self.utils.total_reduce_tasks).strip()
line = line.replace("MAPMEMORY",self.utils.map_memory).strip()
line = line.replace("REDUCEMEMORY",self.utils.reduce_memory).strip()
print line
stdin, stdout, stderr = ssh.exec_command('mkdir /root/.ssh')
for line in fileinput.FileInput(hdfs_site,inplace=1):
line = line.replace("DFSREPLICATION",self.utils.dfs_replication).strip()
print line
for line in fileinput.FileInput(hadoop_env,inplace=1):
line = line.replace("HADOOPHEAPSIZE",self.utils.hadoop_heap).strip()
print line
if i==0:
# Add the master to the /etc/hosts file
hosts.write(node.ip_address + "\t" + host_template+"master\n")
# Add the master to the masters file
masters.write(host_template+"master\n")
# Set hostname on the machine
stdin, stdout, stderr = ssh.exec_command('echo \"'+host_template+"master\" > /etc/hostname")
stdin, stdout, stderr = ssh.exec_command('hostname \"'+host_template+"master\"")
for line in fileinput.FileInput(core_site,inplace=1):
line = line.replace("NAMENODE_IP",host_template+"master").strip()
print line
for line in fileinput.FileInput(mapred_site,inplace=1):
line = line.replace("JOBTRACKER_IP",host_template+"master").strip()
print line
for line in fileinput.FileInput(hadoop_properties,inplace=1):
line = line.replace("GMETADHOST_IP",host_template+"master").strip()
print line
## create namenode/datanode dirs
stdin, stdout, stderr = ssh.exec_command('mkdir /opt/hdfsnames/')
# Add node to cluster
self.cluster[host_template+"master"] = node
# Add master to the slaves file
# slaves.write(host_template+"master"+"\n")
# Mount datasets dir
stdin, stdout, stderr = ssh.exec_command('mkdir /media/datasets')
stdin, stdout, stderr = ssh.exec_command("echo '192.168.1.4:/media/CloudImages/datasets /media/datasets nfs rw,sync,hard,intr 0 0' >> /etc/fstab")
stdin, stdout, stderr = ssh.exec_command('mount /media/datasets')
# add bin dir to PATH
stdin, stdout, stderr = ssh.exec_command("echo 'export PATH=/opt/hadoop-1.2.1/bin/:$PATH' >> /root/.bashrc")
stdin, stdout, stderr = ssh.exec_command("source /root/.bashrc")
# sending experiment execution files
stdin, stdout, stderr = ssh.exec_command('mkdir /root/btj')
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
experiment_assets = [os.path.join(root, name) for root, dirs, files in os.walk("./templates/btj") for name in files]
for expasset in experiment_assets:
sftp.put(expasset,expasset.replace("./templates","/root"))
sftp.close()
#install terminal browser w3m
stdin, stdout, stderr = ssh.exec_command("apt-get install -y w3m w3m-img")
else:
# Make a /etc/hosts file as you go
hosts.write(node.ip_address + "\t" + host_template + str(i) +"\n")
# Add all to the slaves file
slaves.write(host_template+ str(i)+"\n")
# Set hostname on the machine
stdin, stdout, stderr = ssh.exec_command('echo \"'+host_template+str(i)+"\" > /etc/hostname")
stdin, stdout, stderr = ssh.exec_command('hostname \"'+host_template+str(i)+"\"")
## create namenode/datanode dirs
stdin, stdout, stderr = ssh.exec_command('mkdir /opt/hdfsdata/')
# Add node to cluster
self.cluster[host_template+ str(i)] = node
ssh.close()
time.sleep(1)
# Save all collected known keys
ssh.save_host_keys("/tmp/known_hosts_"+str(i))
# Increase i
i = i+1
# Copy /etc/hosts on all clients
        # Decrease i so that it points at the last node
i = i-1
# Add the last node to the masters file (secondary namenode)
masters.write(host_template+ str(i)+"\n")
hosts.close()
masters.close()
slaves.close()
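        ## The same key pair from ./templates/ssh_keys is installed on every node and its
        ## public key is appended to authorized_keys, so the master can reach the slaves
        ## over ssh without a password -- which start-all.sh/stop-all.sh rely on.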
key_template_path="./templates/ssh_keys"
## Copy standard templates and name each node accordingly
for node in nodes:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(node.ip_address, username='root', password='secretpw')
## Enlarge the user limit on open file descriptors
## (workaround for HDFS-127:http://wiki.apache.org/hadoop/Hbase/Troubleshooting#A7)
stdin, stdout, stderr = ssh.exec_command('ulimit -HSn 32768')
## Sync clocks
stdin, stdout, stderr = ssh.exec_command('service ntp stop')
stdin, stdout, stderr = ssh.exec_command('ntpd -gq')
stdin, stdout, stderr = ssh.exec_command('service ntp start')
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
# Copy private and public key
sftp.put( key_template_path+"/id_rsa","/root/.ssh/id_rsa")
sftp.put( key_template_path+"/id_rsa.pub", "/root/.ssh/id_rsa.pub")
sftp.put( key_template_path+"/config", "/root/.ssh/config")
## Change permissions for private key
stdin, stdout, stderr = ssh.exec_command('chmod 0600 /root/.ssh/id_rsa')
# Add public key to authorized_keys
stdin, stdout, stderr = ssh.exec_command('cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys')
with open('./templates/ssh_keys/authorized_keys') as f:
stdin, stdout, stderr = ssh.exec_command('echo \''+f.readline().strip()+'\' >> /root/.ssh/authorized_keys')
# print stdout.readlines()
# Copy files (/etc/hosts, masters, slaves and conf templates) removing empty lines
sftp.put( "/tmp/hosts", "/etc/hosts")
os.system("sed -i '/^$/d' /tmp/core-site.xml")
sftp.put( "/tmp/core-site.xml","/opt/hadoop-1.2.1/conf/core-site.xml")
os.system("sed -i '/^$/d' /tmp/mapred-site.xml")
sftp.put( "/tmp/mapred-site.xml","/opt/hadoop-1.2.1/conf/mapred-site.xml")
os.system("sed -i '/^$/d' /tmp/hdfs-site.xml")
sftp.put( "/tmp/hdfs-site.xml","/opt/hadoop-1.2.1/conf/hdfs-site.xml")
sftp.put( "/tmp/masters", "/opt/hadoop-1.2.1/conf/masters")
sftp.put( "/tmp/slaves", "/opt/hadoop-1.2.1/conf/slaves")
sftp.put( "/tmp/hadoop-env.sh", "/opt/hadoop-1.2.1/conf/hadoop-env.sh")
sftp.put( "/tmp/hadoop-metrics2.properties", "/opt/hadoop-1.2.1/conf/hadoop-metrics2.properties")
sftp.close()
ssh.close()
self.host_template = host_template
        ## Merge the per-node known_hosts files into one file, then swap IP addresses for hostnames
known_hosts_name = '/tmp/known_hosts'
known_hosts = open(known_hosts_name, 'w')
j = 0
        while j <= i:
            loop = open('/tmp/known_hosts_' + str(j), 'r')
            for fileLine in loop.readlines():
                known_hosts.write(fileLine.strip() + "\n")
            loop.close()
            os.system("sed -i '/^$/d' /tmp/known_hosts_" + str(j))
            j = j + 1
known_hosts.close()
for (clusterkey, clusternode) in self.cluster.items():
for line in fileinput.FileInput(known_hosts_name,inplace=1):
line = line.replace(clusternode.ip_address, clusterkey).strip()
print line
# print clusterkey, clusternode.public_dns_name
        ## Upload the merged known_hosts file to every node
for node in nodes:
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
os.system("sed -i '/^$/d' /tmp/known_hosts")
sftp.put( "/tmp/known_hosts", "/root/.ssh/known_hosts")
sftp.close()
## Format namenode on the master
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.cluster[host_template+"master"].ip_address, username='root', password='secretpw')
        ## Format the namenode (all previous HDFS data will be lost!)
stdin, stdout, stderr = ssh.exec_command('echo "Y" | /opt/hadoop-1.2.1/bin/hadoop namenode -format')
self.my_logger.debug("Namenode formatted:" + str(stderr.readlines()))
ssh.close()
## Save to database
self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
## Now you should be ok, so return the nodes with hostnames
return self.cluster
# def block_until_ping(self, nodes):
# ''' Blocks until all defined instances have reached running state and an ip has been assigned'''
# ## Run describe instances until everyone is running
# tmpnodes = nodes
# nodes = []
# while len(tmpnodes) > 0 :
# print "Waiting for", len(tmpnodes), "nodes."
# response = os.system("ping -c 1 -W 4 " + tmpnodes[0].private_dns_name)
# if response == 0:
# nodes.append(tmpnodes.pop(0));
# else:
# time.sleep(1);
# return nodes
    def start_cluster(self):
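        ''' Starts all Hadoop daemons by running start-all.sh on the master node. '''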
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.cluster[self.host_template+"master"].ip_address, username='root', password='secretpw')
stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/start-all.sh')
self.my_logger.debug(str(stdout.readlines()))
ssh.close()
    def stop_cluster(self):
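        ''' Stops all Hadoop daemons by running stop-all.sh on the master node. '''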
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.cluster[self.host_template+"master"].ip_address, username='root', password='secretpw')
stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/stop-all.sh')
self.my_logger.debug(str(stdout.readlines()))
ssh.close()
    def switch_hadoop_balancer(self, state=False):
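        ''' Starts (state=True) or stops (state=False) the HDFS balancer on the master node. '''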
        master_node = self.cluster[self.host_template + "master"]
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(master_node.ip_address, username='root', password='secretpw')
#stdin, stdout, stderr = ssh.exec_command('/opt/hadoop121/bin/start-balancer.sh')
            if state:
                stdin, stdout, stderr = ssh.exec_command('/opt/hadoop-1.2.1/bin/hadoop balancer')
                self.my_logger.debug(str(stdout.readlines()))
                self.my_logger.debug(str(stderr.readlines()))
            else:
                ssh.exec_command('/opt/hadoop-1.2.1/bin/stop-balancer.sh > /dev/null 2>&1 &')
ssh.close()
except paramiko.SSHException:
self.my_logger.debug("Failed to invoke shell!")