#import paramiko
import re, random, math, time
#import boto
#import boto.ec2
#from euca2ools import Euca2ool, InstanceValidationError, ConnectionFailed, FileValidationError
from pysqlite2 import dbapi2 as sqlite
#import sys, os
import os, logging, errno
from ConfigParser import ConfigParser
from Client import Client
#from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.interpolate import UnivariateSpline
from collections import OrderedDict
from Instance import Instance
import numpy as np
from datetime import datetime
class Utils(object):
    '''
    This class holds utility functions.

    It bundles VM hostname/MAC/IP bookkeeping, sqlite-backed persistence of
    instances/clusters/clients, hourly cost accounting, metric interpolation
    and properties-file parsing for the elasticity coordinator.
    '''
    def __init__(self,properties_file):
        # Load every configuration option (paths, policies, thresholds, ...)
        # from the given properties file into attributes of this object.
        self.read_properties(properties_file)
        # Static map: VM hostname -> (MAC address, private IP address).
        self.mac_ip_mapping = {
        "vm50.delab.csd.auth.gr":("a6:89:d4:1b:6d:21","192.168.1.50"),
        "vm51.delab.csd.auth.gr":("36:86:86:3c:01:6d","192.168.1.51"),
        "vm52.delab.csd.auth.gr":("44:66:11:45:bf:aa","192.168.1.52"),
        "vm53.delab.csd.auth.gr":("f0:37:d7:e2:1f:d1","192.168.1.53"),
        "vm54.delab.csd.auth.gr":("00:b8:a2:a0:af:70","192.168.1.54"),
        "vm55.delab.csd.auth.gr":("9c:66:ab:89:32:cc","192.168.1.55"),
        "vm56.delab.csd.auth.gr":("98:43:bd:58:34:b1","192.168.1.56"),
        "vm57.delab.csd.auth.gr":("1c:45:09:a7:f4:e9","192.168.1.57"),
        "vm58.delab.csd.auth.gr":("d6:40:d1:6a:e8:b8","192.168.1.58"),
        "vm59.delab.csd.auth.gr":("e8:cb:ef:24:06:93","192.168.1.59"),
        "vm60.delab.csd.auth.gr":("72:46:86:76:ed:bf","192.168.1.60"),
        "vm61.delab.csd.auth.gr":("e4:44:2d:a2:c4:0f","192.168.1.61"),
        "vm62.delab.csd.auth.gr":("34:5e:9e:ba:a7:cf","192.168.1.62"),
        "vm63.delab.csd.auth.gr":("00:e5:78:bf:e5:b8","192.168.1.63"),
        "vm64.delab.csd.auth.gr":("1e:72:d2:c8:f3:a7","192.168.1.64"),
        "vm65.delab.csd.auth.gr":("4a:f0:da:97:65:ef","192.168.1.65"),
        "vm66.delab.csd.auth.gr":("0c:a0:a8:54:f4:5a","192.168.1.66"),
        "vm67.delab.csd.auth.gr":("f4:61:40:9a:11:8c","192.168.1.67"),
        "vm68.delab.csd.auth.gr":("e2:c0:b4:8d:96:f8","192.168.1.68"),
        "vm69.delab.csd.auth.gr":("de:f1:08:d6:62:62","192.168.1.69"),
        "vm70.delab.csd.auth.gr":("86:50:cf:f8:10:ff","192.168.1.70"),
        "vm71.delab.csd.auth.gr":("e2:15:66:9a:6a:65","192.168.1.71"),
        "vm72.delab.csd.auth.gr":("18:56:1c:f7:5f:05","192.168.1.72"),
        "vm73.delab.csd.auth.gr":("a6:53:94:09:7c:84","192.168.1.73"),
        "vm74.delab.csd.auth.gr":("f6:d3:3d:53:ca:04","192.168.1.74"),
        "vm75.delab.csd.auth.gr":("e8:c2:49:ac:52:4c","192.168.1.75"),
        "vm76.delab.csd.auth.gr":("38:ac:45:43:20:22","192.168.1.76"),
        "vm77.delab.csd.auth.gr":("86:48:d8:7a:16:c4","192.168.1.77"),
        "vm78.delab.csd.auth.gr":("1a:00:1b:91:6c:bf","192.168.1.78"),
        "vm79.delab.csd.auth.gr":("20:cf:31:a7:bf:c5","192.168.1.79"),
        "vm80.delab.csd.auth.gr":("36:9b:a9:49:76:05","192.168.1.80"),
        "vm81.delab.csd.auth.gr":("C4:C2:A5:02:AD:E3","192.168.1.81"),
        "vm82.delab.csd.auth.gr":("34:BF:73:B3:9D:EF","192.168.1.82"),
        "vm83.delab.csd.auth.gr":("18:69:D3:06:C5:78","192.168.1.83"),
        "vm84.delab.csd.auth.gr":("9A:4E:1E:EA:29:68","192.168.1.84"),
        "vm85.delab.csd.auth.gr":("F6:EE:BC:3B:AB:AF","192.168.1.85"),
        "vm86.delab.csd.auth.gr":("FE:1E:32:02:0F:56","192.168.1.86"),
        "vm87.delab.csd.auth.gr":("6E:98:16:C7:8A:D3","192.168.1.87"),
        "vm88.delab.csd.auth.gr":("3E:52:A7:B1:9B:7A","192.168.1.88"),
        "vm89.delab.csd.auth.gr":("4C:04:95:BA:EC:82","192.168.1.89"),
        "vm90.delab.csd.auth.gr":("E4:46:51:70:E1:BA","192.168.1.90"),
        "vm91.delab.csd.auth.gr":("A2:66:B2:37:E0:6F","192.168.1.91"),
        "vm92.delab.csd.auth.gr":("02:A9:83:64:4D:50","192.168.1.92"),
        "vm93.delab.csd.auth.gr":("78:18:73:B0:44:71","192.168.1.93"),
        "vm94.delab.csd.auth.gr":("08:D2:28:67:AD:8D","192.168.1.94"),
        "vm95.delab.csd.auth.gr":("C0:44:25:C0:E5:C6","192.168.1.95"),
        "vm96.delab.csd.auth.gr":("D2:89:2E:89:44:96","192.168.1.96"),
        "vm97.delab.csd.auth.gr":("AA:7E:4A:3A:32:68","192.168.1.97"),
        "vm98.delab.csd.auth.gr":("3E:B8:A0:96:BF:30","192.168.1.98"),
        "vm99.delab.csd.auth.gr":("20:DA:56:86:29:13","192.168.1.99")
        }
        #m3.large : 0.1540->0.019,
        # Hourly on-demand prices (USD/hour) per Amazon EC2 instance type.
        self.onDemandPrices = {"t2.nano" : 0.0085,
                       "t2.micro" : 0.0170,
                       "t2.small" : 0.0340,
                       "t2.medium" : 0.0680,
                       "t2.large" : 0.1360,
                       "m4.large" : 0.1400,
                       "m4.xlarge" : 0.2790,
                       "m4.2xlarge" : 0.5590,
                       "m4.4xlarge" : 1.1170,
                       "m4.10xlarge" : 2.7930,
                       "m3.medium" : 0.0770,
                       "m3.large" : 0.1540,
                       "m3.xlarge" : 0.3080,
                       "m3.2xlarge" : 0.6160,
                       "c4.large" : 0.1310,
                       "c4.xlarge" : 0.2620,
                       "c4.2xlarge" : 0.5240,
                       "c4.4xlarge" : 1.0490,
                       "c4.8xlarge" : 2.0980,
                       "c3.large" : 0.1200,
                       "c3.xlarge" : 0.2390,
                       "c3.2xlarge" : 0.4780,
                       "c3.4xlarge" : 0.9560,
                       "c3.8xlarge" : 1.9120,
                       "g2.2xlarge" : 0.7020,
                       "g2.8xlarge" : 2.8080,
                       "r3.large" : 0.1850,
                       "r3.xlarge" : 0.3710,
                       "r3.2xlarge" : 0.7410,
                       "r3.4xlarge" : 1.4820,
                       "r3.8xlarge" : 2.9640,
                       "i2.xlarge" : 0.9380,
                       "i2.2xlarge" : 1.8760,
                       "i2.4xlarge" : 3.7510,
                       "i2.8xlarge" : 7.5020
                       }
        # Pool of not-yet-assigned VM names. NOTE: despite the name, these are
        # the hostnames (the dict keys), not raw IPs; getFree_dns_names() pops
        # reservations from this list.
        self.free_ips = self.mac_ip_mapping.keys()
def getFree_dns_names(self, count, occupied=[]):
names = []
for ip in occupied:
if ip in self.free_ips:
self.free_ips.remove(ip)
if (count <= len(self.free_ips)):
for i in range(0,count):
names.append(self.free_ips.pop())
else:
print "no available dns_names"
return names
def return_instance_from_tuple(self, tuple):
instance = Instance(tuple[0], tuple[1], tuple[2], tuple[3], tuple[4], tuple[5], tuple[6], tuple[7], tuple[8], tuple[9], tuple[10],tuple[11]);
return instance
def query_instance_db(self, pattern):
""" A helpful search for the sqlite db - returns instances"""
search_field = None
search_field1 = None
if re.match('i-', pattern):
## looking by instance id
search_field = "id"
else:
if re.match('emi-', pattern):
## looking by image id
search_field = "image_id"
else:
if pattern.find(".") != -1:
## looking by ip
search_field = "public_dns_name"
search_field1 = "private_dns_name"
else:
## looking by state
search_field = "state"
instances = []
## read from the local database
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
if search_field1 :
try:
instancesfromDB = cur.execute('select * from instances where ' + search_field +"=\"" +
pattern + "\" OR " + search_field1 + "=\"" + pattern + "\""
).fetchall()
except sqlite.DatabaseError:
con.rollback()
else:
try:
instancesfromDB = cur.execute('select * from instances where ' + search_field +"=\"" +
pattern + "\"").fetchall()
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
for instancefromDB in instancesfromDB:
instances.append(self.return_instance_from_tuple(instancefromDB))
return instances
def query_instance_db_using_id(self, pattern):
print "pattern: "+ str(pattern)
""" A helpful search for the sqlite db - returns instances"""
search_field = "ip_address"
instances = []
## read from the local database
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
try:
instancesfromDB = cur.execute('select * from instances where ' + search_field +"=\"" +
pattern + "\"").fetchall()
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
for instancefromDB in instancesfromDB:
instances.append(self.return_instance_from_tuple(instancefromDB))
return instances
def refresh_instance_db(self, instances):
## Update instance DB with provided instances (removes all previous entries!)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from instances'
)
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
for instance in instances:
try:
cur.execute(""" insert into instances(id, image_id, dns_name, ip_address, mac_address, vpn,
state, instance_type, launch_time, placement, username, password)
values (?,?,?,?,?,?,?,?,?,?,?,?)""",
(instance.id, instance.image_id, instance.dns_name, instance.ip_address, instance.mac_address, instance.vpn, instance.state, instance.instance_type, instance.launch_time, instance.placement,instance.username,instance.password)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def add_to_instance_db(self, instances):
## Update instance DB with provided instances (keeps previous entries!)
con = sqlite.connect(self.db_file)
cur = con.cursor()
for instance in instances:
try:
cur.execute(""" insert into instances(id, image_id, public_dns_name, private_dns_name,private_ip_address_2,state,
key_name, ami_launch_index, product_codes,instance_type,
launch_time, placement, kernel, ramdisk )
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(instance.id, instance.image_id,instance.public_dns_name,instance.private_dns_name,instance.private_ip_address_2, instance.state,
instance.key_name,instance.ami_launch_index,instance.product_codes,instance.instance_type,
instance.launch_time,instance.placement,instance.kernel,instance.ramdisk)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def get_remove_candidate_instances(self, now, offset):
instances = []
## read from the local database
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
remove_candidates = []
try:
instancesfromDB = cur.execute('select id,launch_time from instances inner join clusters on instances.ip_address == clusters.ip_address where hostname != "'+self.hostname_template+'master"').fetchall()
for vm_time_tuple in instancesfromDB:
#now = datetime.now()
then = datetime.fromtimestamp(float(vm_time_tuple[1]))
tdelta = now - then
seconds = tdelta.total_seconds()
if(seconds % 3600 >= (3600 - offset)):
remove_candidates.append(vm_time_tuple[0]);
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
list.sort(remove_candidates)
return remove_candidates;
def get_remove_candidate_instances_vm_type(self,vm_type,cnt):
instances = []
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
remove_candidates = []
try:
instancesfromDB = cur.execute("select id from instances where vm_type='"+str(vm_type)+"' order by id desc limit "+str(cnt)).fetchall()
for vm_tuple in instancesfromDB:
remove_candidates.append(vm_tuple[0])
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
return remove_candidates
def get_remove_candidate_instances_sim(self, now, offset,vm_type):
instances = []
## read from the local database
con = sqlite.connect(self.db_file)
cur = con.cursor()
instancesfromDB = []
remove_candidates = []
try:
instancesfromDB = cur.execute("select id,launch_time from instances where vm_type='"+str(vm_type)+"'").fetchall()
for vm_time_tuple in instancesfromDB:
if(vm_time_tuple[0] == 0):
continue;
then = datetime.fromtimestamp(float(vm_time_tuple[1]))
tdelta = now - then
seconds = tdelta.total_seconds()
if(seconds % 3600 >= (3600 - offset)):
remove_candidates.append(vm_time_tuple[0]);
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
list.sort(remove_candidates)
return remove_candidates;
def get_the_cost(self, launch_time, termination_time, vm_type=""):
now = datetime.fromtimestamp(float(termination_time))
then = datetime.fromtimestamp(float(launch_time))
tdelta = now - then
running_time = tdelta.total_seconds()/60
hours = 0
if(running_time > 60):
hours = tdelta.total_seconds() // 3600
minutes = (tdelta.total_seconds() // 60 - hours * 60)
if(minutes > 0):
hours = hours + 1
else:
hours = 1
if(len(vm_type) > 0):
return hours*self.vm_types_hour_cost[vm_type]
else:
return hours*self.hour_cost
def get_running_time(self, launch_time, termination_time):
now = datetime.fromtimestamp(float(termination_time))
then = datetime.fromtimestamp(float(launch_time))
tdelta = now - then
running_time = self.pretty_time_delta(tdelta.total_seconds())
return running_time
def pretty_time_delta(self, seconds):
seconds = int(seconds)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
if days > 0:
return '%dd%dh%dm%ds' % (days, hours, minutes, seconds)
elif hours > 0:
return '%dh%dm%ds' % (hours, minutes, seconds)
elif minutes > 0:
return '%dm%ds' % (minutes, seconds)
else:
return '%ds' % (seconds,)
def get_the_cost_of_all(self, termination_time):
cost = 0
con = sqlite.connect(self.db_file)
cur = con.cursor()
launch_time_list = cur.execute('select launch_time,vm_type from instances').fetchall()
cur.close()
con.close()
for launch_time in launch_time_list:
cost = cost + self.get_the_cost(launch_time[0],time.mktime(termination_time.timetuple()),launch_time[1])
return cost
def get_total_num_vms(self):
con = sqlite.connect(self.db_file)
cur = con.cursor()
vms_list = cur.execute('select id from instances').fetchall()
cur.close()
con.close()
return len(vms_list)
def get_launch_time(self, vmId):
con = sqlite.connect(self.db_file)
cur = con.cursor()
launch_time = cur.execute('select launch_time from instances where id=\"'+str(vmId)+"\"").fetchone()[0]
cur.close()
con.close()
return launch_time
def get_the_hostname(self, vmId):
con = sqlite.connect(self.db_file)
cur = con.cursor()
hostname = cur.execute('select hostname from clusters inner join instances where clusters.ip_address=instances.ip_address and instances.id=\"'+str(vmId)+"\"").fetchone()[0]
cur.close()
con.close()
return hostname
def delete_instances_from_DB(self, remove_node):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where clusters.ip_address=(select ip_address from instances where id=\"'+str(remove_node)+"\")")
con.commit()
cur.execute('delete from instances where id='+str(remove_node))
con.commit()
except sqlite.DatabaseError:
print "ERROR in delete"
cur.close()
con.close()
########################################################
## Cluster DB functions
########################################################
def delete_cluster_from_db(self, clusterid = "default"):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where cluster_id=\"'+clusterid+"\"")
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
cur.close()
con.close()
def refresh_cluster_db(self, cluster=None):
## Update cluster DB with provided cluster (removes all previous entries!)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters'
)
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
for (clusterkey,clustervalue) in cluster.items():
try:
cur.execute(""" insert into clusters(cluster_id, hostname, ip_address)
values (?,?,?)""",
(clusterkey, clustervalue.id, clustervalue.ip_address)
)
con.commit()
clusters = cur.execute('select * from clusters',).fetchall()
con.commit()
print "clusters refresh" + str(clusters);
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def get_cluster_from_db(self, cluster_id=None):
if not cluster_id:
print "Got to provide cluster id!!!"
else:
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
clusterfromDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+"\""
).fetchall()
con.commit()
except sqlite.DatabaseError:
print "ERROR in select"
return None
if len(clusterfromDB) < 1:
print "Have not found the requested cluster - exiting."
else:
## build a cluster object
cluster = {}
for clusternode in clusterfromDB:
print (clusternode[2])
## query db to get the corresponding instance
instance = self.query_instance_db_using_id(clusternode[2])
## Populate cluster if instance in db
if instance:
cluster[clusternode[1]] = instance[0]
# print cluster
# print "Instance:", instance
# sys.stdout.flush()
return cluster
return None
def get_cluster(self, cluster_id=None):
if not cluster_id:
print "Got to provide cluster id!!!"
else:
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
clusterfromDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+"\""
).fetchall()
con.commit()
except sqlite.DatabaseError:
print "ERROR in select"
return None
if len(clusterfromDB) < 1:
print "Have not found the requested cluster - exiting."
else:
return clusterfromDB
return None
def get_instance_from_cluster_db(self, cluster_id=None, hostname=None):
if (not cluster_id) or (not hostname):
print "Got to provide cluster id and hostname!!!"
else:
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
instanceFromClusterDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+'\" and hostname = \"'+hostname+'\"'
).fetchall()
con.commit()
except sqlite.DatabaseError:
print "ERROR in select"
return None
if len(instanceFromClusterDB) < 1:
print "Have not found the requested instance fromcluster - exiting."
else:
return instanceFromClusterDB[0]
return None
def get_num_of_running_instances(self, cluster_id=None):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
instanceFromClusterDB = cur.execute('select * from clusters where cluster_id = \"'+cluster_id+'\"').fetchall()
con.commit()
return len(instanceFromClusterDB)
except sqlite.DatabaseError:
print "ERROR in select"
return 0
def add_to_cluster_db(self, cluster=None, cluster_id=None):
## Add cluster to DB (check for existing records with the same id and remove)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where cluster_id = \"'+cluster_id+"\""
)
con.commit()
except sqlite.DatabaseError:
print "No previous entries"
for (clusterkey,clustervalue) in cluster.items():
try:
cur.execute(""" insert into clusters(cluster_id, hostname, ip_address)
values (?,?,?)""",
(cluster_id, clusterkey, clustervalue.ip_address)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in insert"
cur.close()
con.close()
def rem_from_cluster_db(self, cluster_id=None, hostname=None):
## Add cluster to DB (check for existing records with the same id and remove)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clusters where cluster_id = \"'+cluster_id+"\" and hostname = \""+hostname+"\""
)
con.commit()
except sqlite.DatabaseError:
print "Error in delete"
cur.close()
con.close()
def get_hostname(self, cluster_id=None, ip_address=None):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
hostnames = cur.execute('select hostname from clusters where cluster_id = \"'+cluster_id+"\" and ip_address = \""+ip_address+"\""
).fetchall()
con.commit()
return hostnames[0]
except sqlite.DatabaseError:
print "Error in delete"
return []
cur.close()
con.close()
########################################################
## Client DB functions
########################################################
def add_to_clients_db(self, clients=None):
## Add client to DB (check for existing records with the same id and remove)
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
cur.execute('delete from clients'
)
con.commit()
except sqlite.DatabaseError:
print "ERROR in truncate"
for i in range(0,len(clients)):
#try:
#cur.execute('delete from clients where id = \"'+str(clients[i].id)+"\""
# )
#con.commit()
#except sqlite.DatabaseError:
#print "No previous entries for"+str(clients[i].id)
try:
cur.execute(""" insert into clients(id, ip_address, username, password)
values (?,?,?,?)""",
(clients[i].id, clients[i].ip_address, clients[i].username, clients[i].password)
)
con.commit()
except sqlite.DatabaseError, e:
print e.message
print "ERROR in client insert"
cur.close()
con.close()
def getClients(self):
con = sqlite.connect(self.db_file)
cur = con.cursor()
try:
clients_list = []
clients = cur.execute('select * from clients'
).fetchall()
for client in clients:
clients_list.append(Client(client[0], client[1], client[2], client[3]))
print "Found client records:\n", clients
return clients_list
except sqlite.DatabaseError:
cur.execute('create table clients(id text, ip_address text, username text, password text)')
con.commit()
return []
def mkdir_p(self,path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise
def median(self, mylist):
sorts = sorted(mylist)
length = len(sorts)
if not length % 2:
return (sorts[length / 2] + sorts[length / 2 - 1]) / 2.0
return sorts[length / 2]
    def interpolate_metrics(self, memory, state, cur_lambda):
        """Estimate (throughput, latency) at load *cur_lambda* for *state*.

        memory[state]['arrayMeas'] holds measurement rows whose first three
        columns are (lambda, throughput, latency) — assumed from the indexing
        below; TODO confirm the full row layout. Measurements are grouped by
        lambda and reduced to per-lambda medians, then:
          * if cur_lambda lies inside the measured lambda range, the medians
            are linearly interpolated with np.interp;
          * otherwise (no measurement on one side) a degree-1
            UnivariateSpline is used, which also extrapolates.
        Returns a (throughput, latency) tuple.
        """
        # Clamp the requested load to the smallest measured lambda.
        min_lambda_state = np.min(memory[state]['arrayMeas'],axis=0)
        if(cur_lambda <= 0 or cur_lambda < min_lambda_state[0]):
            cur_lambda = min_lambda_state[0]
        state = str(state)
        # lambda -> ([throughputs], [latencies]) for all measurements at that load
        lambdaDict = {}
        throughputList = []
        latencyList = []
        #the maximum from the smaller lambdas
        max_smaller_lambda = [-1,0,0,0]
        #the minimum from the greater lambdas
        min_greater_lambda = [-1,0,0,0]
        # Single pass: track the measured lambdas bracketing cur_lambda and
        # bucket every measurement by its lambda value.
        for j in memory[state]['arrayMeas']:
            if cur_lambda <= j[0]:
                if (j[0] <= min_greater_lambda[0] or min_greater_lambda[0] == -1):
                    min_greater_lambda = j
            else:
                if (j[0] >= max_smaller_lambda[0] or max_smaller_lambda[0] == -1):
                    max_smaller_lambda = j
            if j[0] in lambdaDict:
                lambdaDict[j[0]][0].append(j[1])
                lambdaDict[j[0]][1].append(j[2])
            else:
                lambdaDict[j[0]] = ([j[1]],[j[2]])
        ordered_lambda = OrderedDict(sorted(lambdaDict.items()))
        # Reduce each lambda bucket to its median throughput and latency.
        throughputList = []
        latencyList = []
        for l in ordered_lambda.values():
            throughputList.append(self.median(l[0]))
            latencyList.append(self.median(l[1]))
        # A sentinel of -1 on either side means cur_lambda is outside the
        # measured range, so extrapolate with a linear spline.
        if (min_greater_lambda[0] == -1) or (max_smaller_lambda[0] == -1):
            # print state
            # print cur_lambda
            # print str(memory[state]['arrayMeas'])
            # print "ordered set -> " + str(ordered_lambda)
            #print ordered_lambda.keys()
            #print throughputList
            #print cur_lambda
            fc_thr = UnivariateSpline(sorted(ordered_lambda.keys()),throughputList, k=1)
            thr = fc_thr(cur_lambda)
            fc_lat = UnivariateSpline(sorted(ordered_lambda.keys()),latencyList, k=1)
            lat = fc_lat(cur_lambda)
        else:
            #print ordered_lambda.keys()
            #print throughputList
            #print cur_lambda
            thr = np.interp(cur_lambda,ordered_lambda.keys(),throughputList)
            lat = np.interp(cur_lambda,ordered_lambda.keys(),latencyList)
        # Negative latencies can only come from extrapolation artifacts.
        if(lat < 0):
            print "NEGATIVE LAT: " + str(lat) + " for lambda: " + str(cur_lambda) + " and state: " + str(state)
        return (thr, lat)
# def thr(self, vm, inlambda):
# if(3000*vm<inlambda) :
# return (3000.0*vm - 0.03*inlambda + self.myR(-0.05*(3000.0*vm - 0.03*inlambda),0.0));
# else:
# return inlambda+self.myR(-0.1*inlambda, 0.0);
# def lat(self, vm, inlambda):
# if(8000*vm<inlambda):
# return (4*vm)+0.00001*(inlambda-8000.0*vm)*(inlambda-8000.0*vm)+self.myR(0.0, 0.000001*(inlambda-8000.0*vm)*(inlambda-8000.0*vm));
# else:
# return (0.0005*inlambda + self.myR(0.0, 10.0));
# def myR(self, apo,ews):
# return ((ews-apo)*random.uniform(0,1))+apo;
def read_properties(self, property_file="Coordinator.properties"):
""" process properties file """
## Reads the configuration properties
cfg = ConfigParser()
cfg.read(property_file)
self.install_dir = cfg.get("config", "install_dir")
#self.euca_rc_dir = cfg.get("config", "euca_rc_dir")
self.bucket_name = cfg.get("config", "bucket_name")
self.clients_bucket_name = cfg.get("config", "clients_bucket_name")
self.instance_type = cfg.get("config", "instance_type")
self.clients_instance_type = cfg.get("config", "clients_instance_type")
self.clients_num = 4
self.cluster_name = cfg.get("config", "cluster_name")
self.hostname_template = cfg.get("config", "hostname_template")
self.reconfigure = cfg.get("config", "reconfigure")
self.cluster_type = cfg.get("config", "cluster_type")
self.db_file = cfg.get("config", "db_file")
self.cloud_api_type = cfg.get("config", "cloud_api_type")
if(self.cloud_api_type == 'ganeti'):
self.disk = self.instance_type.split('_')[0]
self.cpu = self.instance_type.split('_')[1]
self.ram = self.instance_type.split('_')[2]
self.client_disk = self.clients_instance_type.split('_')[0]
self.client_cpu = self.clients_instance_type.split('_')[1]
self.client_ram = self.clients_instance_type.split('_')[2]
self.trans_cost = cfg.get("config", "trans_cost")
self.gain = cfg.get("config", "gain")
self.serv_throughput = cfg.get("config", "serv_throughput")
self.logging_level = logging.DEBUG if cfg.get("config","logging_level") == "debug" else logging.INFO;
self.training_set_name = cfg.get("config", "training_set_name");
self.training_set_length = cfg.get("config", "training_set_length");
self.reward_function_name = cfg.get("config", "reward_function_name");
self.logs_folder_name = cfg.get("config", "logs_folder_name")
self.ownerTag = "naskos"
#okeanos
self.network_id = cfg.get("config", "network_id")
self.network_id_2 = cfg.get("config", "network_id_2")
self.okeanos_token = cfg.get("config","okeanos_token")
self.okeanos_compute_public_url = cfg.get("config","okeanos_compute_public_url")
#prism
self.prismFilePath = cfg.get("config", "prism_file_path");
self.prismSMGFilePath = cfg.get("config", "prism_multi_file_path");
self.prism_output_folder = self.install_dir+"/"+cfg.get("config", "prism_output_folder")+"/";
self.adversariesPath = self.prism_output_folder+"adv.tra"
self.statesFilePath = self.prism_output_folder+"states"
self.transitionFilePath = self.prism_output_folder+"transition_matrix"
self.modelFilePath = self.prism_output_folder+"model.prism";
self.modelMDPFilePath = self.prism_output_folder+"modelMDP.prism";
self.propertiesFilePath = self.prism_output_folder+"modelProp.csl";
self.propertiesProbabilisticFilePath = self.prism_output_folder+"modelProbabilisticProp.csl";
self.propertiesSMGFilePath = self.prism_output_folder+"modelSMGProp.csl";
self.resultsFilePath = self.prism_output_folder+"modelResults.txt";
self.lp_output_file = self.prism_output_folder+"games.lp";
self.lp_result_file = self.prism_output_folder+"gurobi.sol";
self.lp_solver = '/opt/gurobi562/linux64/bin/gurobi_cl';
#model
self.max_steps = cfg.get("config", "max_steps");
self.cumulative = eval(cfg.get("config", "cumulative"));
#simulation
self.simulation = eval(cfg.get("config", "simulation"));
self.meas_count = cfg.get("config", "sim_meas_count");
self.iteration = cfg.get("config", "iteration");
self.new_inlambda = cfg.get("config", "sim_new_inlambda");
self.load_meas_log = cfg.get("config", "load_meas_log");
self.inlambda_file_path = cfg.get("config", "sim_inlambda_file_path");
#elasticity
self.initial_cluster_size = int(cfg.get("config", "initial_cluster_size"));
self.min_cluster_size = int(cfg.get("config", "min_cluster_size"));
self.max_cluster_size = int(cfg.get("config", "max_cluster_size"));
self.add_nodes = int(cfg.get("config", "add_nodes"));
self.rem_nodes = int(cfg.get("config", "rem_nodes"));
self.num_of_clusters = int(cfg.get("config", "num_of_clusters"));
self.gain_punishment = cfg.get("config", "gain_punishment");
self.latency_threshold = cfg.get("config", "latency_threshold");
self.accepted_percentage = float(cfg.get("config", "accepted_percentage"));
self.window_inlambda = cfg.get("config", "window_inlambda");
self.ewma_inlambda = cfg.get("config", "ewma_inlambda");
self.violation_cluster = eval(cfg.get("config", "violation_cluster"));
self.weightedReward = eval(cfg.get("config", "weighted_reward"));
self.hourly = eval(cfg.get("config", "hourly"));
self.hour_cost = 0.1;
self.last_vm_secs = 600;
self.marketPriceDB = 'spot_prices.db'
self.bidPolicy = 'max'
self.spotInstanceType = 'c3.4xlarge'
self.startDate = '2016-06-05T09:00:07Z'
self.repViolationPunishement = 0
self.vm_types = ['_pr_cl','_pc_od']
self.vm_types_limits = ['_pr_cl','_pc_od']
self.spot = False
for vm_type in self.vm_types:
if("_sp" in vm_type):
self.spot = True
self.multiType = False
if(len(self.vm_types) > 0):
self.multiType = True
self.vm_types_attack_probs = {}
self.vm_types_attack_probs['_pr_cl'] = 0
self.vm_types_attack_probs['_pc_od'] = 0.01
#self.vm_types_attack_probs['_pc2_od'] = 0.1
# self.vm_types_attack_probs['_pc2_sp'] = 0.90
self.vm_types_hour_cost = {}
self.vm_types_hour_cost['_pr_cl'] = 0
self.vm_types_hour_cost['_pc_od'] = 0.9560
# self.vm_types_hour_cost['_pc2_od'] = 0.15
# self.vm_types_hour_cost['_pc2_sp'] = 0.10
self.vm_types_currentState = {}
self.vm_types_currentState['_pr_cl'] = 8
self.vm_types_currentState['_pc_od'] = 0
# self.vm_types_currentState['_pc2_od'] = 0
# self.vm_types_currentState['_pc2_sp'] = 0
self.vm_types_min = {}
self.vm_types_min['_pr_cl'] = 8
self.vm_types_min['_pc_od'] = 0
# self.vm_types_min['_pc2_od'] = 0
# self.vm_types_min['_pc2_sp'] = 0
self.vm_types_max = {}
self.vm_types_max['_pr_cl'] = 8
self.vm_types_max['_pc_od'] = 10
# self.vm_types_max['_pc2_od'] = 10
# self.vm_types_max['_pc2_sp'] = 10
self.prob_policy = 'economy'
#self.policy_case == "noprob"
self.realProbabilisticPolicy = False;
self.removeFirstPolicy = False;
self.removeFirstAllCasesPolicy = False;
self.policy_case = cfg.get("config", "policy_case");
if(self.policy_case == "probe"):
self.realProbabilisticPolicy = True;
self.prob_policy = 'economy'
elif(self.policy_case == "probq"):
self.realProbabilisticPolicy = True;
self.prob_policy = 'quality'
elif(self.policy_case == "probb"):
self.realProbabilisticPolicy = True;
self.prob_policy = 'balance'
elif(self.policy_case == "rf"):
self.removeFirstPolicy = True;
elif(self.policy_case == "raf"):
self.removeFirstAllCasesPolicy = True;
#attack
self.attack = eval(cfg.get("config", "attack"));
self.transient_counter_limit = int(cfg.get("config", "transient_counter_limit"));
#policy
self.policy = cfg.get("config", "policy");
self.mat = False
if (self.policy == "RL-TB"):
self.ignore_add_remove = False
self.mdp = False
self.rltb = True
self.re = False
self.multicluster = False
self.w_reward = False
self.greedy = False
elif (self.policy == "RE"):
self.ignore_add_remove = False
self.mdp = False
self.rltb = False
self.re = True
self.multicluster = False
self.w_reward = False
self.greedy = False
elif (self.policy == "MDP-TB"):
self.ignore_add_remove = False
self.mdp = True
self.multicluster = False
self.w_reward = False
self.greedy = False
elif (self.policy == "MDP-EB"):
self.ignore_add_remove = False
self.mdp = True
self.multicluster = True
self.w_reward = True
self.greedy = False
elif (self.policy == "MDP-EBG"):
self.ignore_add_remove = False
self.mdp = True
self.multicluster = True
self.w_reward = True
self.greedy = True
elif (self.policy == "MDP2"):
self.ignore_add_remove = False
self.mdp = True
self.multicluster = True
self.w_reward = False
self.greedy = False
self.only_attack = False
elif (self.policy == "MDP2attack"):
self.ignore_add_remove = False
self.mdp = True
self.multicluster = True
self.w_reward = False
self.greedy = False
self.only_attack = True
elif (self.policy == "MDP3"):
self.ignore_add_remove = True
self.mdp = True
self.multicluster = True
self.w_reward = False
self.greedy = False
elif (self.policy == "MDP2-MAT"):
self.ignore_add_remove = False
self.mdp = True
self.multicluster = True
self.w_reward = False
self.greedy = False
self.mat = True
self.experiments_logs_name = "";
if self.simulation:
self.experiments_logs_name += 'sim_'
else:
self.experiments_logs_name += 'real_'
if (self.policy == "RL-TB"):
self.experiments_logs_name += 'rltb_'
elif (self.policy == "MDP-TB"):
self.experiments_logs_name += 'mdptb_'
elif (self.policy == "MDP-EB"):
self.experiments_logs_name += 'mdpeb_'
elif (self.policy == "MDP-EBG"):
self.experiments_logs_name += 'mdpebg_'
elif (self.policy == "MDP2"):
self.experiments_logs_name += 'mdp2_'
elif (self.policy == "MDP2attack"):
self.experiments_logs_name += 'mdp2attack_'
elif (self.policy == "MDP3"):
self.experiments_logs_name += 'mdp3_'
elif (self.policy == "RE"):
self.experiments_logs_name += 're_'
elif (self.policy == "MDP2-MAT"):
self.experiments_logs_name += 'mdp2mat_'
self.experiments_logs_name += self.training_set_name + '_'
self.experiments_logs_name += self.reward_function_name + '_'
if eval(self.window_inlambda):
self.experiments_logs_name += 'wt_'
elif eval(self.ewma_inlambda):
self.experiments_logs_name += 'ewmat_'
else:
self.experiments_logs_name += 'wf_'
self.experiments_logs_name += str(self.add_nodes) + 'a_'
self.experiments_logs_name += str(self.rem_nodes) + 'r_'
self.experiments_logs_name += str(self.initial_cluster_size) + 'init_'
self.experiments_logs_name += str(self.min_cluster_size) + 'min_'
self.experiments_logs_name += str(self.max_cluster_size) + 'max_'
self.experiments_logs_name += self.latency_threshold + 'l_'
self.experiments_logs_name += str(self.num_of_clusters) + 'cl_'
self.experiments_logs_name += self.meas_count + 'meas_'
self.experiments_logs_name += self.max_steps + 'steps_'
self.experiments_logs_name += str(self.cumulative) + 'Cum_'
self.experiments_logs_name += str(self.violation_cluster) + 'Viocl_'
self.experiments_logs_name += str(self.attack) + 'Attack_'
self.experiments_logs_name += str(self.transient_counter_limit) + 'trancnt_'
self.experiments_logs_name += str(self.policy_case)+'_'
self.experiments_logs_name += str(self.accepted_percentage)+'ap_'
self.experiments_logs_name += str(self.hourly)+'Hourly_'
self.experiments_logs_name += str(self.gain_punishment)+'pun_'
self.experiments_logs_name += 'exp_logs_'
#the folder that contains all the logs of one experiment
self.experiments_logs_folder_name = self.experiments_logs_name
self.experiments_logs_name += self.iteration + 'iter'
## Reads the monitoring thresholds
self.thresholds_add = {}
self.thresholds_remove = {}
for option in cfg.options("thresholds_add"):
self.thresholds_add[option] = cfg.get("thresholds_add", option)
for option in cfg.options("thresholds_remove"):
self.thresholds_remove[option] = cfg.get("thresholds_remove", option)