import re
from pysqlite2 import dbapi2 as sqlite
import os, logging, errno
from ConfigParser import ConfigParser
from scipy.interpolate import UnivariateSpline
from collections import OrderedDict
from Instance import Instance
import numpy as np
class Utils(object):
    '''
    Utility helpers shared by the coordinator: hostname/IP allocation, sqlite
    persistence for instances and clusters, and simple metric interpolation.
    '''
    def __init__(self, properties_file):
        self.read_properties(properties_file)
self.mac_ip_mapping = {
"vm50.delab.csd.auth.gr":("a6:89:d4:1b:6d:21","192.168.1.50"),
"vm51.delab.csd.auth.gr":("36:86:86:3c:01:6d","192.168.1.51"),
"vm52.delab.csd.auth.gr":("44:66:11:45:bf:aa","192.168.1.52"),
"vm53.delab.csd.auth.gr":("f0:37:d7:e2:1f:d1","192.168.1.53"),
"vm54.delab.csd.auth.gr":("00:b8:a2:a0:af:70","192.168.1.54"),
"vm55.delab.csd.auth.gr":("9c:66:ab:89:32:cc","192.168.1.55"),
"vm56.delab.csd.auth.gr":("98:43:bd:58:34:b1","192.168.1.56"),
"vm57.delab.csd.auth.gr":("1c:45:09:a7:f4:e9","192.168.1.57"),
"vm58.delab.csd.auth.gr":("d6:40:d1:6a:e8:b8","192.168.1.58"),
"vm59.delab.csd.auth.gr":("e8:cb:ef:24:06:93","192.168.1.59"),
"vm60.delab.csd.auth.gr":("72:46:86:76:ed:bf","192.168.1.60"),
"vm61.delab.csd.auth.gr":("e4:44:2d:a2:c4:0f","192.168.1.61"),
"vm62.delab.csd.auth.gr":("34:5e:9e:ba:a7:cf","192.168.1.62"),
"vm63.delab.csd.auth.gr":("00:e5:78:bf:e5:b8","192.168.1.63"),
"vm64.delab.csd.auth.gr":("1e:72:d2:c8:f3:a7","192.168.1.64"),
"vm65.delab.csd.auth.gr":("4a:f0:da:97:65:ef","192.168.1.65"),
"vm66.delab.csd.auth.gr":("0c:a0:a8:54:f4:5a","192.168.1.66"),
"vm67.delab.csd.auth.gr":("f4:61:40:9a:11:8c","192.168.1.67"),
"vm68.delab.csd.auth.gr":("e2:c0:b4:8d:96:f8","192.168.1.68"),
"vm69.delab.csd.auth.gr":("de:f1:08:d6:62:62","192.168.1.69"),
"vm70.delab.csd.auth.gr":("86:50:cf:f8:10:ff","192.168.1.70"),
"vm71.delab.csd.auth.gr":("e2:15:66:9a:6a:65","192.168.1.71"),
"vm72.delab.csd.auth.gr":("18:56:1c:f7:5f:05","192.168.1.72"),
"vm73.delab.csd.auth.gr":("a6:53:94:09:7c:84","192.168.1.73"),
"vm74.delab.csd.auth.gr":("f6:d3:3d:53:ca:04","192.168.1.74"),
"vm75.delab.csd.auth.gr":("e8:c2:49:ac:52:4c","192.168.1.75"),
"vm76.delab.csd.auth.gr":("38:ac:45:43:20:22","192.168.1.76"),
"vm77.delab.csd.auth.gr":("86:48:d8:7a:16:c4","192.168.1.77"),
"vm78.delab.csd.auth.gr":("1a:00:1b:91:6c:bf","192.168.1.78"),
"vm79.delab.csd.auth.gr":("20:cf:31:a7:bf:c5","192.168.1.79"),
"vm80.delab.csd.auth.gr":("36:9b:a9:49:76:05","192.168.1.80")
}
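        # Static pool: each VM hostname maps to its (MAC address, IP address)
        # pair; free hostnames are handed out by getFree_dns_names() below.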
self.free_ips = self.mac_ip_mapping.keys()
    def getFree_dns_names(self, count, occupied=[]):
        ## Reserve `count` free hostnames, skipping any already occupied ones
        names = []
        for name in occupied:
            if name in self.free_ips:
                self.free_ips.remove(name)
        if count <= len(self.free_ips):
            for i in range(count):
                names.append(self.free_ips.pop())
        else:
            print "no available dns names"
        return names
    def return_instance_from_tuple(self, row):
        ## Build an Instance from a 12-column row of the instances table
        return Instance(row[0], row[1], row[2], row[3], row[4], row[5],
                        row[6], row[7], row[8], row[9], row[10], row[11])
    def query_instance_db(self, pattern):
        """ A helpful search of the sqlite db - returns matching instances."""
        search_field = None
        search_field1 = None
        if re.match('i-', pattern):
            ## looking up by instance id
            search_field = "id"
        elif re.match('emi-', pattern):
            ## looking up by image id
            search_field = "image_id"
        elif pattern.find(".") != -1:
            ## looking up by ip
            search_field = "public_dns_name"
            search_field1 = "private_dns_name"
        else:
            ## looking up by state
            search_field = "state"
        instances = []
        ## read from the local database; the field names are fixed strings set
        ## above, so only the pattern is passed as a bound parameter
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        instancesfromDB = []
        if search_field1:
            try:
                instancesfromDB = cur.execute('select * from instances where ' + search_field + '=? or ' +
                                              search_field1 + '=?', (pattern, pattern)).fetchall()
            except sqlite.DatabaseError:
                con.rollback()
        else:
            try:
                instancesfromDB = cur.execute('select * from instances where ' + search_field + '=?',
                                              (pattern,)).fetchall()
            except sqlite.DatabaseError:
                con.rollback()
        cur.close()
        con.close()
        for instancefromDB in instancesfromDB:
            instances.append(self.return_instance_from_tuple(instancefromDB))
        return instances
    def query_instance_db_using_id(self, pattern):
        """ Look up instances in the sqlite db by ip_address - returns instances."""
        print "pattern: " + str(pattern)
        search_field = "ip_address"
        instances = []
        ## read from the local database
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        instancesfromDB = []
        try:
            instancesfromDB = cur.execute('select * from instances where ' + search_field + '=?',
                                          (pattern,)).fetchall()
        except sqlite.DatabaseError:
            con.rollback()
        cur.close()
        con.close()
        for instancefromDB in instancesfromDB:
            instances.append(self.return_instance_from_tuple(instancefromDB))
        return instances
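    ## The insert below assumes an instances table with this 12-column layout
    ## (inferred from the insert statement; the actual CREATE TABLE lives
    ## elsewhere in the project):
    ##   instances(id, image_id, dns_name, ip_address, mac_address, vpn,
    ##             state, instance_type, launch_time, placement,
    ##             username, password)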
    def refresh_instance_db(self, instances):
        ## Update the instance DB with the provided instances (removes all previous entries!)
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            cur.execute('delete from instances')
            con.commit()
        except sqlite.DatabaseError:
            print "ERROR in delete"
        for instance in instances:
            try:
                cur.execute(""" insert into instances(id, image_id, dns_name, ip_address, mac_address, vpn,
                            state, instance_type, launch_time, placement, username, password)
                            values (?,?,?,?,?,?,?,?,?,?,?,?)""",
                            (instance.id, instance.image_id, instance.dns_name, instance.ip_address,
                             instance.mac_address, instance.vpn, instance.state, instance.instance_type,
                             instance.launch_time, instance.placement, instance.username, instance.password)
                            )
                con.commit()
            except sqlite.DatabaseError, e:
                print e.message
                print "ERROR in insert"
        cur.close()
        con.close()
def add_to_instance_db(self, instances):
## Update instance DB with provided instances (keeps previous entries!)
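        ## NOTE: this insert targets a different (euca/boto-style) column layout
        ## than refresh_instance_db() above; the two methods assume different
        ## instances schemas and are not interchangeable.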
con = sqlite.connect(self.db_file)
cur = con.cursor()
for instance in instances:
try:
cur.execute(""" insert into instances(id, image_id, public_dns_name, private_dns_name,private_ip_address_2,state,
key_name, ami_launch_index, product_codes,instance_type,
launch_time, placement, kernel, ramdisk )
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(instance.id, instance.image_id,instance.public_dns_name,instance.private_dns_name,instance.private_ip_address_2, instance.state,
instance.key_name,instance.ami_launch_index,instance.product_codes,instance.instance_type,
instance.launch_time,instance.placement,instance.kernel,instance.ramdisk)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
########################################################
## Cluster DB functions
########################################################
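    ## The cluster helpers below assume a clusters table of the form
    ## clusters(cluster_id, hostname, ip_address), inferred from the
    ## insert/select statements that follow.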
    def delete_cluster_from_db(self, clusterid="default"):
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            cur.execute('delete from clusters where cluster_id=?', (clusterid,))
            con.commit()
        except sqlite.DatabaseError:
            print "ERROR in delete"
        cur.close()
        con.close()
    def refresh_cluster_db(self, cluster=None):
        ## Update the cluster DB with the provided cluster (removes all previous entries!)
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            cur.execute('delete from clusters')
            con.commit()
        except sqlite.DatabaseError:
            print "ERROR in delete"
        for (clusterkey, clustervalue) in (cluster or {}).items():
            try:
                cur.execute(""" insert into clusters(cluster_id, hostname, ip_address)
                            values (?,?,?)""",
                            (clusterkey, clustervalue.id, clustervalue.ip_address)
                            )
                con.commit()
            except sqlite.DatabaseError, e:
                print e.message
                print "ERROR in insert"
        ## sanity check: dump the refreshed table once, not once per insert
        clusters = cur.execute('select * from clusters').fetchall()
        print "clusters refresh " + str(clusters)
        cur.close()
        con.close()
    def get_cluster_from_db(self, cluster_id=None):
        if not cluster_id:
            print "Got to provide a cluster id!"
            return None
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            clusterfromDB = cur.execute('select * from clusters where cluster_id=?',
                                        (cluster_id,)).fetchall()
        except sqlite.DatabaseError:
            print "ERROR in select"
            cur.close()
            con.close()
            return None
        cur.close()
        con.close()
        if len(clusterfromDB) < 1:
            print "Have not found the requested cluster."
            return None
        ## build a cluster object: hostname -> Instance
        cluster = {}
        for clusternode in clusterfromDB:
            ## query the instance db with the node's ip address
            instance = self.query_instance_db_using_id(clusternode[2])
            ## populate the cluster if the instance is in the db
            if instance:
                cluster[clusternode[1]] = instance[0]
        return cluster
    def get_cluster(self, cluster_id=None):
        if not cluster_id:
            print "Got to provide a cluster id!"
            return None
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            clusterfromDB = cur.execute('select * from clusters where cluster_id=?',
                                        (cluster_id,)).fetchall()
        except sqlite.DatabaseError:
            print "ERROR in select"
            cur.close()
            con.close()
            return None
        cur.close()
        con.close()
        if len(clusterfromDB) < 1:
            print "Have not found the requested cluster."
            return None
        return clusterfromDB
    def get_instance_from_cluster_db(self, cluster_id=None, hostname=None):
        if (not cluster_id) or (not hostname):
            print "Got to provide a cluster id and hostname!"
            return None
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            instanceFromClusterDB = cur.execute('select * from clusters where cluster_id=? and hostname=?',
                                                (cluster_id, hostname)).fetchall()
        except sqlite.DatabaseError:
            print "ERROR in select"
            cur.close()
            con.close()
            return None
        cur.close()
        con.close()
        if len(instanceFromClusterDB) < 1:
            print "Have not found the requested instance from cluster."
            return None
        return instanceFromClusterDB[0]
    def get_num_of_running_instances(self, cluster_id=None):
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            instanceFromClusterDB = cur.execute('select * from clusters where cluster_id=?',
                                                (cluster_id,)).fetchall()
            return len(instanceFromClusterDB)
        except sqlite.DatabaseError:
            print "ERROR in select"
            return 0
        finally:
            cur.close()
            con.close()
    def add_to_cluster_db(self, cluster=None, cluster_id=None):
        ## Add a cluster to the DB (removes existing records with the same id first)
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            cur.execute('delete from clusters where cluster_id=?', (cluster_id,))
            con.commit()
        except sqlite.DatabaseError:
            print "No previous entries"
        for (clusterkey, clustervalue) in (cluster or {}).items():
            try:
                cur.execute(""" insert into clusters(cluster_id, hostname, ip_address)
                            values (?,?,?)""",
                            (cluster_id, clusterkey, clustervalue.ip_address)
                            )
                con.commit()
            except sqlite.DatabaseError, e:
                print e.message
                print "ERROR in insert"
        cur.close()
        con.close()
    def rem_from_cluster_db(self, cluster_id=None, hostname=None):
        ## Remove a single host of the given cluster from the DB
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            cur.execute('delete from clusters where cluster_id=? and hostname=?',
                        (cluster_id, hostname))
            con.commit()
        except sqlite.DatabaseError:
            print "ERROR in delete"
        cur.close()
        con.close()
    def get_hostname(self, cluster_id=None, ip_address=None):
        con = sqlite.connect(self.db_file)
        cur = con.cursor()
        try:
            hostnames = cur.execute('select hostname from clusters where cluster_id=? and ip_address=?',
                                    (cluster_id, ip_address)).fetchall()
            return hostnames[0] if hostnames else []
        except sqlite.DatabaseError:
            print "ERROR in select"
            return []
        finally:
            cur.close()
            con.close()
    def mkdir_p(self, path):
        ## mkdir -p: create the directory tree, ignoring "already exists"
        try:
            os.makedirs(path)
        except OSError as exc:  # Python >2.5
            if exc.errno == errno.EEXIST and os.path.isdir(path):
                pass
            else:
                raise
    def median(self, mylist):
        """ Return the median of a numeric list, e.g. median([1, 2, 3, 4]) == 2.5."""
        sorts = sorted(mylist)
        length = len(sorts)
        if not length % 2:
            return (sorts[length / 2] + sorts[length / 2 - 1]) / 2.0
        return sorts[length / 2]
    def interpolate_metrics(self, memory, state, cur_lambda):
        ## Estimate (throughput, latency) at cur_lambda from the measurements
        ## in memory[state]['arrayMeas'] (rows of lambda, throughput, latency, ...)
        state = str(state)
        lambdaDict = {}
        # the largest measured lambda below cur_lambda
        max_smaller_lambda = [-1, 0, 0, 0]
        # the smallest measured lambda above cur_lambda
        min_greater_lambda = [-1, 0, 0, 0]
        for j in memory[state]['arrayMeas']:
            if cur_lambda <= j[0]:
                if (j[0] <= min_greater_lambda[0] or min_greater_lambda[0] == -1):
                    min_greater_lambda = j
            else:
                if (j[0] >= max_smaller_lambda[0] or max_smaller_lambda[0] == -1):
                    max_smaller_lambda = j
            if j[0] in lambdaDict:
                lambdaDict[j[0]][0].append(j[1])
                lambdaDict[j[0]][1].append(j[2])
            else:
                lambdaDict[j[0]] = ([j[1]], [j[2]])
        ordered_lambda = OrderedDict(sorted(lambdaDict.items()))
        ## median throughput/latency per distinct lambda
        throughputList = []
        latencyList = []
        for l in ordered_lambda.values():
            throughputList.append(self.median(l[0]))
            latencyList.append(self.median(l[1]))
        if (min_greater_lambda[0] == -1) or (max_smaller_lambda[0] == -1):
            ## cur_lambda lies outside the measured range - extrapolate with a
            ## linear (k=1) spline fitted over all measurements
            fc_thr = UnivariateSpline(ordered_lambda.keys(), throughputList, k=1)
            thr = fc_thr(cur_lambda)
            fc_lat = UnivariateSpline(ordered_lambda.keys(), latencyList, k=1)
            lat = fc_lat(cur_lambda)
        else:
            ## cur_lambda is bracketed by measurements - plain linear interpolation
            thr = np.interp(cur_lambda, ordered_lambda.keys(), throughputList)
            lat = np.interp(cur_lambda, ordered_lambda.keys(), latencyList)
        return (thr, lat)
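    ## Illustrative sketch of the measurement layout consumed above (values
    ## are made up): with
    ##   memory['2']['arrayMeas'] = [(100, 950.0, 12.0, 0), (200, 1800.0, 15.0, 0)]
    ## a bracketed request interpolates linearly, e.g.
    ##   interpolate_metrics(memory, 2, 150)  ->  (1375.0, 13.5)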
def read_properties(self, property_file="Coordinator.properties"):
""" process properties file """
## Reads the configuration properties
cfg = ConfigParser()
cfg.read(property_file)
self.install_dir = cfg.get("config", "install_dir")
self.logs_folder_name = 'logs'
self.bucket_name = cfg.get("config", "bucket_name")
self.instance_type = cfg.get("config", "instance_type")
self.cluster_name = cfg.get("config", "cluster_name")
self.hostname_template = cfg.get("config", "hostname_template")
self.cluster_type = cfg.get("config", "cluster_type")
self.db_file = cfg.get("config", "db_file")
self.cloud_api_type = cfg.get("config", "cloud_api_type")
self.rabbit_maxmem = int(cfg.get("config", "rabbit_maxmem"))
self.panther_maxmem = int(cfg.get("config", "panther_maxmem"))
self.deer_maxmem = int(cfg.get("config", "deer_maxmem"))
self.initial_cluster_size = int(cfg.get("config", "initial_cluster_size"))
self.ownerTag = cfg.get("config", "ownerTag")
self.hadoop_heap = cfg.get("config", "hadoop_heap")
self.dfs_replication = cfg.get("config", "dfs_replication")
self.map_memory = cfg.get("config", "map_memory")
self.reduce_memory = cfg.get("config", "reduce_memory")
self.map_tasks = cfg.get("config", "map_tasks")
self.reduce_tasks = cfg.get("config", "reduce_tasks")
self.total_map_tasks = cfg.get("config", "total_map_tasks")
self.total_reduce_tasks = cfg.get("config", "total_reduce_tasks")
        ## getboolean accepts true/false, yes/no, on/off, 1/0 - avoids eval()
        self.reconfigure = cfg.getboolean("config", "reconfigure")
        if self.cloud_api_type == 'ganeti':
            ## instance_type encodes the resources as "<disk>_<cpu>_<ram>"
            (self.disk, self.cpu, self.ram) = self.instance_type.split('_')
        self.logging_level = logging.DEBUG if cfg.get("config", "logging_level") == "debug" else logging.INFO
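
# ---------------------------------------------------------------------------
# Minimal usage sketch (assumptions: a Coordinator.properties file with the
# [config] section consumed by read_properties(), and an initialized sqlite
# db at the configured db_file; the values printed below are illustrative).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    utils = Utils("Coordinator.properties")
    # reserve two free hostnames from the static pool
    names = utils.getFree_dns_names(2)
    print "reserved dns names: " + str(names)
    # count the hosts recorded for the configured cluster
    running = utils.get_num_of_running_instances(cluster_id=utils.cluster_name)
    print "hosts in cluster %s: %d" % (utils.cluster_name, running)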