'''
Created on Jun 8, 2010
@author: vagos
'''
import fileinput
import logging
import logging.handlers
import os
import re
import shutil
import time

import paramiko
from pysqlite2 import dbapi2 as sqlite

import Utils
class CassandraCluster39(object):
'''
This class holds all nodes of the db in the virtual cluster. It can start/stop individual
daemons as needed, thus adding/removing nodes at will. It also sets up the configuration
files as needed.
'''
def __init__(self, properties_file, initial_cluster_id="default"):
'''
Constructor
'''
## Necessary variables
self.cluster = {}
self.host_template = ""
self.cluster_id = initial_cluster_id
self.utils = Utils.Utils(properties_file)
self.templates_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))+"/templates"
# Make sure the sqlite file exists. if not, create it and add the table we need
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
try:
clusters = cur.execute('select * from clusters',
).fetchall()
if len(clusters) > 0 :
print """Already discovered cluster id from previous database file. Will select the defined one to work with (if it exists)."""
# print "Found records:\n", clusters
clustersfromcid = cur.execute('select * from clusters where cluster_id=\"' + self.cluster_id + "\"",
).fetchall()
if len(clustersfromcid) > 0 :
self.cluster = self.utils.get_cluster_from_db(self.cluster_id)
# print self.cluster
for clusterkey in self.cluster.keys():
if not (clusterkey.find("master") == -1):
self.host_template = clusterkey.replace("master", "")
# Add self to db (eliminates existing records of same id)
self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
else:
print "No known cluster with this id - run configure before you proceed"
except sqlite.DatabaseError:
cur.execute('create table clusters(cluster_id text, hostname text, ip_address text)')
con.commit()
cur.close()
con.close()
## Install logger
LOG_FILENAME = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/Coordinator.log'
self.my_logger = logging.getLogger('CassandraCluster39')
self.my_logger.setLevel(self.utils.logging_level)
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=2*1024*1024, backupCount=5)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
handler.setFormatter(formatter)
self.my_logger.addHandler(handler)
def configure_cluster(self, nodes=None, host_template="", reconfigure=True, seeds=""):
hosts = open('/tmp/hosts', 'w')
i = 0
hosts.write("127.0.0.1\tlocalhost\n")
for node in nodes:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.my_logger.debug("Starting config for node: " + node.ip_address)
ssh.connect(str(node.ip_address), username="root", password="secretpw")
## Check for installation dirs, otherwise exit with error message
stderr_all = []
stdin, stdout, stderr = ssh.exec_command('ls /opt/apache-cassandra-3.9/')
stderr_all.append(stderr.readlines())
stderr_all.append(stderr.readlines())
stdin, stdout, stderr = ssh.exec_command('swapoff -a -v')
for stderr in stderr_all:
if len(stderr) > 0 :
self.my_logger.debug("ERROR - some installation files are missing")
return
#Already done inside cassandra image
# stdin, stdout, stderr = ssh.exec_command('echo "root - nofile 100000" >> /etc/security/limits.conf')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
# stdin, stdout, stderr = ssh.exec_command('echo "root - memlock unlimited" >> /etc/security/limits.conf')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
# stdin, stdout, stderr = ssh.exec_command('echo "root - nproc 32768" >> /etc/security/limits.conf')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
# stdin, stdout, stderr = ssh.exec_command('echo "root - as unlimited" >> /etc/security/limits.conf')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
# stdin, stdout, stderr = ssh.exec_command('echo "vm.max_map_count = 131072" >> /etc/sysctl.conf')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
# stdin, stdout, stderr = ssh.exec_command('/sbin/sysctl -p')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
# stdin, stdout, stderr = ssh.exec_command('echo 0 > /proc/sys/vm/zone_reclaim_mode')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
stdin, stdout, stderr = ssh.exec_command('apt-get -y install ntp')
# self.my_logger.debug(str(stdout.readlines()))
# self.my_logger.debug(str(stderr.readlines()))
stdin, stdout, stderr = ssh.exec_command('service ntp stop; ntpdate -s 2.pool.ntp.org; service ntp start; exit;')
self.my_logger.debug(str(stdout.readlines()))
self.my_logger.debug(str(stderr.readlines()))
if i == 0:
# Add the master to the /etc/hosts file
hosts.write(node.ip_address + "\t" + host_template + "master\n")
# Set hostname on the machine
stdin, stdout, stderr = ssh.exec_command('echo \"' + host_template + "master\" > /etc/hostname")
stdin, stdout, stderr = ssh.exec_command('hostname \"' + host_template + "master\"")
# Add node to cluster
self.cluster[host_template + "master"] = node
else:
# Make a /etc/hosts file as you go
hosts.write(node.ip_address + "\t" + host_template + str(i) + "\n")
# Set hostname on the machine
stdin, stdout, stderr = ssh.exec_command('echo \"' + host_template + str(i) + "\" > /etc/hostname")
stdin, stdout, stderr = ssh.exec_command('hostname \"' + host_template + str(i) + "\"")
# Add node to cluster
self.cluster[host_template + str(i)] = node
ssh.close()
# Save all collected known keys
ssh.save_host_keys("/tmp/known_hosts_" + str(i))
# Increase i
i = i + 1
# Decrase to have the last node in i
i = i - 1
hosts.close()
key_template_path = self.templates_path+"/ssh_keys"
## Copy standard templates and name each node accordingly
for node in nodes:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
## Sync clocks over IPv6
# stdin, stdout, stderr = ssh.exec_command('ntpdate 2.pool.ntp.org;')
transport = paramiko.Transport((str(node.ip_address), 22))
transport.connect(username="root", password="secretpw")
transport.open_channel("session", str(node.ip_address), "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
# Copy private and public key
sftp.put(key_template_path + "/id_rsa", "/root/.ssh/id_rsa")
sftp.put(key_template_path + "/id_rsa.pub", "/root/.ssh/id_rsa.pub")
sftp.put(key_template_path + "/config", "/root/.ssh/config")
sftp.put( key_template_path+"/authorized_keys", "/root/.ssh/authorized_keys")
## Change permissions for private key
stdin, stdout, stderr = ssh.exec_command('chmod 0600 /root/.ssh/id_rsa')
##Add ycsb create table script
sftp.put(self.templates_path+"/ycsb/create_ycsbtable", "/root/create_ycsbtable")
# Add public key to authorized_keys
stdin, stdout, stderr = ssh.exec_command('cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys')
# copy necessary templates to /tmp to alter them
shutil.copy(self.templates_path+"/cassandra39/cassandra.yaml", "/tmp/cassandra.yaml")
cassandra_yaml = '/tmp/cassandra.yaml'
# Edit cassandra.yaml to reflect seed changes
if(seeds == ""):
j = 1
for node_seed in nodes:
seeds = seeds + node_seed.ip_address
if j < len(nodes):
seeds = seeds + ","
j = j + 1
for line in fileinput.FileInput(cassandra_yaml, inplace=1):
line = line.replace("SEEDS", seeds).strip('\n')
print line
# os.system("sed -i '/^$/d' /tmp/cassandra.yaml")
# Add the node's ip to cassandra.yaml
for line in fileinput.FileInput(cassandra_yaml, inplace=1):
line = line.replace("LISTEN_ADDRESS", str(node.ip_address)).strip('\n')
print line
# os.system("sed -i '/^$/d' /tmp/cassandra.yaml")
shutil.copy(self.templates_path+"/cassandra39/cassandra-env.sh", "/tmp/cassandra-env.sh")
cassandra_env = '/tmp/cassandra-env.sh'
# for line in fileinput.FileInput(cassandra_env,inplace=1):
# line = line.replace("HEAPMEM",str(int(int(self.utils.ram)*0.6))).strip()
# print line
if not reconfigure:
stdin, stdout, stderr = ssh.exec_command('rm -fr /opt/apache-cassandra-3.9/data/')
shutil.copy(self.templates_path+"/cassandra39/bashrc", "/tmp/bashrc")
bashrc = "/tmp/bashrc"
for line in fileinput.FileInput(bashrc,inplace=1):
line = line.replace("HEAPMEM",str(int(int(self.utils.ram)*0.6))).strip()
print line
# Copy files (/etc/hosts, masters, slaves and conf templates)
sftp.put("/tmp/hosts", "/etc/hosts")
sftp.put(bashrc,"/root/.bashrc")
sftp.put(cassandra_yaml, "/opt/apache-cassandra-3.9/conf/cassandra.yaml")
sftp.put(cassandra_env, "/opt/apache-cassandra-3.9/conf/cassandra-env.sh")
sftp.put(self.templates_path+"/cassandra39/logback.xml", "/opt/apache-cassandra-3.9/conf/logback.xml")
sftp.put(self.templates_path+"/cassandra39/metrics-reporter-config.yaml", "/opt/apache-cassandra-3.9/conf/metrics-reporter-config.yaml")
#sftp.put( self.templates_path+"/network/bandwidth_limit.sh","/root/bandwidth_limit.sh")
sftp.close()
#ssh = paramiko.SSHClient()
#ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#ssh.connect(node.ip_address, username='root', password='secretpw')
#self.my_logger.debug("Bandwidth_limit")
#stdin, stdout, stderr = ssh.exec_command('cd /root; source .bashrc; chmod 755 bandwidth_limit.sh; ./bandwidth_limit.sh')
#self.my_logger.debug(str(stdout.readlines()))
#self.my_logger.debug(str(stderr.readlines()))
#ssh.close()
self.host_template = host_template
## Manipulate known hosts to make a good file
known_hosts_name = '/tmp/known_hosts'
known_hosts = open(known_hosts_name, 'w')
j = 0
while j <= i:
loop = open('/tmp/known_hosts_' + str(j), 'r')
for fileLine in loop.readlines():
known_hosts.write(fileLine.strip('\n'))
loop.close()
os.system("sed -i '/^$/d' /tmp/known_hosts")
j = j + 1
known_hosts.close()
for (clusterkey, clusternode) in self.cluster.items():
for line in fileinput.FileInput(known_hosts_name, inplace=1):
line = line.replace(str(clusternode.ip_address), clusterkey).strip('\n')
print line
## Upload perfect known hosts file
for node in nodes:
transport = paramiko.Transport((str(node.ip_address), 22))
transport.connect(username="root", password="secretpw")
transport.open_channel("session", str(node.ip_address), "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
# os.system("sed -i '/^$/d' /tmp/known_hosts")
sftp.put("/tmp/known_hosts", "/root/.ssh/known_hosts")
sftp.close()
## Save to database
self.utils.add_to_cluster_db(self.cluster, self.cluster_id)
## Now you should be ok, so return the nodes with hostnames
return self.cluster
def start_cluster (self, nodes, sleepFor=0):
master_instance_ip = self.utils.get_instance_from_cluster_db(self.utils.cluster_name, self.utils.hostname_template+"master")[2]
for node in nodes:
self.my_logger.debug("Starting Cassandra on: " + str(node.ip_address))
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
stdin, stdout, stderr = ssh.exec_command('cd /opt/apache-cassandra-3.9/bin; ./cassandra -R -Dcassandra.metricsReporterConfigFile=metrics-reporter-config.yaml; exit;')
# self.my_logger.debug(stdout.readlines())
# self.my_logger.debug(stderr.readlines())
ssh.close()
time.sleep(sleepFor)
for node in nodes:
if (master_instance_ip == node.ip_address):
self.my_logger.debug("Waiting "+str(len(nodes)*30)+" secs for cassandra to run on master and then create the keyspace ycsb")
time.sleep(len(nodes)*30)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
stdin, stdout, stderr = ssh.exec_command('cd /opt/apache-cassandra-3.9/bin; ./cqlsh -f /root/create_ycsbtable; exit;')
self.my_logger.debug(stdout.readlines())
self.my_logger.debug(stderr.readlines())
ssh.close()
break
def stop_cluster (self):
for node in self.cluster.values():
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
stdin, stdout, stderr = ssh.exec_command('pkill java')
print stdout.readlines()
ssh.close()
def add_nodes (self, new_nodes=None):
## Reconfigure the cluster with the last node as the provided one
nodes = []
nodes.append(self.cluster[self.host_template + "master"])
for i in range(1, len(self.cluster)):
nodes.append(self.cluster[self.host_template + str(i)])
seeds = ""
j = 1
for node_seed in nodes:
seeds = seeds + node_seed.ip_address
if j < len(nodes):
seeds = seeds + ","
j = j + 1
nodes.extend(new_nodes)
self.my_logger.debug("New nodes:"+str(nodes))
## Stop the cluster (should ensure that new nodes are added to the ring)
# self.stop_cluster()
self.my_logger.debug("sleep for 20secs and then configure")
time.sleep(20)
self.configure_cluster(nodes , self.host_template, True, seeds)
## Start the new configuration!
self.my_logger.debug("Sleeping for 5 mins after every addition")
self.start_cluster(new_nodes,300)
self.update_clients_hosts_file()
## Try to rebalance the cluster (usually rebalances the new node)
# self.rebalance_cluster()
## Now you should be ok, so return the new node
return nodes
def update_clients_hosts_file(self):
clients = self.utils.getClients()
#Creating /etc/hosts file
cluster = self.utils.get_cluster(self.utils.cluster_name)
hosts = ""
for instance in cluster:
hosts = hosts + (str(instance[2])+"\t"+str(instance[1])+"\n")
shutil.copy(self.templates_path+"/hbase94/hosts", "/tmp/hosts")
hosts_file = '/tmp/hosts'
for line in fileinput.FileInput(hosts_file,inplace=1):
line = line.replace("HOSTNAMES",hosts).strip()
print line
for client in clients:
self.my_logger.debug("Sending hosts file to: " + client.ip_address)
transport = paramiko.Transport((client.ip_address, 22))
transport.connect(username = client.username, password = client.password)
transport.open_channel("session", client.ip_address, "localhost")
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put( hosts_file,"/etc/hosts")
sftp.close()
def remove_node (self, hostname=""):
master_instance_ip = self.utils.get_instance_from_cluster_db(self.utils.cluster_name, self.utils.hostname_template+"master")[2]
## Remove node by hostname -- DOES NOST REMOVE THE MASTER
nodes = []
nodes.append(self.cluster[self.host_template + "master"])
for i in range(1, len(self.cluster)):
if not (self.host_template + str(i)).endswith(hostname):
nodes.append(self.cluster[self.host_template + str(i)])
self.my_logger.debug("New nodes:"+str( nodes))
## keep node
node = self.cluster.pop(hostname)
print "Removing:", hostname
## Kill all java processes on the removed node
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
# stdin, stdout, stderr = ssh.exec_command('/opt/apache-cassandra-2.2.3/bin/nodetool decommission')
# stdin, stdout, stderr = ssh.exec_command('pkill -9 java')
stdin, stdout, stderr = ssh.exec_command('cd /opt/apache-cassandra-3.9/bin; chmod 755 nodetool; ./nodetool status; exit;')
lines=stdout.readlines()
ssh.close()
uuid = ""
for line in lines:
if(str(node.ip_address) in line):
uuid = re.sub(' +',' ',line).split(" ")[6]
break
ssh.close()
ssh.connect(str(master_instance_ip), username="root", password="secretpw")
stdin, stdout, stderr = ssh.exec_command('cd /opt/apache-cassandra-3.9/bin; chmod 755 nodetool; ./nodetool removenode '+uuid+'; exit;')
self.my_logger.debug("Waiting 3 minutes to allow nodetool removenode "+uuid+" command to run")
time.sleep(180)
ssh.close()
## Reconfigure cluster
self.configure_cluster(nodes , self.host_template, True)
self.my_logger.debug(str(self.cluster))
## Stop the cluster (should ensure that new nodes are added to the ring)
# self.stop_cluster()
self.update_clients_hosts_file()
## Now you should be ok, so return the new node
return node
def rebalance_cluster (self):
n = 1
numOfNodes = len(self.cluster)
for node in self.cluster.values():
token = 2 ** 127 / numOfNodes * n
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
stdin, stdout, stderr = ssh.exec_command('/opt/apache-cassandra-3.9/bin/nodetool move ' + str(token))
print stdout.readlines()
ssh.close()
n+=1
for node in self.cluster.values():
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(str(node.ip_address), username="root", password="secretpw")
stdin, stdout, stderr = ssh.exec_command('/opt/apache-cassandra-3.9/bin/nodetool cleanup')
print stdout.readlines()
ssh.close()
return True