import fileinput
import logging
import logging.handlers
import os
import shutil
import sys
import time

import paramiko
import pexpect
from pysqlite2 import dbapi2 as sqlite

import Utils
class GstarCluster(object):
'''
This class holds all nodes of the db in the virtual cluster. It can start/stop individual
daemons as needed, thus adding/removing nodes at will. It also sets up the configuration
files as needed.
'''
def __init__(self, properties_file, initial_cluster_id = "default"):
'''
Constructor
'''
## Necessary variables
self.cluster = {}
self.host_template = ""
self.cluster_id = initial_cluster_id
self.utils = Utils.Utils(properties_file)
# Make sure the sqlite file exists. if not, create it and add the table we need
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
try:
clusters = cur.execute('select * from clusters',
).fetchall()
if len(clusters) > 0 :
print """Already discovered cluster id from previous database file. Will select the defined one to work with (if it exists)."""
# print "Found records:\n", clusters
clustersfromcid = cur.execute('select * from clusters where cluster_id=\"'+self.cluster_id+"\"",
).fetchall()
if len(clustersfromcid) > 0 :
self.cluster = self.utils.get_cluster_from_db(self.cluster_id)
# print self.cluster
for clusterkey in self.cluster.keys():
if not (clusterkey.find("master") == -1):
self.host_template = clusterkey.replace("master","")
# Add self to db (eliminates existing records of same id)
self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
else:
print "No known cluster with this id - run configure before you proceed"
except sqlite.DatabaseError:
cur.execute('create table clusters(cluster_id text, hostname text, ip_address text)')
con.commit()
cur.close()
con.close()
## Install logger
LOG_FILENAME = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/Coordinator.log'
self.my_logger = logging.getLogger('GstarCluster')
self.my_logger.setLevel(self.utils.logging_level)
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=2*1024*1024, backupCount=5)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
handler.setFormatter(formatter)
self.my_logger.addHandler(handler)
# def describe_nodes(self)
def configure_cluster(self, nodes=None, host_template=""):
## Check installation and print errors for nodes that do not exist/
## can not connect/have incorrect installed versions and/or paths
#nodes = self.block_until_ping(nodes)
hosts = open('/tmp/hosts', 'w')
i = 0
hosts.write("127.0.0.1\tlocalhost\n")
# Connect to client2 in order to update the /etc/hosts file so that new nodes will be sent requests
#ssh_client2= paramiko.SSHClient()
#ssh_client2.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#self.my_logger.debug("Connecting to client2")
#ssh_client2.connect('2001:648:2000:3:16:3eff:fe01:269a', username='root', password='secretpw')
for node in nodes:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.my_logger.debug("Starting config for node: " + node.ip_address)
ssh.connect(node.ip_address, username='root', password='secretpw')
## Check for installation dirs, otherwise exit with error message
stderr_all = []
stdin, stdout, stderr = ssh.exec_command('echo "root - nofile 200000" >> /etc/security/limits.conf')
stderr_all.append(stderr.readlines())
#stdin, stdout, stderr = ssh.exec_command('swapoff -a -v')
for stderr in stderr_all:
if len(stderr) > 0 :
self.my_logger.debug("ERROR - some installation files are missing")
return
stdin, stdout, stderr = ssh.exec_command('mkdir /root/.ssh')
if i==0:
#install terminal browser w3m
stdin, stdout, stderr = ssh.exec_command("apt-get update; apt-get install -y w3m w3m-img htop python-paramiko sshfs")
# Add the master to the /etc/hosts file
hosts.write(node.ip_address + "\t" + host_template+"master\n")
# Set hostname on the machine
stdin, stdout, stderr = ssh.exec_command('echo \"'+host_template+"master\" > /etc/hostname")
stdin, stdout, stderr = ssh.exec_command('hostname \"'+host_template+"master\"")
# Add node to cluster
self.cluster[host_template+"master"] = node
# Mount datasets dir
stdin, stdout, stderr = ssh.exec_command('mkdir /media/datasets')
stdin, stdout, stderr = ssh.exec_command("/usr/bin/sshfs -o idmap=user root@192.168.1.1:/media/CloudImages/datasets /media/datasets")
# sending experiment execution files
stdin, stdout, stderr = ssh.exec_command('mkdir /root/gstar_master')
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
experiment_assets = [os.path.join(root, name) for root, dirs, files in os.walk("./templates/gstar_master") for name in files]
for expasset in experiment_assets:
sftp.put(expasset,expasset.replace("./templates","/root"))
shutil.copy("./templates/bashrc", "/tmp/bashrc")
bashrc = "/tmp/bashrc"
for line in fileinput.FileInput(bashrc,inplace=1):
line = line.replace("MEM",str(int(int(self.utils.m_ram)*0.9))).strip()
print line
sftp.put( "/tmp/bashrc","/root/.bashrc")
sftp.put( "./templates/bash_history","/root/.bash_history")
stdin, stdout, stderr = ssh.exec_command('source /root/.bashrc')
sftp.close()
else:
# Make a /etc/hosts file as you go
hosts.write(node.ip_address + "\t" + host_template + str(i) +"\n")
# sending experiment execution files
# Mount datasets dir
stdin, stdout, stderr = ssh.exec_command('apt-get update; apt-get install -y htop python-paramiko sshfs')
stdin, stdout, stderr = ssh.exec_command('mkdir /media/datasets')
stdin, stdout, stderr = ssh.exec_command("/usr/bin/sshfs -o idmap=user root@192.168.1.1:/media/CloudImages/datasets /media/datasets")
stdin, stdout, stderr = ssh.exec_command('mkdir /root/gstar')
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
experiment_assets = [os.path.join(root, name) for root, dirs, files in os.walk("./templates/gstar") for name in files]
for expasset in experiment_assets:
sftp.put(expasset,expasset.replace("./templates","/root"))
# Set hostname on the machine
stdin, stdout, stderr = ssh.exec_command('echo \"'+host_template+str(i)+"\" > /etc/hostname")
stdin, stdout, stderr = ssh.exec_command('hostname \"'+host_template+str(i)+"\"")
shutil.copy("./templates/bashrc", "/tmp/bashrc")
bashrc = "/tmp/bashrc"
for line in fileinput.FileInput(bashrc,inplace=1):
line = line.replace("MEM",str(int(int(self.utils.ram)*0.9))).strip()
print line
sftp.put( "/tmp/bashrc","/root/.bashrc")
stdin, stdout, stderr = ssh.exec_command('source /root/.bashrc')
sftp.close()
# Add node to cluster
self.cluster[host_template+ str(i)] = node
ssh.close()
time.sleep(15)
# Save all collected known keys
ssh.save_host_keys("/tmp/known_hosts_"+str(i))
# Increase i
i = i+1
# Copy /etc/hosts on all clients
# Decrase to have the last node in i
i = i-1
hosts.close()
key_template_path="./templates/ssh_keys"
## Copy standard templates and name each node accordingly
for node in nodes:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(node.ip_address, username='root', password='secretpw')
## Enlarge the user limit on open file descriptors
## (workaround for HDFS-127:http://wiki.apache.org/hadoop/Hbase/Troubleshooting#A7)
stdin, stdout, stderr = ssh.exec_command('ulimit -HSn 32768')
## Sync clocks
stdin, stdout, stderr = ssh.exec_command('service ntp stop')
stdin, stdout, stderr = ssh.exec_command('ntpd -gq')
stdin, stdout, stderr = ssh.exec_command('service ntp start')
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
# Copy private and public key
sftp.put( key_template_path+"/id_rsa","/root/.ssh/id_rsa")
sftp.put( key_template_path+"/id_rsa.pub", "/root/.ssh/id_rsa.pub")
sftp.put( key_template_path+"/config", "/root/.ssh/config")
## Change permissions for private key
stdin, stdout, stderr = ssh.exec_command('chmod 0600 /root/.ssh/id_rsa')
# Add public key to authorized_keys
stdin, stdout, stderr = ssh.exec_command('cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys')
with open('./templates/ssh_keys/authorized_keys') as f:
stdin, stdout, stderr = ssh.exec_command('echo \''+f.readline().strip()+'\' >> /root/.ssh/authorized_keys')
# print stdout.readlines()
# Copy files (/etc/hosts, masters, slaves and conf templates) removing empty lines
sftp.put( "/tmp/hosts", "/etc/hosts")
sftp.close()
ssh.close()
self.host_template = host_template
## Manipulate known hosts to make a good file
known_hosts_name = '/tmp/known_hosts'
known_hosts = open(known_hosts_name, 'w')
j = 0
while j <= i:
loop = open('/tmp/known_hosts_'+str(j), 'r')
for fileLine in loop.readlines():
known_hosts.write(fileLine.strip())
loop.close()
os.system("sed -i '/^$/d' /tmp/known_hosts_"+str(j))
j = j + 1
known_hosts.close()
for (clusterkey, clusternode) in self.cluster.items():
for line in fileinput.FileInput(known_hosts_name,inplace=1):
line = line.replace(clusternode.ip_address, clusterkey).strip()
print line
# print clusterkey, clusternode.public_dns_name
## Upload perfect file
for node in nodes:
transport = paramiko.Transport((node.ip_address, 22))
transport.connect(username = 'root', password = 'secretpw')
transport.open_channel("session", node.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
os.system("sed -i '/^$/d' /tmp/known_hosts")
sftp.put( "/tmp/known_hosts", "/root/.ssh/known_hosts")
sftp.close()
## Save to database
self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
## Now you should be ok, so return the nodes with hostnames
return self.cluster
# def block_until_ping(self, nodes):
# ''' Blocks until all defined instances have reached running state and an ip has been assigned'''
# ## Run describe instances until everyone is running
# tmpnodes = nodes
# nodes = []
# while len(tmpnodes) > 0 :
# print "Waiting for", len(tmpnodes), "nodes."
# response = os.system("ping -c 1 -W 4 " + tmpnodes[0].private_dns_name)
# if response == 0:
# nodes.append(tmpnodes.pop(0));
# else:
# time.sleep(1);
# return nodes
def start_cluster (self):
print "nothing to start"
def stop_cluster (self):
print "nothing to stop"