diff --git a/conf/lxc-script/lxc-ifdown b/conf/lxc-script/lxc-ifdown
index b0324bf..7d55723 100755
--- a/conf/lxc-script/lxc-ifdown
+++ b/conf/lxc-script/lxc-ifdown
@@ -9,5 +9,8 @@
 ovs-vsctl --if-exists del-port $Bridge $5
 cnt=$(ovs-vsctl list-ports ${Bridge} | wc -l)
 if [ "$cnt" = "1" ]; then
-    ovs-vsctl del-br $Bridge
+    greport=$(ovs-vsctl list-ports ${Bridge} | grep "^gre-[[:digit:]][[:digit:]]*-[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*$" | wc -l)
+    if [ "$greport" = "1" ]; then
+        ovs-vsctl del-br $Bridge
+    fi
 fi
diff --git a/src/httprest.py b/src/httprest.py
index e9f855f..e2ca16f 100755
--- a/src/httprest.py
+++ b/src/httprest.py
@@ -204,9 +204,8 @@ def start_cluster(user, beans, form):
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
     user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
-    uid = user_info['data']['id']
    logger.info ("handle request : start cluster %s" % clustername)
-    [status, result] = G_vclustermgr.start_cluster(clustername, user, uid)
+    [status, result] = G_vclustermgr.start_cluster(clustername, user, user_info)
     if status:
         return json.dumps({'success':'true', 'action':'start cluster', 'message':result})
     else:
diff --git a/src/nettools.py b/src/nettools.py
index 492592c..dc51476 100755
--- a/src/nettools.py
+++ b/src/nettools.py
@@ -231,6 +231,51 @@ class ovscontrol(object):
         except subprocess.CalledProcessError as suberror:
             return [False, "set port tag failed : %s" % suberror.stdout.decode('utf-8')]
 
+    @staticmethod
+    def set_port_input_qos(port, input_rate_limit):
+        input_rate_limiting = int(input_rate_limit)*1000
+        try:
+            p = subprocess.run(['ovs-vsctl', 'create', 'qos', 'type=linux-htb', 'other_config:max-rate='+str(input_rate_limiting)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            subprocess.run(['ovs-vsctl', 'set', 'Port', str(port), 'qos='+p.stdout.decode('utf-8').rstrip()], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            return [True, str(port)]
+        except subprocess.CalledProcessError as suberror:
+            return [False, "set port input qos failed : %s" % suberror.stdout.decode('utf-8')]
+
+    @staticmethod
+    def del_port_input_qos(port):
+        try:
+            p = subprocess.run(['ovs-vsctl', 'get', 'port', str(port), 'qos'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            subprocess.run(['ovs-vsctl', 'clear', 'port', str(port), 'qos'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            subprocess.run(['ovs-vsctl', 'destroy', 'qos', p.stdout.decode('utf-8').rstrip()], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            return [True, str(port)]
+        except subprocess.CalledProcessError as suberror:
+            return [False, "del port input qos failed : %s" % suberror.stdout.decode('utf-8')]
+
+    @staticmethod
+    def set_port_output_qos(port, output_rate_limit):
+        try:
+            subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_rate='+str(output_rate_limit)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_burst='+str(output_rate_limit)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            return [True, str(port)]
+        except subprocess.CalledProcessError as suberror:
+            return [False, "set port output qos failed : %s" % suberror.stdout.decode('utf-8')]
+
+    @staticmethod
+    def del_port_output_qos(port):
+        try:
+            subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_rate=0'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_burst=0'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            return [True, str(port)]
+        except subprocess.CalledProcessError as suberror:
+            return [False, "del port output qos failed : %s" % suberror.stdout.decode('utf-8')]
+
+    @staticmethod
+    def destroy_all_qos():
+        try:
+            subprocess.run(['ovs-vsctl', '--all', 'destroy', 'qos'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+            return [True, 'succeeded in destroying all qos.']
+        except subprocess.CalledProcessError as suberror:
+            return [False, "destroy all qos failed : %s" % suberror.stdout.decode('utf-8')]
 
 class netcontrol(object):
     @staticmethod
@@ -259,24 +304,36 @@ class netcontrol(object):
         return ovscontrol.port_exists(gwport)
 
     @staticmethod
-    def setup_gw(bridge, gwport, addr):
+    def setup_gw(bridge, gwport, addr, input_rate_limit, output_rate_limit):
         [status, result] = ovscontrol.add_port_internal(bridge, gwport)
         if not status:
             return [status, result]
         [status, result] = ipcontrol.add_addr(gwport, addr)
         if not status:
             return [status, result]
-        return ipcontrol.up_link(gwport)
+        [status, result] = ipcontrol.up_link(gwport)
+        if not status:
+            return [status, result]
+        [status, result] = ovscontrol.set_port_input_qos(gwport, input_rate_limit)
+        if not status:
+            return [status, result]
+        return ovscontrol.set_port_output_qos(gwport, output_rate_limit)
 
     @staticmethod
     def del_gw(bridge, gwport):
+        [status, result] = ovscontrol.del_port_input_qos(gwport)
+        if not status:
+            return [status, result]
+        [status, result] = ovscontrol.del_port_output_qos(gwport)
+        if not status:
+            return [status, result]
         return ovscontrol.del_port(bridge, gwport)
 
     @staticmethod
-    def check_gw(bridge, gwport, uid, addr):
+    def check_gw(bridge, gwport, uid, addr, input_rate_limit, output_rate_limit):
         ovscontrol.add_bridge(bridge)
         if not netcontrol.gw_exists(bridge, gwport):
-            return netcontrol.setup_gw(bridge, gwport, addr)
+            return netcontrol.setup_gw(bridge, gwport, addr, input_rate_limit, output_rate_limit)
         [status, info] = ipcontrol.link_info(gwport)
         if not status:
             return [False, "get gateway info failed"]
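
Note on the QoS helpers added above: set_port_input_qos attaches a linux-htb QoS record to the OVS Port (max-rate is in bit/s, hence the *1000 applied to the kbps value), while set_port_output_qos uses interface ingress policing, which takes kbps directly and is disabled again by resetting the values to 0. A minimal usage sketch, assuming src/ is on PYTHONPATH; the port name and rates below are illustrative, not taken from the patch:

    from nettools import ovscontrol

    port = "docklet-gw-example"     # hypothetical gateway port name
    input_rate_limit = "10000"      # kbps, as stored in the group quota
    output_rate_limit = "10000"     # kbps

    # Create a linux-htb QoS record and attach it to the Port.
    print(ovscontrol.set_port_input_qos(port, input_rate_limit))

    # Set ingress_policing_rate/burst on the Interface record.
    print(ovscontrol.set_port_output_qos(port, output_rate_limit))

    # Teardown mirrors setup: detach and destroy the QoS record, then zero
    # the policing rate and burst (0 disables policing in OVS).
    print(ovscontrol.del_port_input_qos(port))
    print(ovscontrol.del_port_output_qos(port))
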
diff --git a/src/network.py b/src/network.py
index d74c4e4..50654fb 100755
--- a/src/network.py
+++ b/src/network.py
@@ -450,7 +450,7 @@ class NetworkMgr(object):
         self.load_usrgw(username)
         return username in self.usrgws.keys()
 
-    def setup_usrgw(self, username, uid, nodemgr, workerip=None):
+    def setup_usrgw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, workerip=None):
         if not self.has_user(username):
             return [False,"user doesn't exist."]
         self.load_usrgw(username)
@@ -464,12 +464,12 @@ class NetworkMgr(object):
             logger.info("setup gateway for %s with %s on %s" % (username, usrpools.get_gateway_cidr(), ip))
             self.usrgws[username] = ip
             self.dump_usrgw(username)
-            worker.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr())
+            worker.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr(), input_rate_limit, output_rate_limit)
         else:
             logger.info("setup gateway for %s with %s on master" % (username, usrpools.get_gateway_cidr() ))
             self.usrgws[username] = self.masterip
             self.dump_usrgw(username)
-            netcontrol.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr())
+            netcontrol.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr(), input_rate_limit, output_rate_limit)
         self.dump_user(username)
         del self.users[username]
         return [True, "set up gateway success"]
@@ -527,7 +527,7 @@ class NetworkMgr(object):
             del self.users[username]
         return [True, 'delete user success']
 
-    def check_usergw(self, username, uid, nodemgr, distributedgw=False):
+    def check_usergw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, distributedgw=False):
         logger.info("Check %s(%s) user gateway."%(username, str(uid)))
         if not self.has_user(username):
             return [False,"user doesn't exist."]
@@ -542,10 +542,10 @@ class NetworkMgr(object):
                 self.del_usrgwbr(username,uid,nodemgr)
                 self.usrgws[username] = self.masterip
                 self.dump_usrgw(username)
-            netcontrol.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr())
+            netcontrol.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit)
         else:
             worker = nodemgr.ip_to_rpc(ip)
-            worker.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr())
+            worker.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit)
         del self.users[username]
         return [True, 'check gw ok']
 
diff --git a/src/userManager.py b/src/userManager.py
index 365ee39..a50defa 100755
--- a/src/userManager.py
+++ b/src/userManager.py
@@ -167,10 +167,10 @@ class userManager:
         if not os.path.exists(fspath+"/global/sys/quota"):
             groupfile = open(fspath+"/global/sys/quota",'w')
             groups = []
-            groups.append({'name':'root', 'quotas':{ 'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8' }})
-            groups.append({'name':'admin', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8'}})
-            groups.append({'name':'primary', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8'}})
-            groups.append({'name':'foundation', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8'}})
+            groups.append({'name':'root', 'quotas':{ 'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
+            groups.append({'name':'admin', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
+            groups.append({'name':'primary', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
+            groups.append({'name':'foundation', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
             groupfile.write(json.dumps(groups))
             groupfile.close()
         if not os.path.exists(fspath+"/global/sys/quotainfo"):
@@ -185,6 +185,8 @@ class userManager:
             quotas['quotainfo'].append({'name':'image', 'hint':'how many images the user can save, e.g. 10'})
             quotas['quotainfo'].append({'name':'idletime', 'hint':'will stop cluster after idletime, number of hours, e.g. 24'})
             quotas['quotainfo'].append({'name':'vnode', 'hint':'how many containers the user can have, e.g. 8'})
+            quotas['quotainfo'].append({'name':'input_rate_limit', 'hint':'the ingress speed of the network, number of kbps'})
+            quotas['quotainfo'].append({'name':'output_rate_limit', 'hint':'the egress speed of the network, number of kbps'})
             quotafile.write(json.dumps(quotas))
             quotafile.close()
         if not os.path.exists(fspath+"/global/sys/lxc.default"):
@@ -853,7 +855,7 @@ class userManager:
         if (user_check != None and (user_check.status == "init")):
             db.session.delete(user_check)
             db.session.commit()
-        else: 
+        else:
             newuser.password = hashlib.sha512(newuser.password.encode('utf-8')).hexdigest()
         db.session.add(newuser)
         db.session.commit()
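
With these defaults every group gets 'input_rate_limit' and 'output_rate_limit' of '10000' (kbps, stored as strings like the other quotas) in FS_PREFIX/global/sys/quota. A minimal sketch of reading them back into the name-to-quotas map that recover_allclusters builds; the path below is only an assumed FS_PREFIX value:

    import json

    FS_PREFIX = "/opt/docklet"   # assumption: docklet reads the real value from its environment
    with open(FS_PREFIX + "/global/sys/quota") as f:
        groups = json.load(f)

    # Same shape as the mapping built in vclustermgr.recover_allclusters().
    quotas = {group['name']: group['quotas'] for group in groups}
    print(quotas['primary']['input_rate_limit'])    # e.g. '10000'
    print(quotas['primary']['output_rate_limit'])   # e.g. '10000'
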
diff --git a/src/vclustermgr.py b/src/vclustermgr.py
index e01c76b..16ed088 100755
--- a/src/vclustermgr.py
+++ b/src/vclustermgr.py
@@ -56,11 +56,23 @@ class VclusterMgr(object):
         logger.info("recovering all vclusters for all users...")
         usersdir = self.fspath+"/global/users/"
         auth_key = env.getenv('AUTH_KEY')
+        res = post_to_user("/master/user/groupinfo/", {'auth_key':auth_key})
+        #logger.info(res)
+        groups = json.loads(res['groups'])
+        quotas = {}
+        for group in groups:
+            logger.info(group)
+            quotas[group['name']] = group['quotas']
         for user in os.listdir(usersdir):
             for cluster in self.list_clusters(user)[1]:
                 logger.info ("recovering cluster:%s for user:%s ..." % (cluster, user))
-                res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
-                self.recover_cluster(cluster, user, res['uid'])
+                #res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
+                recover_info = post_to_user("/master/user/recoverinfo/", {'username':user,'auth_key':auth_key})
+                uid = recover_info['uid']
+                groupname = recover_info['groupname']
+                input_rate_limit = quotas[groupname]['input_rate_limit']
+                output_rate_limit = quotas[groupname]['output_rate_limit']
+                self.recover_cluster(cluster, user, uid, input_rate_limit, output_rate_limit)
         logger.info("recovered all vclusters for all users")
 
     def mount_allclusters(self):
@@ -98,6 +110,7 @@ class VclusterMgr(object):
         workers = self.nodemgr.get_nodeips()
         image_json = json.dumps(image)
         groupname = json.loads(user_info)["data"]["group"]
+        groupquota = json.loads(user_info)["data"]["groupinfo"]
         uid = json.loads(user_info)["data"]["id"]
         if (len(workers) == 0):
             logger.warning ("no workers to start containers, start cluster failed")
@@ -106,7 +119,7 @@ class VclusterMgr(object):
         if not self.networkmgr.has_user(username):
             self.networkmgr.add_user(username, cidr=29, isshared = True if str(groupname) == "fundation" else False)
             if self.distributedgw == "False":
-                [success,message] = self.networkmgr.setup_usrgw(username, uid, self.nodemgr)
+                [success,message] = self.networkmgr.setup_usrgw(groupquota['input_rate_limit'], groupquota['output_rate_limit'], username, uid, self.nodemgr)
                 if not success:
                     return [False, message]
         elif not self.networkmgr.has_usrgw(username):
@@ -132,7 +145,7 @@ class VclusterMgr(object):
             workerip = workers[random.randint(0, len(workers)-1)]
             oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
             if self.distributedgw == "True" and i == 0 and not self.networkmgr.has_usrgw(username):
-                [success,message] = self.networkmgr.setup_usrgw(username, uid, self.nodemgr, workerip)
+                [success,message] = self.networkmgr.setup_usrgw(groupquota['input_rate_limit'], groupquota['output_rate_limit'], username, uid, self.nodemgr, workerip)
                 if not success:
                     return [False, message]
             if i == 0:
@@ -495,7 +508,10 @@ class VclusterMgr(object):
         else:
             return True
 
-    def start_cluster(self, clustername, username, uid):
+    def start_cluster(self, clustername, username, user_info):
+        uid = user_info['data']['id']
+        input_rate_limit = user_info['data']['groupinfo']['input_rate_limit']
+        output_rate_limit = user_info['data']['groupinfo']['output_rate_limit']
         [status, info] = self.get_clusterinfo(clustername, username)
         if not status:
             return [False, "cluster not found"]
@@ -529,7 +545,7 @@ class VclusterMgr(object):
         # check gateway for user
         # after reboot, user gateway goes down and lose its configuration
         # so, check is necessary
-        self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True')
+        self.networkmgr.check_usergw(input_rate_limit, output_rate_limit, username, uid, self.nodemgr,self.distributedgw=='True')
         # start containers
         for container in info['containers']:
             # set up gre from user's gateway host to container's host.
@@ -558,7 +574,7 @@ class VclusterMgr(object):
             worker.mount_container(container['containername'])
         return [True, "mount cluster"]
 
-    def recover_cluster(self, clustername, username, uid):
+    def recover_cluster(self, clustername, username, uid, input_rate_limit, output_rate_limit):
         [status, info] = self.get_clusterinfo(clustername, username)
         if not status:
             return [False, "cluster not found"]
@@ -598,7 +614,7 @@ class VclusterMgr(object):
             except:
                 return [False, "start cluster failed with setting proxy failed"]
         # need to check and recover gateway of this user
-        self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True')
+        self.networkmgr.check_usergw(input_rate_limit, output_rate_limit, username, uid, self.nodemgr,self.distributedgw=='True')
         # recover containers of this cluster
         for container in info['containers']:
             # set up gre from user's gateway host to container's host.
diff --git a/src/worker.py b/src/worker.py
index b50d1c7..3850cc0 100755
--- a/src/worker.py
+++ b/src/worker.py
@@ -154,6 +154,9 @@ class Worker(object):
                     ovscontrol.del_bridge(bridge)
         else:
             logger.error(bridges)
+        [success, message] = ovscontrol.destroy_all_qos()
+        if not success:
+            logger.error(message)
         '''if (self.addr == self.master):
             logger.info ("master also on this node. reuse master's network")
         else:
diff --git a/user/user.py b/user/user.py
index d6af40b..9c7a501 100755
--- a/user/user.py
+++ b/user/user.py
@@ -325,15 +325,24 @@ def selfQuery_user(cur_user, user, form):
     result = G_usermgr.selfQuery(cur_user = cur_user)
     return json.dumps(result)
 
-@app.route("/user/uid/", methods=['POST'])
+@app.route("/master/user/recoverinfo/", methods=['POST'])
 @auth_key_required
-def get_userid():
+def get_master_recoverinfo():
     username = request.form.get("username",None)
     if username is None:
         return json.dumps({'success':'false', 'message':'username field is required.'})
     else:
         user = User.query.filter_by(username=username).first()
-        return json.dumps({'success':'true', 'uid':user.id})
+        return json.dumps({'success':'true', 'uid':user.id, 'groupname':user.user_group})
+
+@app.route("/master/user/groupinfo/", methods=['POST'])
+@auth_key_required
+def get_master_groupinfo():
+    fspath = env.getenv('FS_PREFIX')
+    groupfile = open(fspath+"/global/sys/quota",'r')
+    groups = json.loads(groupfile.read())
+    groupfile.close()
+    return json.dumps({'success':'true', 'groups':json.dumps(groups)})
 
 @app.route("/user/selfModify/", methods=['POST'])
 @login_required
@@ -470,7 +479,7 @@ def query_self_notifications_infos(cur_user, user, form):
     return json.dumps(result)
 
 @app.route("/billing/beans/", methods=['POST'])
-#@auth_key_required
+@auth_key_required
 def billing_beans():
     logger.info("handle request: /billing/beans/")
     form = request.form
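
The /user/uid/ route is renamed to /master/user/recoverinfo/ and now also returns the user's group, and the new /master/user/groupinfo/ route returns the quota groups as a JSON string nested inside the JSON response, which is why the master calls json.loads(res['groups']) on it again. A rough sketch of consuming both endpoints; the service URL, auth_key, and username are placeholders, and the real master code goes through post_to_user() rather than requests:

    import json
    import requests

    USER_SERVICE = "http://127.0.0.1:9100"   # assumption: address of the user service
    AUTH_KEY = "shared-auth-key"             # assumption: the configured auth_key

    info = requests.post(USER_SERVICE + "/master/user/recoverinfo/",
                         data={'username': 'alice', 'auth_key': AUTH_KEY}).json()
    # -> {'success': 'true', 'uid': ..., 'groupname': ...}

    res = requests.post(USER_SERVICE + "/master/user/groupinfo/",
                        data={'auth_key': AUTH_KEY}).json()
    # res['groups'] is itself a JSON string, so it needs a second json.loads,
    # exactly as recover_allclusters does.
    groups = json.loads(res['groups'])
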
diff --git a/web/templates/monitor/status.html b/web/templates/monitor/status.html
index b122333..02616c4 100644
--- a/web/templates/monitor/status.html
+++ b/web/templates/monitor/status.html
@@ -32,26 +32,32 @@
-                        <th>CPU</th>
-                        <th>Memory</th>
-                        <th>Disk</th>
-                        <th>Vnode</th>
-                        <th>Image</th>
-                        <th>Idletime</th>
+                        {% for quotaname in quotanames %}
+                        <th>{{ quotaname }}</th>
+                        {% endfor %}
-                        <td>{{ quotainfo['cpu'] }} Cores</td>
-                        {% else %}
-                        <td>{{ quotainfo['cpu'] }} Core</td>
-                        {% endif %}
-                        <td>{{ quotainfo['memory'] }} MB</td>
-                        <td>{{ quotainfo['disk'] }} MB</td>
-                        <td>{{ quotainfo['vnode'] }}</td>
-                        <td>{{ quotainfo['image'] }}</td>
-                        <td>{{ quotainfo['idletime'] }} hours</td>
+                        {% for quotaname in quotanames %}
+                        {% if quotaname == 'cpu' %}
+                        {% if quotas['cpu'] == '1' %}
+                        <td>{{ quotas['cpu'] }} Core</td>
+                        {% else %}
+                        <td>{{ quotas['cpu'] }} Cores</td>
+                        {% endif %}
+                        {% elif quotaname == 'memory' or quotaname == 'disk' %}
+                        <td>{{ quotas[quotaname] }} MB</td>
+                        {% elif quotaname == 'idletime' %}
+                        <td>{{ quotas[quotaname] }} hours</td>
+                        {% elif quotaname == 'input_rate_limit' or quotaname == 'output_rate_limit'%}
+                        <td>{{ quotas[quotaname] }} kbps</td>
+                        {% elif quotaname == 'data' %}
+                        <td>{{ quotas[quotaname] }} GB</td>
+                        {% else %}
+                        <td>{{ quotas[quotaname] }}</td>
+                        {% endif %}
+                        {% endfor %}