import random, itertools
import Utils, operator, os, sys, datetime
import numpy as np
import pandas
import statsmodels.api as sm
import LoadGenerator_emu
from scipy.cluster.vq import vq, kmeans, kmeans2, whiten
from scipy.spatial.distance import pdist
from scipy.cluster.hierarchy import centroid, linkage, fcluster, fclusterdata
from scipy.stats import linregress
from collections import deque, defaultdict
from PrismHandler import PrismHandler
from SpotHandler import SpotHandler
from sklearn.linear_model import LinearRegression
from pysqlite2 import dbapi2 as sqlite
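# Verification builds a performance model of each cluster size from logged measurements
# ([inlambda, throughput, latency, cpu] points, clustered with k-means) and uses PrismHandler
# to verify PCTL properties over the resulting MDP.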
class Verification():
def __init__(self, properties_file, past_log=-1):
self.rcvallmetrics = {}
self.past_log = past_log
self.utils = Utils.Utils(properties_file)
self.training_set_length = int(self.utils.training_set_length)
self.load_generator = LoadGenerator_emu.LoadGenerator(int(self.utils.meas_count), self.utils.inlambda_file_path, self.utils.load_meas_log, eval(self.utils.new_inlambda),True)
self.spotHandler = SpotHandler(properties_file)
self.acted = ["done"]
cluster_size = self.utils.initial_cluster_size
self.currentState = str(cluster_size)
self.nextState = str(cluster_size)
self.vm_types_currentState = self.utils.vm_types_currentState
self.prediction = False
self.waitForIt = 0
# The policy for getting throughput and latency when computing the reward func.
# average, centroid
self.measurementsPolicy = 'centroid'
self.countdown = 0
self.n_current = 50
self.past_lambda = np.array([])
self.latencies = np.array([])
#window configuration
if eval(self.utils.window_inlambda):
self.latestMeasQueue = deque([])
self.windowSize = 100
elif eval(self.utils.ewma_inlambda):
self.lambdaQueue = deque()
self.latQueue = deque()
self.thrQueue = deque()
self.windowSize = 100
##Create model
if not os.path.exists(self.utils.prism_output_folder):
os.makedirs(self.utils.prism_output_folder);
self.mdp_handler = PrismHandler(properties_file)
# A dictionary that will remember rewards and metrics in states previously visited
self.memory = {}
for i in range(int(self.utils.min_cluster_size), int(self.utils.max_cluster_size)+1):
self.memory[str(i)] = {}
self.memory[str(i)]['V'] = None # placeholder for rewards and metrics
self.memory[str(i)]['r'] = None
self.memory[str(i)]['arrayMeas'] = None
# Load any previous statics.
self.measurementsFile = self.utils.install_dir+'/'+self.utils.logs_folder_name+'/measurements.txt'
self.sumMetrics = {}
meas = open(self.measurementsFile, 'a')
if os.stat(self.measurementsFile).st_size == 0:
# The file is empty, set the headers for each column.
meas.write('State\t\tLambda\t\tThroughput\t\tLatency\t\tCPU\t\tTime\n')
else :
# Read the measurements saved in the file.
meas.close()
meas = open(self.measurementsFile, 'r')
lines_cnt = 0
meas.next() # Skip the first line with the headers of the columns
for line in meas:
# Skip comments (used in training sets)
if not line.startswith('###'):
m = line.split('\t\t')
self.addMeasurement(m)
lines_cnt += 1
if(past_log != -1):
if(lines_cnt >= past_log+self.training_set_length):
self.rcvallmetrics['inlambda'] = float(m[1])
self.rcvallmetrics['latency'] = float(m[3])
self.rcvallmetrics['throughput'] = float(m[2])
for i in range(6, len(m)):
num = int(m[i].split('@')[0])
vm_type = m[i].split('@')[1].strip()
self.vm_types_currentState[vm_type] = num
self.currentState = m[0]
break
meas.close()
self.cost_before_remove = 0;
self.removed_vms_cnt = 0;
    # param metrics: array The metrics to store: [state, lambda, throughput, latency, cpu, time, "count@vm_type", ...]
def addMeasurement(self, metrics):
#store all the latency meas to compute mean and stdev for z-score normalization
self.latencies = np.append(self.latencies,[float(metrics[3])])
if self.measurementsPolicy.startswith('average'):
if not self.sumMetrics.has_key(metrics[0]):
# Save the metric with the state as key metrics = [state, inlambda, throughput, latency]
self.sumMetrics[metrics[0]] = {'inlambda': 0.0, 'throughput': 0.0, 'latency': 0.0, 'divide_by': 0}
self.sumMetrics[metrics[0]] = {'inlambda': self.sumMetrics[metrics[0]]['inlambda'] + float(metrics[1]),
'throughput': self.sumMetrics[metrics[0]]['throughput'] + float(metrics[2]),
'latency': self.sumMetrics[metrics[0]]['latency'] + float(metrics[3]),
'divide_by': self.sumMetrics[metrics[0]]['divide_by'] + 1}
        # metrics -> 0: state, 1: lambda, 2: throughput, 3: latency, 4: cpu, 5: time
if not self.memory.has_key(metrics[0]):
self.memory[str(metrics[0])] = {}
self.memory[str(metrics[0])]['V'] = None # placeholder for rewards and metrics
self.memory[str(metrics[0])]['r'] = None
self.memory[str(metrics[0])]['arrayMeas'] = np.array([float(metrics[1]), float(metrics[2]),
float(metrics[3]), float(metrics[4])], ndmin=2)
        elif self.memory[metrics[0]]['arrayMeas'] is None:
self.memory[metrics[0]]['arrayMeas'] = np.array([float(metrics[1]), float(metrics[2]),
float(metrics[3]), float(metrics[4])], ndmin=2)
else:
self.memory[metrics[0]]['arrayMeas'] = np.append(self.memory[metrics[0]]['arrayMeas'],
[[float(metrics[1]), float(metrics[2]),
float(metrics[3]), float(metrics[4])]], axis=0)
# param state: string Get the average metrics (throughput, latency) for this state.
# return a dictionary with the averages
def getAverages(self, state):
averages = {}
if self.sumMetrics.has_key(state):
averages['throughput'] = float(self.sumMetrics[state]['throughput']/self.sumMetrics[state]['divide_by'])
averages['latency'] = float(self.sumMetrics[state]['latency']/self.sumMetrics[state]['divide_by'])
return averages
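    # Return, per cluster size, a representative (throughput, latency) point taken from the stored
    # measurements (arrayMeas rows are [inlambda, throughput, latency, cpu]).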
def getCentroids(self):
centroids = {}
for i in range(int(self.utils.min_cluster_size), int(self.utils.max_cluster_size)+1):
            if self.memory[str(i)]['arrayMeas'] is not None:
                if len(self.memory[str(i)]['arrayMeas']) > 1:
                    Y = self.memory[str(i)]['arrayMeas']
                    T = fclusterdata(Y, t=15.0, criterion='distance', metric='euclidean', method='single')
                    Z = centroid(Y)
                else:
                    # arrayMeas rows are [inlambda, throughput, latency, cpu]
                    centroids[str(i)] = {}
                    centroids[str(i)]['throughput'] = self.memory[str(i)]['arrayMeas'][0][1]
                    centroids[str(i)]['latency'] = self.memory[str(i)]['arrayMeas'][0][2]
return centroids
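    # Recompute per-cluster means of inlambda, throughput and cpu for the points assigned to each
    # k-means label, keeping the latency coordinate of the corresponding centroid (centroids[i][0]).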
def getNewCentroids(self,sliced_data,k,centroids,label):
inlambda_sum = [0] * k
inlambda_cnt = [0] * k
thr_sum = [0] * k
thr_cnt = [0] * k
cpu_sum = [0] * k
cpu_cnt = [0] * k
i = 0;
for l in label:
inlambda_sum[l] = inlambda_sum[l] + sliced_data[i][0]
inlambda_cnt[l] = inlambda_cnt[l] + 1
thr_sum[l] = thr_sum[l] + sliced_data[i][1]
thr_cnt[l] = thr_cnt[l] + 1
cpu_sum[l] = cpu_sum[l] + sliced_data[i][3]
cpu_cnt[l] = cpu_cnt[l] + 1
i = i+1
for i in range(0,k):
if inlambda_cnt[i] != 0 and len(centroids[:][i]) != 0:
if i == 0:
a = np.array([inlambda_sum[i]/inlambda_cnt[i],thr_sum[i]/thr_cnt[i],centroids[:][i][0],cpu_sum[i]/cpu_cnt[i]], ndmin=2)
else:
a = np.append(a, [[inlambda_sum[i]/inlambda_cnt[i],thr_sum[i]/thr_cnt[i],centroids[:][i][0],cpu_sum[i]/cpu_cnt[i]]], axis=0)
elif inlambda_cnt[i] == 0:
if i == 0:
a = np.array([0,0,centroids[:][i][0],0], ndmin=2)
else:
if i < len(centroids):
a = np.append(a, [[0,0,centroids[:][i][0],0]], axis=0)
return a
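    # Cluster the measurements of `state` whose lambda lies in [from_inlambda, to_inlambda] and return,
    # per cluster, its centroid (inlambda, throughput, latency, cpu) plus a weight proportional to the
    # number of points assigned to it; if enabled, SLA-violating points (latency above the threshold)
    # are clustered separately. Artificial points are generated when too few real measurements exist.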
def doKmeansMultiCluster(self, state, from_inlambda, to_inlambda):
# Run kmeans for the measurements of this state and return the centroid point (throughput, latency)
ctd = defaultdict(dict);
label = []
label_viol = []
        if self.memory[state]['arrayMeas'] is not None:
sliced_data = None
sliced_data_violation = None
for j in self.memory[state]['arrayMeas']:
                # If this measurement falls in the lambda slice we're interested in
if j[0] >= from_inlambda and j[0] <= to_inlambda:
# add it
if self.utils.violation_cluster and j[2] >= float(self.utils.latency_threshold):
                        if sliced_data_violation is None:
sliced_data_violation = np.array(j, ndmin=2)
else:
sliced_data_violation = np.append(sliced_data_violation, [j], axis=0)
else:
                        if sliced_data is None:
sliced_data = np.array(j, ndmin=2)
else:
sliced_data = np.append(sliced_data, [j], axis=0)
k = int(self.utils.num_of_clusters)
            # 1. No known lambda values close to the current lambda measurement
            if sliced_data is not None:
if (len(sliced_data) < k):
# print "Not enough meas close to current lambda measurement. Producing extra artificial values!";
thr, lat = self.utils.interpolate_metrics(self.memory,state,from_inlambda)
#print state
#print from_inlambda
                    # NaN check (x != x only for NaN): fall back to the emulated load generator
                    if thr != thr:
                        thr = self.load_generator.thr(str(state), from_inlambda)
                    if lat != lat:
                        lat = self.load_generator.lat(str(state), from_inlambda)
for i in range(len(sliced_data),k):
self.memory[state]['arrayMeas'] = np.append(self.memory[state]['arrayMeas'],[[float(from_inlambda), thr, lat, float(100)]], axis=0)
for j in self.memory[state]['arrayMeas']:
if j[0] >= from_inlambda and j[0] <= to_inlambda:
                            if self.utils.violation_cluster and j[2] >= float(self.utils.latency_threshold):
                                if sliced_data_violation is None:
sliced_data_violation = np.array(j, ndmin=2)
else:
sliced_data_violation = np.append(sliced_data_violation, [j], axis=0)
else:
                                if sliced_data is None:
sliced_data = np.array(j, ndmin=2)
else:
sliced_data = np.append(sliced_data, [j], axis=0)
            if sliced_data is None and sliced_data_violation is None:
                # print "No known lambda values close to current lambda measurement. Returning artificial values!"
thr, lat = self.utils.interpolate_metrics(self.memory,state,from_inlambda)
#print state
#print from_inlambda
                # NaN check (x != x only for NaN): fall back to the emulated load generator
                if thr != thr:
                    thr = self.load_generator.thr(str(state), from_inlambda)
                if lat != lat:
                    lat = self.load_generator.lat(str(state), from_inlambda)
for i in range(0,k):
self.memory[state]['arrayMeas'] = np.append(self.memory[state]['arrayMeas'],[[float(from_inlambda), thr, lat, float(100)]], axis=0)
for j in self.memory[state]['arrayMeas']:
if j[0] >= from_inlambda and j[0] <= to_inlambda:
                        if self.utils.violation_cluster and j[2] >= float(self.utils.latency_threshold):
                            if sliced_data_violation is None:
sliced_data_violation = np.array(j, ndmin=2)
else:
sliced_data_violation = np.append(sliced_data_violation, [j], axis=0)
else:
                            if sliced_data is None:
sliced_data = np.array(j, ndmin=2)
else:
sliced_data = np.append(sliced_data, [j], axis=0)
centroids_viol = []
            if sliced_data_violation is not None and len(sliced_data_violation) > 0:
# print "DOKMEANS length of sliced_data_violation to be fed to kmeans: "+ str(len(sliced_data_violation));
centroids_viol, label_viol = kmeans2(sliced_data_violation, 1, minit='points')
            if sliced_data is not None:
try:
centroids, label = kmeans2(np.array(sliced_data[:,2],ndmin=2).reshape((-1,1)), k-1, minit='points')
centroids = self.getNewCentroids(sliced_data,k-1,centroids, label)
                except Exception as e:
print str(e)
print str(sliced_data)
else:
# centroids, label = kmeans2(sliced_data, k, minit='points')
if(len(sliced_data) < k):
# print "sliced_data size smaller than k: " + str(len(sliced_data))
centroids, label = kmeans2(np.array(sliced_data[:,2],ndmin=2).reshape((-1,1)), len(sliced_data), minit='points')
# for i in range(0,k-len(sliced_data)):
# centroids = np.append(centroids,[[0]],axis=0)
# label = np.append(label,[0],axis=0)
else:
centroids, label = kmeans2(np.array(sliced_data[:,2],ndmin=2).reshape((-1,1)), k, minit='points')
centroids = self.getNewCentroids(sliced_data,k,centroids, label)
num_of_meas = {}
#num_of_meas = {'0': 0, '1': 0, '2': 0, '3': 0, '4': 0}
for j in range(0, k):
num_of_meas[str(j)] = 0
meas_sum = 0;
if len(label) > 0:
for i in label:
num_of_meas[str(i)] += 1
meas_sum += 1;
num_of_meas_viol = {}
#num_of_meas = {'0': 0, '1': 0, '2': 0, '3': 0, '4': 0}
for j in range(0, 1):
num_of_meas_viol[str(j)] = 0
if len(label_viol) > 0:
for i in label_viol:
num_of_meas_viol[str(i)] += 1
meas_sum += 1;
if not self.utils.multicluster:
max_meas_cluster = max(num_of_meas.iteritems(), key=operator.itemgetter(1))[0]
if len(label) > 0:
for j in label:
ctd[int(j)]['cluster_weight'] = 1.0 if int(j) == int(max_meas_cluster) else 0;
ctd[int(j)]['inlambda'] = centroids[int(j)][0];
ctd[int(j)]['throughput'] = centroids[int(j)][1];
ctd[int(j)]['latency'] = centroids[int(j)][2];
ctd[int(j)]['cpu'] = centroids[int(j)][3];
else:
for j in range(0, k):
ctd[int(j)]['cluster_weight'] = 0.0;
ctd[int(j)]['inlambda'] = 0.0;
ctd[int(j)]['throughput'] = 0.0;
ctd[int(j)]['latency'] = 0.0;
ctd[int(j)]['cpu'] = 0.0;
else:
if len(label) > 0:
for j in label:
ctd[int(j)]['cluster_weight'] = float(float(num_of_meas[str(j)]) / float(meas_sum));
ctd[int(j)]['inlambda'] = centroids[int(j)][0];
ctd[int(j)]['throughput'] = centroids[int(j)][1];
ctd[int(j)]['latency'] = centroids[int(j)][2];
ctd[int(j)]['cpu'] = centroids[int(j)][3];
else:
for j in range(0, k):
ctd[int(j)]['cluster_weight'] = 0.0;
ctd[int(j)]['inlambda'] = 0.0;
ctd[int(j)]['throughput'] = 0.0;
ctd[int(j)]['latency'] = 0.0;
ctd[int(j)]['cpu'] = 0.0;
if len(label_viol) > 0:
for j in label_viol:
ctd[int(self.utils.num_of_clusters)-1]['cluster_weight'] = float(float(num_of_meas_viol[str(0)]) / float(meas_sum));
ctd[int(self.utils.num_of_clusters)-1]['inlambda'] = centroids_viol[int(j)][0];
ctd[int(self.utils.num_of_clusters)-1]['throughput'] = centroids_viol[int(j)][1];
ctd[int(self.utils.num_of_clusters)-1]['latency'] = centroids_viol[int(j)][2];
ctd[int(self.utils.num_of_clusters)-1]['cpu'] = centroids_viol[int(j)][3];
else:
print "DOKMEANS self.memory[state]['arrayMeas'] is None :|";
return ctd;
def moving_average(self, iterable, n=3):
# moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
d = deque(itertools.islice(it, n-1))
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / float(n)
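    # Simulate how many of the requested add/remove VM actions actually succeed: each individual
    # action fails independently with probability pn, which depends on the number of VMs involved
    # and on n_current.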
def failuresCalculator(self,currentState, nextState):
successfullActions = 0
if currentState > nextState: #remove action
removeVmsNum = currentState - nextState
pn = self.utils.removeFailureProb*pow(self.utils.failure_alpha_const,removeVmsNum)*pow(self.utils.failure_beta_const,self.generate_n_current());
for i in range(0,removeVmsNum):
successfullActions = successfullActions + 1 - self.generate(pn)
return currentState - successfullActions
else: #add action
addVmsNum = nextState - currentState
pn = self.utils.addFailureProb*pow(self.utils.failure_alpha_const,addVmsNum)*pow(self.utils.failure_beta_const,self.generate_n_current());
for i in range(0,addVmsNum):
successfullActions = successfullActions + 1 - self.generate(pn)
return currentState + successfullActions
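    # Bernoulli trial: returns 1 with probability p, otherwise 0.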
def generate(self,p):
return int(random.random() < p)
def generate_n_current(self):
return self.n_current
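    # For every cluster size and every k-means cluster, build the latency/throughput constants and
    # cluster weights that parameterise the PRISM model, and cache the gain (reward) of each state
    # in self.memory[state]['r'].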
def getMetricConsts(self, from_inlambda, to_inlambda, p, allmetrics):
latencyConst = defaultdict(dict);
throughputConst = defaultdict(dict);
cluster_weights = defaultdict(dict);
for i in range(int(self.utils.min_cluster_size), int(self.utils.max_cluster_size)+1):
vals = defaultdict(dict);
met = defaultdict(dict);
if self.measurementsPolicy.startswith('average'):
met = self.getAverages(str(i))
elif self.measurementsPolicy.startswith('centroid'):
met[i] = self.doKmeansMultiCluster(str(i), from_inlambda, to_inlambda);
for j in range(0,int(self.utils.num_of_clusters)):
vals[j]['num_nodes'] = i
if met[i][j] != None and len(met[i][j]) > 0 :
vals[j]['inlambda'] = met[i][j]['inlambda']
vals[j]['throughput'] = met[i][j]['throughput']
vals[j]['latency'] = met[i][j]['latency']
vals[j]['cpu'] = met[i][j]['cpu']
else:
# No clue for this state use defaults/current measurements...
vals[j]['inlambda'] = allmetrics['inlambda']
vals[j]['throughput'] = allmetrics['throughput']
vals[j]['latency'] = allmetrics['latency']
if int(self.currentState) == i and p == 0:
vals[j]['inlambda'] = allmetrics['inlambda']
vals[j]['latency'] = allmetrics['latency']
vals[j]['throughput'] = allmetrics['throughput']
# print "CURRENT LATENCY:" + str(allmetrics['latency'])
allmetrics['num_nodes'] = i
allmetrics['max_num_nodes'] = int(self.utils.max_cluster_size)
total_cost,norm_total_cost = self.get_the_cost()
allmetrics['total_cost'] = total_cost
allmetrics['norm_total_cost'] = norm_total_cost
allmetrics['norm_latency'] = self.get_norm_latency(allmetrics['latency'],np.mean(self.latencies),np.std(self.latencies))
if(float(float(allmetrics['latency'])) > float(self.utils.latency_threshold)):
cur_gain = float(self.utils.gain_punishment);
else:
cur_gain = float(eval(self.utils.gain,allmetrics))
self.memory[str(i)]['r'] = cur_gain
if(len(latencyConst[i]) == 0):
latencyConst[i] = defaultdict(dict);
if(len(throughputConst[i]) == 0):
throughputConst[i] = defaultdict(dict);
if(len(cluster_weights[i]) == 0):
cluster_weights[i] = defaultdict(dict);
if (vals[j]['latency'] == None):
latencyConst[i][j] = 0;
else:
latencyConst[i][j] = vals[j]['latency'];
if (vals[j]['throughput'] == None):
throughputConst[i][j] = 0;
else:
throughputConst[i][j] = vals[j]['throughput'];
                # Give cluster_weight = 1 to the first cluster of the current state and 0 to the other clusters
if int(self.currentState) == i and p == 0:
if j == 0:
cluster_weights[i][j] = 1.0;
else:
cluster_weights[i][j] = 0.0;
else:
if (len(met[i][j]) == 0):
cluster_weights[i][j] = 0;
else:
cluster_weights[i][j] = met[i][j]['cluster_weight'];
return latencyConst,throughputConst, cluster_weights
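    # Predict the incoming load `steps_ahead` decision epochs ahead using the emulated load generator,
    # perturbed by a random offset; the commented-out ARMA/linear-regression code below is an
    # alternative predictor over the recorded lambda history.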
def predict_lambda(self,steps_ahead,d_l):
        # *10 because decision steps are 5 minutes while the load trace is recorded in 30-second intervals
        load = self.load_generator.get_next_load(steps_ahead*10) + self.load_generator.myR(-steps_ahead*100, steps_ahead*100)
        return load
# print("Current Lambda: " + str(d_l))
# today = datetime.datetime.now()
# data_index = pandas.date_range(end=today, periods = len(self.past_lambda),freq='30S')
# data_series = pandas.Series(self.past_lambda, index = data_index)
# try:
# model = sm.tsa.ARMA(data_series,(2,1)).fit()
# except Exception,e:
# print("An Exception Occured on ARMA")
# data_series = pandas.Series(self.past_lambda, index = data_index)[len(self.past_lambda)-10:]
# X = np.array(range(1,len(data_series.values)+1))
# X = X.reshape((-1,1))
# y = np.array(data_series.values)
# model = LinearRegression()
# model.fit(X, y)
# inlambda = model.predict([[len(data_series.values)+steps_ahead]])[0]
# if(inlambda < 0):
# inlambda = float(d_l)
# print("Linear Regression: " + str(inlambda))
# return inlambda
#Big Brother is watching you!
#prediction = model.predict(str(today), str(today+datetime.timedelta(seconds=steps_ahead*30)), dynamic=True)
#print("Predicted Lambda: " + str(prediction[len(prediction)-1]))
# return prediction[len(prediction)-1]
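    # Map a latency value to [0, 1]: z-score against the recorded latencies, clipped to [-1, 1]
    # and rescaled to [0, 1].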
def get_norm_latency(self,latency, mean_lat, std_lat):
z_latency = 0
if((latency-mean_lat)/std_lat>1):
z_latency = 1
elif((latency-mean_lat)/std_lat<(-1)):
z_latency = -1
else:
z_latency = (latency-mean_lat)/std_lat;
return (z_latency+1.0)/2.0;
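    # Total hourly cost of the current VM mix (spot instances priced at the current market price),
    # together with its min-max normalised value over the allowed VM counts.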
def get_the_cost(self):
total_cost = 0
for vm_type in self.vm_types_currentState:
if('_sp' not in vm_type):
total_cost = total_cost + self.vm_types_currentState[vm_type]*self.utils.vm_types_hour_cost[vm_type]
else:
total_cost += self.vm_types_currentState[vm_type]*self.spotHandler.getMarketPrice(self.simTime, self.utils.spotInstanceType)
min_total_cost = 0
for vm_type in self.utils.vm_types_hour_cost.keys():
min_total_cost = min_total_cost + self.utils.vm_types_hour_cost[vm_type]*self.utils.vm_types_min[vm_type]
max_total_cost = 0
for vm_type in self.utils.vm_types_hour_cost.keys():
max_total_cost = max_total_cost + self.utils.vm_types_hour_cost[vm_type]*self.utils.vm_types_max[vm_type]
norm_total_cost = (total_cost-min_total_cost)/(max_total_cost-min_total_cost)
return total_cost,norm_total_cost
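    # Exponentially weighted moving average over samples spaced 30 seconds apart; returns the most
    # recent smoothed value.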
def ewma(self, alist):
today = datetime.datetime.now()
data_index = pandas.date_range(end=today, periods = len(alist),freq='30S')
data = np.array(alist)
data_series = pandas.Series(data, index = data_index)
a = 0.85
z = pandas.ewma(data_series,com=a/(1.0-a))
return z.values[len(z.values)-1]
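    # Read the current metrics and cluster composition from the simulation database (or replay a past
    # log entry), build the metric constants for every predicted lambda up to max_steps ahead, and ask
    # PrismHandler to verify the given PCTL property; returns PRISM's full output, or just the result
    # line when shortVersion is True.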
def verify(self, pctl_property, shortVersion=False, allowed_actions=["add","remove","no_op"]):
simDatetime = None
simTime = None
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
try:
if(self.past_log == -1):
metrics = cur.execute("select inlambda,latency,throughput,simDate,simCount from exp where id=1").fetchone()
self.rcvallmetrics['inlambda'] = float(metrics[0])
self.rcvallmetrics['latency'] = float(metrics[1])
self.rcvallmetrics['throughput'] = float(metrics[2])
simDatetime = datetime.datetime.strptime(metrics[3], '%b %d %Y %I:%M:%S%p')
simTime = int(metrics[4])
else:
startDate = cur.execute("select startDate from exp where id=1").fetchone()[0]
simDatetime = datetime.datetime.strptime(startDate, '%b %d %Y %I:%M:%S%p')+datetime.timedelta(0,self.past_log*30)
simTime = self.past_log
except sqlite.DatabaseError:
con.rollback()
cur.close()
            con.close()
        # Keep the simulation time on the instance: get_the_cost() and the spot price lookups below use self.simTime
        self.simTime = simTime
con = sqlite.connect(self.utils.db_file)
cur = con.cursor()
total_vms = 0
try:
if(self.past_log == -1):
for vm_type in self.utils.vm_types:
vms_number = int(cur.execute("select count(*) from instances where vm_type='"+str(vm_type)+"'").fetchone()[0])
self.vm_types_currentState[vm_type] = vms_number
total_vms += vms_number
else:
total_vms = int(self.currentState)
except sqlite.DatabaseError:
con.rollback()
cur.close()
con.close()
self.currentState = str(total_vms)
startTime = datetime.datetime.now();
allmetrics = None
allmetrics = self.rcvallmetrics.copy()
if not allmetrics.has_key('inlambda'):
allmetrics['inlambda'] = 0
if not allmetrics.has_key('throughput'):
allmetrics['throughput'] = 0
if not allmetrics.has_key('latency'):
allmetrics['latency'] = 0
if not allmetrics.has_key('cpu'):
allmetrics['cpu'] = 0
from_inlambda = 0.95 * allmetrics['inlambda']
to_inlambda = 1.05 * allmetrics['inlambda']
        # Too low to matter: the initial cluster can serve about 1000 req/sec, so widen the slice down to 0 for such loads
if 0.0 < to_inlambda < 1000:
from_inlambda = 0.0
predicted_lambdas = {}
predicted_lambdas[0] = [allmetrics['inlambda']]
for p in range(1,int(self.utils.max_steps)+1):
predicted_lambdas[p] = []
predicted_lambdas[p].append(self.predict_lambda(p,allmetrics['inlambda']))
#initialization
latencyConstDict = {0:[]}
throughputConstDict = {0:[]}
cluster_weightsDict = {0:[]}
for p in range(0,int(self.utils.max_steps)+1):
for a in range(0,len(predicted_lambdas[p])):
from_inlambda = 0.95 * predicted_lambdas[p][a]
to_inlambda = 1.05 * predicted_lambdas[p][a]
if 0.0 < to_inlambda < 1000:
from_inlambda = 0.0
latencyConst, throughputConst, cluster_weights = self.getMetricConsts(from_inlambda, to_inlambda, p, allmetrics)
latencyConstDict[a].append(latencyConst)
throughputConstDict[a].append(throughputConst)
cluster_weightsDict[a].append(cluster_weights)
vm_types_num_nodes={}
for vm_type in self.vm_types_currentState:
vm_types_num_nodes[vm_type] = [self.utils.vm_types_min[vm_type],self.utils.vm_types_max[vm_type],self.vm_types_currentState[vm_type]]
cost_spot_step = {}
for vm_type in self.vm_types_currentState:
if("_sp" in vm_type):
cost_spot_step[vm_type] = [self.utils.vm_types_hour_cost[vm_type]] * (int(self.utils.max_steps)+1)
if(len(self.spotHandler.fetchSpots()) > 0):
for i in range(0,int(self.utils.max_steps)+1):
cost_spot_step[vm_type][i] = self.spotHandler.getMarketPrice(self.simTime+(i*10), self.utils.spotInstanceType)
spot_bid_price = {}
for vm_type in self.vm_types_currentState:
if("_sp" in vm_type):
spot_bid_price[vm_type] = self.spotHandler.getBidPrice(vm_type, self.utils.spotInstanceType)
remove_limits_vm_types = {}
if(self.utils.hourly):
for vm_type in self.utils.vm_types:
remove_steps = []
for i in range(0,int(self.utils.max_steps)+1):
if i == 0:
remove_step_0 = min(self.utils.rem_nodes, len(self.utils.get_remove_candidate_instances_sim(simDatetime,self.utils.last_vm_secs,vm_type)))
remove_steps.append(remove_step_0)
else:
#how many can I remove after i*5mins later?
remove_step_x = min(self.utils.rem_nodes, len(self.utils.get_remove_candidate_instances_sim(simDatetime+datetime.timedelta(0,i*300),self.utils.last_vm_secs,vm_type)))
remove_steps.append(remove_step_x)
remove_limits_vm_types[vm_type] = remove_steps
self.actionsList = []
mean_lat = np.mean(self.latencies)
std_lat = np.std(self.latencies)
out = self.mdp_handler.verifyPCTL(pctl_property, self.currentState, latencyConstDict, throughputConstDict, cluster_weightsDict, vm_types_num_nodes, remove_limits_vm_types, cost_spot_step, spot_bid_price, allowed_actions=allowed_actions, mean_lat=mean_lat, std_lat=std_lat);
endTime = datetime.datetime.now();
if(shortVersion):
response = ""
for line in out.split("\n"):
if "Result: " in line:
response = "<p>Result: "+line.split(' ')[1]+'</p>';
return response
return out
if __name__ == '__main__':
    # A properties file path is required to construct Verification; here it is taken from the command line.
    mdp = Verification(sys.argv[1])