from pysqlite2 import dbapi2 as sqlite
#import sys, os
import sys, time, paramiko, os, requests, base64, json, time
import Utils
from Instance import Instance
# Default HTTP headers for JSON requests sent to the Ganeti RAPI endpoint;
# passed to ``requests`` as the ``headers`` keyword argument.
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "user-agent": "Ganeti RAPI Client",
}
class GanetiCluster(object):
    '''
    This class holds all instances that take part in the virtual cluster.
    It can create and stop new instances - and, working in conjunction with
    the db specific classes, set up various environments.

    Every cluster operation is an HTTP call against the Ganeti RAPI endpoint
    at ``base_url`` using basic auth. TLS certificate verification is
    switched off for those calls (``verify=False``).
    '''

    # Defaults kept from the original hard-coded constructor values.
    # NOTE(review): credentials are committed in source; prefer passing them
    # to the constructor from a configuration that is not version-controlled.
    DEFAULT_BASE_URL = 'https://192.168.1.6:5080/2/'
    DEFAULT_USERNAME = "ganeti_webmgr"
    DEFAULT_PASSWORD = "9CCNPcF4SJyX"

    # Node that hosts the first ("master") instance of a launch.
    MASTER_NODE = 'panther.delab.csd.auth.gr'

    # Ganeti job states, other than 'success', after which the job will never
    # change again. Polling must stop on these or it would loop forever.
    _FAILED_JOB_STATES = ('error', 'canceled')

    def __init__(self, properties_file, base_url=None, username=None,
                 password=None):
        '''
        Constructor.

        properties_file -- path handed to ``Utils.Utils``.
        base_url, username, password -- optional overrides for the RAPI
            endpoint and its basic-auth credentials; the class defaults are
            used when omitted.

        Makes sure the sqlite file named by ``utils.db_file`` contains the
        ``instances`` table, creating it when the select fails.
        '''
        requests.packages.urllib3.disable_warnings()
        self.utils = Utils.Utils(properties_file)
        self.base_url = base_url or self.DEFAULT_BASE_URL
        self.username = username or self.DEFAULT_USERNAME
        self.password = password or self.DEFAULT_PASSWORD
        # Make sure the sqlite file exists. if not, create it and the table we need
        con = sqlite.connect(self.utils.db_file)
        try:
            cur = con.cursor()
            try:
                instances = cur.execute('select * from instances').fetchall()
            except sqlite.DatabaseError:
                cur.execute('create table instances(id text, image_id text, dns_name text, ip_address text, mac_address text, vpn text, state text, instance_type text, launch_time text, placement text, username text, password text)')
                con.commit()
            else:
                print("Already discovered instances from previous database file. Use describe_instances without arguments to update.\n")
                print("Found records:\n%s" % (instances,))
            finally:
                cur.close()
        finally:
            # Release the connection even when something unexpected is raised.
            con.close()

    def _request_kwargs(self, **extra):
        '''Return the keyword arguments shared by the RAPI calls, plus ``extra``.'''
        kwargs = {
            "headers": headers,
            "auth": (self.username, self.password),
            "verify": False,
        }
        kwargs.update(extra)
        return kwargs

    def describe_instances(self, state=None, pattern=None):
        '''
        Return the instances tagged as owned by ``utils.ownerTag``.

        state -- ``None`` lists every owned instance and refreshes the local
            database with the result; ``"pollDB"`` skips the cluster and
            reads the local database instead; any other value keeps only
            instances whose status equals it.
        pattern -- when given, keep only instances whose id contains it.

        Returns a list of instances. When ``pattern`` is given and nothing
        matches, returns ``None`` instead of an empty list.
        '''
        if state != "pollDB":
            instances = self._fetch_owned_instances(state)
            ## if simple call
            if not state:
                self.utils.refresh_instance_db(instances)
        else:
            ## read from the local database
            instances = self._load_instances_from_db()
        if not pattern:
            return instances
        ## if you are using patterns and state, show only matching state and id's
        matched_instances = [inst for inst in instances
                             if inst.id.find(pattern) != -1]
        return matched_instances or None

    def _fetch_owned_instances(self, state):
        '''Query the cluster and build an Instance for each owned, matching entry.'''
        owner_tag = "owner:" + self.utils.ownerTag
        kwargs = self._request_kwargs(params="bulk=1")
        r = requests.request("get", self.base_url + "instances", **kwargs)
        instances = []
        for info in json.loads(r.content):
            #TODO check the image id
            server_id = str(info['name'])
            server_state = str(info['status'])
            if state and state != server_state:
                continue
            r_tags = requests.request(
                "get", self.base_url + "instances/" + server_id + "/tags",
                **kwargs)
            # Filter on ownership before touching mac_ip_mapping, so that an
            # instance belonging to someone else (and possibly absent from
            # the mapping) cannot break the whole listing.
            if owner_tag not in json.loads(r_tags.content):
                continue
            server_instance_type = (str(info['disk_usage']) + "_" +
                                    str(info['oper_vcpus']) + "_" +
                                    str(info['oper_ram']))
            server_placement = str(info['pnode'])
            server_vpn = server_placement + ":" + str(info['network_port'])
            server_mac_address = str(info['nic.macs'][0])
            server_ip_address = self.utils.mac_ip_mapping[server_id][1]
            instances.append(Instance(
                server_id, str(info['os']), server_id, server_ip_address,
                server_mac_address, server_vpn, server_state,
                server_instance_type, str(info['mtime']), server_placement,
                "root", "secretpw"))
        return instances

    def _load_instances_from_db(self):
        '''Rebuild the instance list from the rows of the local sqlite table.'''
        con = sqlite.connect(self.utils.db_file)
        try:
            cur = con.cursor()
            try:
                rows = cur.execute('select * from instances').fetchall()
            except sqlite.DatabaseError:
                con.rollback()
                rows = []
            finally:
                cur.close()
        finally:
            con.close()
        return [self.utils.return_instance_from_tuple(row) for row in rows]

    def describe_images(self, pattern=None):
        '''
        Return the decoded answer of the RAPI ``os`` resource.

        ``pattern`` is accepted for interface compatibility but is not used.
        '''
        r = requests.request("get", self.base_url + "os",
                             **self._request_kwargs())
        return json.loads(r.content)

    def get_occupied_dns_names(self):
        '''Return the names of all instances the cluster reports, owned or not.'''
        r = requests.request("get", self.base_url + "instances",
                             **self._request_kwargs(params="bulk=1"))
        return [str(info['name']) for info in json.loads(r.content)]

    def run_instances(self, instance_count, os, disk, cpu, ram):
        '''
        Launch ``instance_count`` instances on names that are still free.

        The names come from ``utils.getFree_dns_names``; the remaining
        arguments are passed through to ``run_instances_names``.
        '''
        occupied = self.get_occupied_dns_names()
        free_names = self.utils.getFree_dns_names(int(instance_count), occupied)
        return self.run_instances_names(free_names, os, disk, cpu, ram)

    @staticmethod
    def _worker_node(index):
        '''Return the node for the worker at position ``index`` of a launch.'''
        if index < 9:
            return 'rabbit.delab.csd.auth.gr'
        if index < 15:
            return 'anaconda.delab.csd.auth.gr'
        return 'deer.delab.csd.auth.gr'

    @staticmethod
    def _creation_body(name, os_type, disk, ram, cpu, nic, pnode):
        '''Build the instance-creation request body; ``nic`` is (mac, ip).'''
        return {
            "__version__": 1,
            "mode": 'create',
            "hypervisor": 'kvm',
            "os_type": os_type,
            "instance_name": name,
            "disk_template": 'plain',
            "disks": [{'size': disk}],
            "beparams": {"memory": ram, "maxmem": ram, "minmem": ram,
                         'vcpus': cpu},
            "hvparams": {'vnc_bind_address': '0.0.0.0'},
            "nics": [{'ip': nic[1], 'mac': nic[0]}],
            "pnode": pnode,
        }

    def _wait_for_job(self, job_id):
        '''
        Poll a job every 5 seconds until it reaches a final state.

        Returns the final status string: 'success' or one of
        ``_FAILED_JOB_STATES``.
        '''
        kwargs = self._request_kwargs(data=json.dumps({"fields": ['status']}))
        url = self.base_url + "jobs/" + str(job_id) + "/wait"
        while True:
            time.sleep(5)
            answer = json.loads(requests.request("get", url, **kwargs).content)
            if not answer:
                # The wait resource answers null when it saw no change.
                continue
            status = answer['job_info'][0]
            if status == 'success' or status in self._FAILED_JOB_STATES:
                return status

    def run_instances_names(self, names, os, disk, cpu, ram):
        '''
        Create one instance per entry of ``names`` and wait for the jobs.

        The first name is launched as the master, using the image and sizes
        configured in ``utils`` and placed on ``MASTER_NODE``. Every other
        name uses ``os``/``disk``/``cpu``/``ram`` and a node chosen by its
        position. Instances whose creation job succeeds are tagged with the
        owner tag and returned; failed or canceled jobs are reported on
        stdout and left out of the result.
        '''
        if not names:
            return []
        pending_instances = {}
        jobIDs = []
        for running_vm_cnt, name in enumerate(names):
            if running_vm_cnt == 0:
                print("running master instance: " + name)
                nic = self.utils.mac_ip_mapping[name]
                body = self._creation_body(
                    name, self.utils.master_bucket_name, self.utils.m_disk,
                    self.utils.m_ram, self.utils.m_cpu, nic, self.MASTER_NODE)
            else:
                print("running instance: " + name)
                nic = self.utils.mac_ip_mapping[name]
                body = self._creation_body(
                    name, os, disk, ram, cpu, nic,
                    self._worker_node(running_vm_cnt))
            r = requests.request("post", self.base_url + "instances",
                                 **self._request_kwargs(data=json.dumps(body)))
            jobID = json.loads(r.content)
            jobIDs.append(jobID)
            # NOTE(review): the master's record carries the worker image and
            # sizes rather than the master settings actually requested;
            # kept as in the original - confirm whether that is intended.
            # str() so numeric disk/cpu/ram values do not raise TypeError.
            instance_type = str(disk) + "_" + str(cpu) + "_" + str(ram)
            pending_instances[str(jobID)] = Instance(
                name, os, name, nic[1], nic[0], "", "", instance_type,
                str(time.time()), "", "root", "secretpw")
        instances = []
        failures = []
        for JID in jobIDs:
            if self._wait_for_job(JID) == 'success':
                instances.append(pending_instances[str(JID)])
            else:
                failures.append(JID)
        if failures:
            print("instance creation jobs failed: " +
                  ", ".join(str(JID) for JID in failures))
        self.tag_instances_owner(instances)
        return instances

    def tag_instances_owner(self, instances):
        '''Add the ``owner:<utils.ownerTag>`` tag to each given instance.'''
        # The tag travels as a query parameter and the request has no body,
        # so this call uses its own header set without a content-type.
        headersTag = {
            "accept": "application/json",
            "user-agent": "Ganeti RAPI Client",
        }
        for instance in instances:
            print("tagging instance: " + instance.id)
            kwargs = {
                "headers": headersTag,
                "auth": (self.username, self.password),
                "verify": False,
                "params": {"tag": ['owner:' + self.utils.ownerTag]},
            }
            requests.request(
                "put", self.base_url + "instances/" + instance.id + "/tags",
                **kwargs)

    def terminate_instances(self, names):
        '''Delete each named instance and append its name to ``utils.free_ips``.'''
        for name in names:
            print("Terminating:  %s" % (name,))
            requests.request("delete",
                             self.base_url + "instances/" + str(name),
                             **self._request_kwargs())
            self.utils.free_ips.append(name)

    def block_until_running(self, instances):
        '''
        Wait until every given instance reports the "running" status.

        Each instance is polled every 5 seconds for at most 300 seconds and
        a timeout is reported on stdout. Afterwards the local database is
        refreshed through ``describe_instances()`` and the call continues
        into ``block_until_ping``, whose result is returned.
        '''
        ## Run describe instances until everyone is running
        remaining = len(instances)
        for instance in instances:
            print("Waiting for %s instances to run" % remaining)
            name = instance.dns_name
            kwargs = self._request_kwargs()
            status = ""
            timeout = 300
            while status != "running" and timeout > 0:
                time.sleep(5)
                r = requests.request(
                    "get", self.base_url + "instances/" + str(name), **kwargs)
                status = json.loads(r.content)['status']
                timeout = timeout - 5
            # Judge by the status itself: an instance that turns "running"
            # on the very last poll has not timed out.
            if status != "running":
                print("timeout while waiting for running state: " + str(name))
            else:
                remaining = remaining - 1
        #call describe_instances() to store instances to db
        self.describe_instances()
        return self.block_until_ping(instances)

    def block_until_ping(self, instances):
        '''
        Wait until every given instance answers a ping on its ip_address.

        Each instance gets up to 150 failed single-packet pings, one second
        apart, before a timeout is reported on stdout. Returns ``instances``
        unchanged, including any that never answered.
        '''
        remaining = len(instances)
        for instance in instances:
            print("Waiting for %s instances to ping" % remaining)
            ping = False
            timeout = 150
            while not ping and timeout > 0:
                response = os.system("ping -c 1 -W 4 " + instance.ip_address)
                if response == 0:
                    ping = True
                else:
                    time.sleep(1)
                    timeout = timeout - 1
            # Judge by the ping result rather than by the counter.
            if not ping:
                print("timeout while trying to ping: " + str(instance.ip_address))
            else:
                remaining = remaining - 1
        return instances
if __name__ == "__main__":
    # Manual smoke test: launch two instances, wait for them, tear them down.
    euca = GanetiCluster('Coordinator_gf_r1_0p_6a_4r_4cl_smg_s_first_True_safe_timelimit.properties')
    euca.describe_instances()
    # run_instances() expects an instance *count* and would fail on int(list);
    # explicit host names have to go through run_instances_names().
    started = euca.run_instances_names(
        ['vm50.delab.csd.auth.gr', 'vm51.delab.csd.auth.gr'],
        'snf-image+hadoop_limit', '5G', '2', '1G')
    # block_until_running() already continues into block_until_ping().
    euca.block_until_running(started)
    # Terminate exactly what this run launched, not whatever happens to be
    # first in the listing of owned instances.
    euca.terminate_instances([instance.dns_name for instance in started])