#import paramiko
import re, random
#import boto
#import boto.ec2
#from euca2ools import Euca2ool, InstanceValidationError, ConnectionFailed, FileValidationError
from pysqlite2 import dbapi2 as sqlite
#import sys, os
import os, logging, errno
from ConfigParser import ConfigParser
#from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.interpolate import UnivariateSpline
from collections import OrderedDict
from Instance import Instance
import numpy as np
class Utils(object):
'''
This class holds utility functions.
'''
def __init__(self,properties_file):
self.read_properties(properties_file)
self.mac_ip_mapping = {
"vm40.delab.csd.auth.gr":("e4:dd:4a:c2:36:33","192.168.1.40"),
"vm41.delab.csd.auth.gr":("73:3d:a0:aa:4e:bf","192.168.1.41"),
"vm42.delab.csd.auth.gr":("a2:da:bb:32:01:02","192.168.1.42"),
"vm43.delab.csd.auth.gr":("b2:50:a9:ac:12:19","192.168.1.43"),
"vm44.delab.csd.auth.gr":("31:65:3b:77:11:78","192.168.1.44"),
"vm45.delab.csd.auth.gr":("d2:a5:e5:33:cf:09","192.168.1.45"),
"vm46.delab.csd.auth.gr":("29:72:ff:69:cc:c7","192.168.1.46"),
"vm47.delab.csd.auth.gr":("5e:72:9d:12:25:3a","192.168.1.47"),
"vm48.delab.csd.auth.gr":("45:fa:7b:a3:c2:5b","192.168.1.48"),
"vm49.delab.csd.auth.gr":("99:0d:28:13:69:97","192.168.1.49"),
"vm50.delab.csd.auth.gr":("a6:89:d4:1b:6d:21","192.168.1.50"),
"vm51.delab.csd.auth.gr":("36:86:86:3c:01:6d","192.168.1.51"),
"vm52.delab.csd.auth.gr":("44:66:11:45:bf:aa","192.168.1.52"),
"vm53.delab.csd.auth.gr":("f0:37:d7:e2:1f:d1","192.168.1.53"),
"vm54.delab.csd.auth.gr":("00:b8:a2:a0:af:70","192.168.1.54"),
"vm55.delab.csd.auth.gr":("9c:66:ab:89:32:cc","192.168.1.55"),
"vm56.delab.csd.auth.gr":("98:43:bd:58:34:b1","192.168.1.56"),
"vm57.delab.csd.auth.gr":("1c:45:09:a7:f4:e9","192.168.1.57"),
"vm58.delab.csd.auth.gr":("d6:40:d1:6a:e8:b8","192.168.1.58"),
"vm59.delab.csd.auth.gr":("e8:cb:ef:24:06:93","192.168.1.59"),
"vm60.delab.csd.auth.gr":("72:46:86:76:ed:bf","192.168.1.60"),
"vm61.delab.csd.auth.gr":("e4:44:2d:a2:c4:0f","192.168.1.61"),
"vm62.delab.csd.auth.gr":("34:5e:9e:ba:a7:cf","192.168.1.62"),
"vm63.delab.csd.auth.gr":("00:e5:78:bf:e5:b8","192.168.1.63"),
"vm64.delab.csd.auth.gr":("1e:72:d2:c8:f3:a7","192.168.1.64"),
"vm65.delab.csd.auth.gr":("4a:f0:da:97:65:ef","192.168.1.65"),
"vm66.delab.csd.auth.gr":("0c:a0:a8:54:f4:5a","192.168.1.66"),
"vm67.delab.csd.auth.gr":("f4:61:40:9a:11:8c","192.168.1.67"),
"vm68.delab.csd.auth.gr":("e2:c0:b4:8d:96:f8","192.168.1.68"),
"vm69.delab.csd.auth.gr":("de:f1:08:d6:62:62","192.168.1.69"),
"vm70.delab.csd.auth.gr":("86:50:cf:f8:10:ff","192.168.1.70"),
"vm71.delab.csd.auth.gr":("e2:15:66:9a:6a:65","192.168.1.71"),
"vm72.delab.csd.auth.gr":("18:56:1c:f7:5f:05","192.168.1.72"),
"vm73.delab.csd.auth.gr":("a6:53:94:09:7c:84","192.168.1.73"),
"vm74.delab.csd.auth.gr":("f6:d3:3d:53:ca:04","192.168.1.74"),
"vm75.delab.csd.auth.gr":("e8:c2:49:ac:52:4c","192.168.1.75"),
"vm76.delab.csd.auth.gr":("38:ac:45:43:20:22","192.168.1.76"),
"vm77.delab.csd.auth.gr":("86:48:d8:7a:16:c4","192.168.1.77"),
"vm78.delab.csd.auth.gr":("1a:00:1b:91:6c:bf","192.168.1.78"),
"vm79.delab.csd.auth.gr":("20:cf:31:a7:bf:c5","192.168.1.79"),
"vm80.delab.csd.auth.gr":("36:9b:a9:49:76:05","192.168.1.80"),
"vm81.delab.csd.auth.gr":("C4:C2:A5:02:AD:E3","192.168.1.81"),
"vm82.delab.csd.auth.gr":("34:BF:73:B3:9D:EF","192.168.1.82"),
"vm83.delab.csd.auth.gr":("18:69:D3:06:C5:78","192.168.1.83"),
"vm84.delab.csd.auth.gr":("9A:4E:1E:EA:29:68","192.168.1.84"),
"vm85.delab.csd.auth.gr":("F6:EE:BC:3B:AB:AF","192.168.1.85"),
"vm86.delab.csd.auth.gr":("FE:1E:32:02:0F:56","192.168.1.86"),
"vm87.delab.csd.auth.gr":("6E:98:16:C7:8A:D3","192.168.1.87"),
"vm88.delab.csd.auth.gr":("3E:52:A7:B1:9B:7A","192.168.1.88"),
"vm89.delab.csd.auth.gr":("4C:04:95:BA:EC:82","192.168.1.89"),
"vm90.delab.csd.auth.gr":("E4:46:51:70:E1:BA","192.168.1.90"),
"vm91.delab.csd.auth.gr":("A2:66:B2:37:E0:6F","192.168.1.91"),
"vm92.delab.csd.auth.gr":("02:A9:83:64:4D:50","192.168.1.92"),
"vm93.delab.csd.auth.gr":("78:18:73:B0:44:71","192.168.1.93"),
"vm94.delab.csd.auth.gr":("08:D2:28:67:AD:8D","192.168.1.94"),
"vm95.delab.csd.auth.gr":("C0:44:25:C0:E5:C6","192.168.1.95"),
"vm96.delab.csd.auth.gr":("D2:89:2E:89:44:96","192.168.1.96"),
"vm97.delab.csd.auth.gr":("AA:7E:4A:3A:32:68","192.168.1.97"),
"vm98.delab.csd.auth.gr":("3E:B8:A0:96:BF:30","192.168.1.98"),
"vm99.delab.csd.auth.gr":("20:DA:56:86:29:13","192.168.1.99")
}
self.free_ips = self.mac_ip_mapping.keys()
def getFree_dns_names(self, count, occupied=[]):
names = []
for ip in occupied:
if ip in self.free_ips:
self.free_ips.remove(ip)
if (count <= len(self.free_ips)):
for i in range(0,count):
names.append(self.free_ips.pop())
else:
print "no available dns_names"
return names
def return_instance_from_tuple(self, tuple):
instance = Instance(tuple[0], tuple[1], tuple[2], tuple[3], tuple[4], tuple[5], tuple[6], tuple[7], tuple[8], tuple[9], tuple[10],tuple[11]);
return instance
def query_instance_db(self, pattern):
""" A helpful search for the sqlite db - returns instances"""
search_field = None
search_field1 = None
if re.match('i-', pattern):
## looking by instance id
search_field = "id"
else:
if re.match('emi-', pattern):
## looking by image id
search_field = "image_id"
else:
if pattern.find(".") != -1:
## looking by ip
search_field = "public_dns_name"
search_field1 = "private_dns_name"
else:
## looking by state
search_field = "state"
instances = []
## read from the local database
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
if search_field1 :
try:
instancesfromDB = cur.execute('select * from instances where ' + search_field +"=\"" +
pattern + "\" OR " + search_field1 + "=\"" + pattern + "\""
).fetchall()
except sqlite.DatabaseError:
con.rollback()
else:
try:
instancesfromDB = cur.execute('select * from instances where ' + search_field +"=\"" +
pattern + "\"").fetchall()
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
for instancefromDB in instancesfromDB:
instances.append(self.return_instance_from_tuple(instancefromDB))
return instances
def query_instance_db_using_id(self, pattern):
print "pattern: "+ str(pattern)
""" A helpful search for the sqlite db - returns instances"""
search_field = "ip_address"
instances = []
## read from the local database
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
try:
instancesfromDB = cur.execute('select * from instances where ' + search_field +"=\"" +
pattern + "\"").fetchall()
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
for instancefromDB in instancesfromDB:
instances.append(self.return_instance_from_tuple(instancefromDB))
return instances
def refresh_instance_db(self, instances):
## Update instance DB with provided instances (removes all previous entries!)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from instances'
)
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
for instance in instances:
try:
cur.execute(""" insert into instances(id, image_id, dns_name, ip_address, mac_address, vpn,
state, instance_type, launch_time, placement, username, password)
values (?,?,?,?,?,?,?,?,?,?,?,?)""",
(instance.id, instance.image_id, instance.dns_name, instance.ip_address, instance.mac_address, instance.vpn, instance.state, instance.instance_type, instance.launch_time, instance.placement,instance.username,instance.password)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def add_to_instance_db(self, instances):
## Update instance DB with provided instances (keeps previous entries!)
con = sqlite.connect(self.db_file)
cur = con.cursor()
for instance in instances:
try:
cur.execute(""" insert into instances(id, image_id, public_dns_name, private_dns_name,private_ip_address_2,state,
key_name, ami_launch_index, product_codes,instance_type,
launch_time, placement, kernel, ramdisk )
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(instance.id, instance.image_id,instance.public_dns_name,instance.private_dns_name,instance.private_ip_address_2, instance.state,
instance.key_name,instance.ami_launch_index,instance.product_codes,instance.instance_type,
instance.launch_time,instance.placement,instance.kernel,instance.ramdisk)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
########################################################
## Cluster DB functions
########################################################
def delete_cluster_from_db(self, clusterid = "default"):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where cluster_id=\"'+clusterid+"\"")
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
cur.close()
con.close()
def refresh_cluster_db(self, cluster=None):
## Update cluster DB with provided cluster (removes all previous entries!)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters'
)
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
for (clusterkey,clustervalue) in cluster.items():
try:
cur.execute(""" insert into clusters(cluster_id, hostname, ip_address)
values (?,?,?)""",
(clusterkey, clustervalue.id, clustervalue.ip_address)
)
con.commit()
clusters = cur.execute('select * from clusters',).fetchall()
con.commit()
print "clusters refresh" + str(clusters);
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def get_cluster_from_db(self, cluster_id=None):
if not cluster_id:
print "Got to provide cluster id!!!"
else:
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
clusterfromDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+"\""
).fetchall()
con.commit()
except sqlite.DatabaseError:
print "ERROR in select"
return None
if len(clusterfromDB) < 1:
print "Have not found the requested cluster - exiting."
else:
## build a cluster object
cluster = {}
for clusternode in clusterfromDB:
print (clusternode[2])
## query db to get the corresponding instance
instance = self.query_instance_db_using_id(clusternode[2])
## Populate cluster if instance in db
if instance:
cluster[clusternode[1]] = instance[0]
# print cluster
# print "Instance:", instance
# sys.stdout.flush()
return cluster
return None
def get_cluster(self, cluster_id=None):
if not cluster_id:
print "Got to provide cluster id!!!"
else:
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
clusterfromDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+"\""
).fetchall()
con.commit()
except sqlite.DatabaseError:
print "ERROR in select"
return None
if len(clusterfromDB) < 1:
print "Have not found the requested cluster - exiting."
else:
return clusterfromDB
return None
def get_instance_from_cluster_db(self, cluster_id=None, hostname=None):
if (not cluster_id) or (not hostname):
print "Got to provide cluster id and hostname!!!"
else:
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
instanceFromClusterDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+'\" and hostname = \"'+hostname+'\"'
).fetchall()
con.commit()
except sqlite.DatabaseError:
print "ERROR in select"
return None
if len(instanceFromClusterDB) < 1:
print "Have not found the requested instance fromcluster - exiting."
else:
return instanceFromClusterDB[0]
return None
def get_num_of_running_instances(self, cluster_id=None):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
instanceFromClusterDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+'\"').fetchall()
con.commit()
return len(instanceFromClusterDB)
except sqlite.DatabaseError:
print "ERROR in select"
return 0
def add_to_cluster_db(self, cluster=None, cluster_id=None):
## Add cluster to DB (check for existing records with the same id and remove)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where cluster_id = \"'+cluster_id+"\""
)
con.commit()
except sqlite.DatabaseError:
print "No previous entries"
for (clusterkey,clustervalue) in cluster.items():
try:
cur.execute(""" insert into clusters(cluster_id, hostname, ip_address)
values (?,?,?)""",
(cluster_id, clusterkey, clustervalue.ip_address)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def rem_from_cluster_db(self, cluster_id=None, hostname=None):
## Add cluster to DB (check for existing records with the same id and remove)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where cluster_id = \"'+cluster_id+"\" and hostname = \""+hostname+"\""
)
con.commit()
except sqlite.DatabaseError:
print "Error in delete"
cur.close()
con.close()
def get_hostname(self, cluster_id=None, ip_address=None):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
hostnames = cur.execute('select hostname from clusters where cluster_id = \"'+cluster_id+"\" and ip_address = \""+ip_address+"\""
).fetchall()
con.commit()
return hostnames[0]
except sqlite.DatabaseError:
print "Error in delete"
return []
cur.close()
con.close()
def mkdir_p(self,path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise
def median(self, mylist):
sorts = sorted(mylist)
length = len(sorts)
if not length % 2:
return (sorts[length / 2] + sorts[length / 2 - 1]) / 2.0
return sorts[length / 2]
def interpolate_metrics(self, memory, state, cur_lambda):
state = str(state)
lambdaDict = {}
throughputList = []
latencyList = []
#the maximum from the smaller lambdas
max_smaller_lambda = [-1,0,0,0]
#the minimum from the greater lambdas
min_greater_lambda = [-1,0,0,0]
for j in memory[state]['arrayMeas']:
if cur_lambda <= j[0]:
if (j[0] <= min_greater_lambda[0] or min_greater_lambda[0] == -1):
min_greater_lambda = j
else:
if (j[0] >= max_smaller_lambda[0] or max_smaller_lambda[0] == -1):
max_smaller_lambda = j
if j[0] in lambdaDict:
lambdaDict[j[0]][0].append(j[1])
lambdaDict[j[0]][1].append(j[2])
else:
lambdaDict[j[0]] = ([j[1]],[j[2]])
ordered_lambda = OrderedDict(sorted(lambdaDict.items()))
throughputList = []
latencyList = []
for l in ordered_lambda.values():
throughputList.append(self.median(l[0]))
latencyList.append(self.median(l[1]))
if (min_greater_lambda[0] == -1) or (max_smaller_lambda[0] == -1):
# print state
# print cur_lambda
# print str(memory[state]['arrayMeas'])
# print "ordered set -> " + str(ordered_lambda)
fc_thr = UnivariateSpline(ordered_lambda.keys(),throughputList, k=1)
thr = fc_thr(cur_lambda)
fc_lat = UnivariateSpline(ordered_lambda.keys(),latencyList, k=1)
lat = fc_lat(cur_lambda)
else:
thr = np.interp(cur_lambda,ordered_lambda.keys(),throughputList)
lat = np.interp(cur_lambda,ordered_lambda.keys(),latencyList)
return (thr, lat)
# def thr(self, vm, inlambda):
# if(3000*vm<inlambda) :
# return (3000.0*vm - 0.03*inlambda + self.myR(-0.05*(3000.0*vm - 0.03*inlambda),0.0));
# else:
# return inlambda+self.myR(-0.1*inlambda, 0.0);
# def lat(self, vm, inlambda):
# if(8000*vm<inlambda):
# return (4*vm)+0.00001*(inlambda-8000.0*vm)*(inlambda-8000.0*vm)+self.myR(0.0, 0.000001*(inlambda-8000.0*vm)*(inlambda-8000.0*vm));
# else:
# return (0.0005*inlambda + self.myR(0.0, 10.0));
# def myR(self, apo,ews):
# return ((ews-apo)*random.uniform(0,1))+apo;
def read_properties(self, property_file="Coordinator.properties"):
""" process properties file """
## Reads the configuration properties
cfg = ConfigParser()
cfg.read(property_file)
self.install_dir = cfg.get("config", "install_dir")
self.logs_folder_name = 'logs'
#self.euca_rc_dir = cfg.get("config", "euca_rc_dir")
self.bucket_name = cfg.get("config", "bucket_name")
self.master_bucket_name = cfg.get("config", "master_bucket_name")
self.instance_type = cfg.get("config", "instance_type")
self.master_instance_type = cfg.get("config", "master_instance_type")
self.cluster_name = cfg.get("config", "cluster_name")
self.hostname_template = cfg.get("config", "hostname_template")
self.cluster_type = cfg.get("config", "cluster_type")
self.db_file = cfg.get("config", "db_file")
self.cloud_api_type = cfg.get("config", "cloud_api_type")
self.rabbit_maxmem = int(cfg.get("config", "rabbit_maxmem"))
self.panther_maxmem = int(cfg.get("config", "panther_maxmem"))
self.deer_maxmem = int(cfg.get("config", "deer_maxmem"))
self.anaconda_maxmem = int(cfg.get("config", "anaconda_maxmem"))
self.initial_cluster_size = int(cfg.get("config", "initial_cluster_size"))
self.ownerTag = cfg.get("config", "ownerTag")
self.reconfigure = eval(cfg.get("config", "reconfigure"))
if(self.cloud_api_type == 'ganeti'):
self.disk = self.instance_type.split('_')[0]
self.cpu = self.instance_type.split('_')[1]
self.ram = self.instance_type.split('_')[2]
self.m_disk = self.master_instance_type.split('_')[0]
self.m_cpu = self.master_instance_type.split('_')[1]
self.m_ram = self.master_instance_type.split('_')[2]
self.logging_level = logging.DEBUG if cfg.get("config","logging_level") == "debug" else logging.INFO;