Merge pull request #274 from FirmlyReality/qos_rate_limiting

Qos rate limiting
This commit is contained in:
Yujian Zhu 2017-11-20 01:01:11 +08:00 committed by GitHub
commit 4759e29d3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 149 additions and 51 deletions

View File

@ -9,5 +9,8 @@
ovs-vsctl --if-exists del-port $Bridge $5
cnt=$(ovs-vsctl list-ports ${Bridge} | wc -l)
if [ "$cnt" = "1" ]; then
ovs-vsctl del-br $Bridge
greport=$(ovs-vsctl list-ports $(Bridge) | grep "^gre-[[:digit:]][[:digit:]]*-[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*$" | wc -l)
if [ "$greport" = "1" ]; then
ovs-vsctl del-br $Bridge
fi
fi

View File

@ -204,9 +204,8 @@ def start_cluster(user, beans, form):
if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'})
user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
uid = user_info['data']['id']
logger.info ("handle request : start cluster %s" % clustername)
[status, result] = G_vclustermgr.start_cluster(clustername, user, uid)
[status, result] = G_vclustermgr.start_cluster(clustername, user, user_info)
if status:
return json.dumps({'success':'true', 'action':'start cluster', 'message':result})
else:

View File

@ -231,6 +231,51 @@ class ovscontrol(object):
except subprocess.CalledProcessError as suberror:
return [False, "set port tag failed : %s" % suberror.stdout.decode('utf-8')]
@staticmethod
def set_port_input_qos(port, input_rate_limit):
input_rate_limiting = int(input_rate_limit)*1000
try:
p = subprocess.run(['ovs-vsctl', 'create', 'qos', 'type=linux-htb', 'other_config:max-rate='+str(input_rate_limiting)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
subprocess.run(['ovs-vsctl', 'set', 'Port', str(port), 'qos='+p.stdout.decode('utf-8').rstrip()], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(port)]
except subprocess.CalledProcessError as suberror:
return [False, "set port input qos failed : %s" % suberror.stdout.decode('utf-8')]
@staticmethod
def del_port_input_qos(port):
try:
p = subprocess.run(['ovs-vsctl', 'get', 'port', str(port), 'qos'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
subprocess.run(['ovs-vsctl', 'clear', 'port', str(port), 'qos'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
subprocess.run(['ovs-vsctl', 'destroy', 'qos', p.stdout.decode('utf-8').rstrip()], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(port)]
except subprocess.CalledProcessError as suberror:
return [False, "del port input qos failed : %s" % suberror.stdout.decode('utf-8')]
@staticmethod
def set_port_output_qos(port, output_rate_limit):
try:
subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_rate='+str(output_rate_limit)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_burst='+str(output_rate_limit)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(port)]
except subprocess.CalledProcessError as suberror:
return [False, "set port output qos failed : %s" % suberror.stdout.decode('utf-8')]
@staticmethod
def del_port_output_qos(port):
try:
subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_rate=0'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
subprocess.run(['ovs-vsctl', 'set', 'interface', str(port), 'ingress_policing_burst=0'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(port)]
except subprocess.CalledProcessError as suberror:
return [False, "del port output qos failed : %s" % suberror.stdout.decode('utf-8')]
@staticmethod
def destroy_all_qos():
try:
ret = subprocess.run(['ovs-vsctl', '--all', 'destroy', 'qos'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, 'succeed to destroying all qos.']
except subprocess.CalledProcessError as suberror:
return [False, "destroy all qos failed : %s" % suberror.stdout.decode('utf-8')]
class netcontrol(object):
@staticmethod
@ -259,24 +304,36 @@ class netcontrol(object):
return ovscontrol.port_exists(gwport)
@staticmethod
def setup_gw(bridge, gwport, addr):
def setup_gw(bridge, gwport, addr, input_rate_limit, output_rate_limit):
[status, result] = ovscontrol.add_port_internal(bridge, gwport)
if not status:
return [status, result]
[status, result] = ipcontrol.add_addr(gwport, addr)
if not status:
return [status, result]
return ipcontrol.up_link(gwport)
[status, result] = ipcontrol.up_link(gwport)
if not status:
return [status, result]
[status, result] = ovscontrol.set_port_input_qos(gwport, input_rate_limit)
if not status:
return [status, result]
return ovscontrol.set_port_output_qos(gwport, output_rate_limit)
@staticmethod
def del_gw(bridge, gwport):
[status, result] = ovscontrol.del_port_input_qos(gwport)
if not status:
return [status, result]
[status, result] = ovscontrol.del_port_output_qos(gwport)
if not status:
return [status, result]
return ovscontrol.del_port(bridge, gwport)
@staticmethod
def check_gw(bridge, gwport, uid, addr):
def check_gw(bridge, gwport, uid, addr, input_rate_limit, output_rate_limit):
ovscontrol.add_bridge(bridge)
if not netcontrol.gw_exists(bridge, gwport):
return netcontrol.setup_gw(bridge, gwport, addr)
return netcontrol.setup_gw(bridge, gwport, addr, input_rate_limit, output_rate_limit)
[status, info] = ipcontrol.link_info(gwport)
if not status:
return [False, "get gateway info failed"]

View File

@ -450,7 +450,7 @@ class NetworkMgr(object):
self.load_usrgw(username)
return username in self.usrgws.keys()
def setup_usrgw(self, username, uid, nodemgr, workerip=None):
def setup_usrgw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, workerip=None):
if not self.has_user(username):
return [False,"user doesn't exist."]
self.load_usrgw(username)
@ -464,12 +464,12 @@ class NetworkMgr(object):
logger.info("setup gateway for %s with %s on %s" % (username, usrpools.get_gateway_cidr(), ip))
self.usrgws[username] = ip
self.dump_usrgw(username)
worker.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr())
worker.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr(), input_rate_limit, output_rate_limit)
else:
logger.info("setup gateway for %s with %s on master" % (username, usrpools.get_gateway_cidr() ))
self.usrgws[username] = self.masterip
self.dump_usrgw(username)
netcontrol.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr())
netcontrol.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr(), input_rate_limit, output_rate_limit)
self.dump_user(username)
del self.users[username]
return [True, "set up gateway success"]
@ -527,7 +527,7 @@ class NetworkMgr(object):
del self.users[username]
return [True, 'delete user success']
def check_usergw(self, username, uid, nodemgr, distributedgw=False):
def check_usergw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, distributedgw=False):
logger.info("Check %s(%s) user gateway."%(username, str(uid)))
if not self.has_user(username):
return [False,"user doesn't exist."]
@ -542,10 +542,10 @@ class NetworkMgr(object):
self.del_usrgwbr(username,uid,nodemgr)
self.usrgws[username] = self.masterip
self.dump_usrgw(username)
netcontrol.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr())
netcontrol.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit)
else:
worker = nodemgr.ip_to_rpc(ip)
worker.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr())
worker.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit)
del self.users[username]
return [True, 'check gw ok']

View File

@ -167,10 +167,10 @@ class userManager:
if not os.path.exists(fspath+"/global/sys/quota"):
groupfile = open(fspath+"/global/sys/quota",'w')
groups = []
groups.append({'name':'root', 'quotas':{ 'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8' }})
groups.append({'name':'admin', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8'}})
groups.append({'name':'primary', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8'}})
groups.append({'name':'foundation', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8'}})
groups.append({'name':'root', 'quotas':{ 'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
groups.append({'name':'admin', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
groups.append({'name':'primary', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
groups.append({'name':'foundation', 'quotas':{'cpu':'4', 'disk':'2000', 'data':'100', 'memory':'2000', 'image':'10', 'idletime':'24', 'vnode':'8', 'input_rate_limit':'10000', 'output_rate_limit':'10000'}})
groupfile.write(json.dumps(groups))
groupfile.close()
if not os.path.exists(fspath+"/global/sys/quotainfo"):
@ -185,6 +185,8 @@ class userManager:
quotas['quotainfo'].append({'name':'image', 'hint':'how many images the user can save, e.g. 10'})
quotas['quotainfo'].append({'name':'idletime', 'hint':'will stop cluster after idletime, number of hours, e.g. 24'})
quotas['quotainfo'].append({'name':'vnode', 'hint':'how many containers the user can have, e.g. 8'})
quotas['quotainfo'].append({'name':'input_rate_limit', 'hint':'the ingress speed of the network, number of kbps'})
quotas['quotainfo'].append({'name':'output_rate_limit', 'hint':'the egress speed of the network, number of kbps'})
quotafile.write(json.dumps(quotas))
quotafile.close()
if not os.path.exists(fspath+"/global/sys/lxc.default"):
@ -853,7 +855,7 @@ class userManager:
if (user_check != None and (user_check.status == "init")):
db.session.delete(user_check)
db.session.commit()
else:
else:
newuser.password = hashlib.sha512(newuser.password.encode('utf-8')).hexdigest()
db.session.add(newuser)
db.session.commit()

View File

@ -56,11 +56,23 @@ class VclusterMgr(object):
logger.info("recovering all vclusters for all users...")
usersdir = self.fspath+"/global/users/"
auth_key = env.getenv('AUTH_KEY')
res = post_to_user("/master/user/groupinfo/", {'auth_key':auth_key})
#logger.info(res)
groups = json.loads(res['groups'])
quotas = {}
for group in groups:
logger.info(group)
quotas[group['name']] = group['quotas']
for user in os.listdir(usersdir):
for cluster in self.list_clusters(user)[1]:
logger.info ("recovering cluster:%s for user:%s ..." % (cluster, user))
res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
self.recover_cluster(cluster, user, res['uid'])
#res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
recover_info = post_to_user("/master/user/recoverinfo/", {'username':user,'auth_key':auth_key})
uid = recover_info['uid']
groupname = recover_info['groupname']
input_rate_limit = quotas[groupname]['input_rate_limit']
output_rate_limit = quotas[groupname]['output_rate_limit']
self.recover_cluster(cluster, user, uid, input_rate_limit, output_rate_limit)
logger.info("recovered all vclusters for all users")
def mount_allclusters(self):
@ -98,6 +110,7 @@ class VclusterMgr(object):
workers = self.nodemgr.get_nodeips()
image_json = json.dumps(image)
groupname = json.loads(user_info)["data"]["group"]
groupquota = json.loads(user_info)["data"]["groupinfo"]
uid = json.loads(user_info)["data"]["id"]
if (len(workers) == 0):
logger.warning ("no workers to start containers, start cluster failed")
@ -106,7 +119,7 @@ class VclusterMgr(object):
if not self.networkmgr.has_user(username):
self.networkmgr.add_user(username, cidr=29, isshared = True if str(groupname) == "fundation" else False)
if self.distributedgw == "False":
[success,message] = self.networkmgr.setup_usrgw(username, uid, self.nodemgr)
[success,message] = self.networkmgr.setup_usrgw(groupquota['input_rate_limit'], groupquota['output_rate_limit'], username, uid, self.nodemgr)
if not success:
return [False, message]
elif not self.networkmgr.has_usrgw(username):
@ -132,7 +145,7 @@ class VclusterMgr(object):
workerip = workers[random.randint(0, len(workers)-1)]
oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
if self.distributedgw == "True" and i == 0 and not self.networkmgr.has_usrgw(username):
[success,message] = self.networkmgr.setup_usrgw(username, uid, self.nodemgr, workerip)
[success,message] = self.networkmgr.setup_usrgw(groupquota['input_rate_limit'], groupquota['output_rate_limit'], username, uid, self.nodemgr, workerip)
if not success:
return [False, message]
if i == 0:
@ -495,7 +508,10 @@ class VclusterMgr(object):
else:
return True
def start_cluster(self, clustername, username, uid):
def start_cluster(self, clustername, username, user_info):
uid = user_info['data']['id']
input_rate_limit = user_info['data']['groupinfo']['input_rate_limit']
output_rate_limit = user_info['data']['groupinfo']['output_rate_limit']
[status, info] = self.get_clusterinfo(clustername, username)
if not status:
return [False, "cluster not found"]
@ -529,7 +545,7 @@ class VclusterMgr(object):
# check gateway for user
# after reboot, user gateway goes down and lose its configuration
# so, check is necessary
self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True')
self.networkmgr.check_usergw(input_rate_limit, output_rate_limit, username, uid, self.nodemgr,self.distributedgw=='True')
# start containers
for container in info['containers']:
# set up gre from user's gateway host to container's host.
@ -558,7 +574,7 @@ class VclusterMgr(object):
worker.mount_container(container['containername'])
return [True, "mount cluster"]
def recover_cluster(self, clustername, username, uid):
def recover_cluster(self, clustername, username, uid, input_rate_limit, output_rate_limit):
[status, info] = self.get_clusterinfo(clustername, username)
if not status:
return [False, "cluster not found"]
@ -598,7 +614,7 @@ class VclusterMgr(object):
except:
return [False, "start cluster failed with setting proxy failed"]
# need to check and recover gateway of this user
self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True')
self.networkmgr.check_usergw(input_rate_limit, output_rate_limit, username, uid, self.nodemgr,self.distributedgw=='True')
# recover containers of this cluster
for container in info['containers']:
# set up gre from user's gateway host to container's host.

View File

@ -154,6 +154,9 @@ class Worker(object):
ovscontrol.del_bridge(bridge)
else:
logger.error(bridges)
[success, message] = ovscontrol.destroy_all_qos()
if not success:
logger.error(message)
'''if (self.addr == self.master):
logger.info ("master also on this node. reuse master's network")
else:

View File

@ -325,15 +325,24 @@ def selfQuery_user(cur_user, user, form):
result = G_usermgr.selfQuery(cur_user = cur_user)
return json.dumps(result)
@app.route("/user/uid/", methods=['POST'])
@app.route("/master/user/recoverinfo/", methods=['POST'])
@auth_key_required
def get_userid():
def get_master_recoverinfo():
username = request.form.get("username",None)
if username is None:
return json.dumps({'success':'false', 'message':'username field is required.'})
else:
user = User.query.filter_by(username=username).first()
return json.dumps({'success':'true', 'uid':user.id})
return json.dumps({'success':'true', 'uid':user.id, 'groupname':user.user_group})
@app.route("/master/user/groupinfo/", methods=['POST'])
@auth_key_required
def get_master_groupinfo():
fspath = env.getenv('FS_PREFIX')
groupfile = open(fspath+"/global/sys/quota",'r')
groups = json.loads(groupfile.read())
groupfile.close()
return json.dumps({'success':'true', 'groups':json.dumps(groups)})
@app.route("/user/selfModify/", methods=['POST'])
@login_required
@ -470,7 +479,7 @@ def query_self_notifications_infos(cur_user, user, form):
return json.dumps(result)
@app.route("/billing/beans/", methods=['POST'])
#@auth_key_required
@auth_key_required
def billing_beans():
logger.info("handle request: /billing/beans/")
form = request.form

View File

@ -32,26 +32,32 @@
<table class="table table-bordered">
<thead>
<tr>
<th>CPU</th>
<th>Memory</th>
<th>Disk</th>
<th>Vnode</th>
<th>Image</th>
<th>Idletime</th>
{% for quotaname in quotanames %}
<th> {{ quotaname }} </th>
{% endfor %}
</tr>
</thead>
<tbody>
<tr>
{% if quotainfo['cpu'] > 1 %}
<th>{{ quotainfo['cpu'] }} Cores</th>
{% else %}
<th>{{ quotainfo['cpu'] }} Core</th>
{% endif %}
<th>{{ quotainfo['memory'] }} MB</th>
<th>{{ quotainfo['disk'] }} MB</th>
<th>{{ quotainfo['vnode'] }}</th>
<th>{{ quotainfo['image'] }}</th>
<th>{{ quotainfo['idletime'] }} hours</th>
{% for quotaname in quotanames %}
{% if quotaname == 'cpu' %}
{% if quotas['cpu'] == '1' %}
<th>{{ quotas['cpu'] }} Core</th>
{% else %}
<th>{{ quotas['cpu'] }} Cores</th>
{% endif %}
{% elif quotaname == 'memory' or quotaname == 'disk' %}
<th>{{ quotas[quotaname] }} MB</th>
{% elif quotaname == 'idletime' %}
<th>{{ quotas[quotaname] }} hours</th>
{% elif quotaname == 'input_rate_limit' or quotaname == 'output_rate_limit'%}
<th>{{ quotas[quotaname] }} kbps</th>
{% elif quotaname == 'data' %}
<th>{{ quotas[quotaname] }} GB</th>
{% else %}
<th>{{ quotas[quotaname] }}</th>
{% endif %}
{% endfor %}
</tr>
</tbody>
</table>

View File

@ -12,10 +12,13 @@ class statusView(normalView):
allclusters = dockletRequest.post_to_all('/cluster/list/')
for master in allclusters:
allclusters[master] = allclusters[master].get('clusters')
result = dockletRequest.post('/monitor/user/quotainfo/', data)
result = dockletRequest.post('/user/selfQuery/')
quotas = result['data']['groupinfo']
quotanames = quotas.keys()
'''result = dockletRequest.post('/monitor/user/quotainfo/', data)
quotainfo = result.get('quotainfo')
quotainfo['cpu'] = int(int(quotainfo['cpu']))
print(quotainfo)
print(quotainfo)'''
allcontainers = {}
if (result):
containers = {}
@ -29,7 +32,7 @@ class statusView(normalView):
else:
self.error()
allcontainers[master][cluster] = message
return self.render(self.template_path, quotainfo = quotainfo, allcontainers = allcontainers, user = session['username'])
return self.render(self.template_path, quotas = quotas, quotanames = quotanames, allcontainers = allcontainers, user = session['username'])
else:
self.error()
@ -49,7 +52,7 @@ class statusRealtimeView(normalView):
class historyView(normalView):
template_path = "monitor/history.html"
@classmethod
def get(self):
data = {
@ -64,7 +67,7 @@ class historyView(normalView):
class historyVNodeView(normalView):
template_path = "monitor/historyVNode.html"
vnode_name = ""
@classmethod
def get(self):
masterip = self.masterip