Merge pull request #287 from FirmlyReality/threading

Threading
This commit is contained in:
Yujian Zhu 2018-01-21 18:46:27 +08:00 committed by GitHub
commit 5877440f3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 6 deletions

View File

@ -23,7 +23,7 @@ import os
import http.server, cgi, json, sys, shutil import http.server, cgi, json, sys, shutil
import xmlrpc.client import xmlrpc.client
from socketserver import ThreadingMixIn from socketserver import ThreadingMixIn
import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr
from logs import logs from logs import logs
import userManager,beansapplicationmgr import userManager,beansapplicationmgr
import monitor,traceback import monitor,traceback
@ -117,9 +117,11 @@ def logs_get(user, beans, form):
@beans_check @beans_check
def create_cluster(user, beans, form): def create_cluster(user, beans, form):
global G_vclustermgr global G_vclustermgr
global G_ulockmgr
clustername = form.get('clustername', None) clustername = form.get('clustername', None)
if (clustername == None): if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'}) return json.dumps({'success':'false', 'message':'clustername is null'})
G_ulockmgr.acquire(user)
image = {} image = {}
image['name'] = form.get("imagename", None) image['name'] = form.get("imagename", None)
image['type'] = form.get("imagetype", None) image['type'] = form.get("imagetype", None)
@ -136,8 +138,10 @@ def create_cluster(user, beans, form):
status = res.get('success') status = res.get('success')
result = res.get('result') result = res.get('result')
if not status: if not status:
G_ulockmgr.release(user)
return json.dumps({'success':'false', 'action':'create cluster', 'message':result}) return json.dumps({'success':'false', 'action':'create cluster', 'message':result})
[status, result] = G_vclustermgr.create_cluster(clustername, user, image, user_info, setting) [status, result] = G_vclustermgr.create_cluster(clustername, user, image, user_info, setting)
G_ulockmgr.release(user)
if status: if status:
return json.dumps({'success':'true', 'action':'create cluster', 'message':result}) return json.dumps({'success':'true', 'action':'create cluster', 'message':result})
else: else:
@ -149,10 +153,12 @@ def create_cluster(user, beans, form):
@beans_check @beans_check
def scaleout_cluster(user, beans, form): def scaleout_cluster(user, beans, form):
global G_vclustermgr global G_vclustermgr
global G_ulockmgr
clustername = form.get('clustername', None) clustername = form.get('clustername', None)
logger.info ("scaleout: %s" % form) logger.info ("scaleout: %s" % form)
if (clustername == None): if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'}) return json.dumps({'success':'false', 'message':'clustername is null'})
G_ulockmgr.acquire(user)
logger.info("handle request : scale out %s" % clustername) logger.info("handle request : scale out %s" % clustername)
image = {} image = {}
image['name'] = form.get("imagename", None) image['name'] = form.get("imagename", None)
@ -169,8 +175,10 @@ def scaleout_cluster(user, beans, form):
status = res.get('success') status = res.get('success')
result = res.get('result') result = res.get('result')
if not status: if not status:
G_ulockmgr.release(user)
return json.dumps({'success':'false', 'action':'scale out', 'message': result}) return json.dumps({'success':'false', 'action':'scale out', 'message': result})
[status, result] = G_vclustermgr.scale_out_cluster(clustername, user, image, user_info, setting) [status, result] = G_vclustermgr.scale_out_cluster(clustername, user, image, user_info, setting)
G_ulockmgr.release(user)
if status: if status:
return json.dumps({'success':'true', 'action':'scale out', 'message':result}) return json.dumps({'success':'true', 'action':'scale out', 'message':result})
else: else:
@ -181,15 +189,18 @@ def scaleout_cluster(user, beans, form):
@login_required @login_required
def scalein_cluster(user, beans, form): def scalein_cluster(user, beans, form):
global G_vclustermgr global G_vclustermgr
global G_ulockmgr
clustername = form.get('clustername', None) clustername = form.get('clustername', None)
if (clustername == None): if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'}) return json.dumps({'success':'false', 'message':'clustername is null'})
G_ulockmgr.acquire(user)
logger.info("handle request : scale in %s" % clustername) logger.info("handle request : scale in %s" % clustername)
containername = form.get("containername", None) containername = form.get("containername", None)
[status, usage_info] = G_vclustermgr.get_clustersetting(clustername, user, containername, False) [status, usage_info] = G_vclustermgr.get_clustersetting(clustername, user, containername, False)
if status: if status:
post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']}) post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']})
[status, result] = G_vclustermgr.scale_in_cluster(clustername, user, containername) [status, result] = G_vclustermgr.scale_in_cluster(clustername, user, containername)
G_ulockmgr.release(user)
if status: if status:
return json.dumps({'success':'true', 'action':'scale in', 'message':result}) return json.dumps({'success':'true', 'action':'scale in', 'message':result})
else: else:
@ -200,12 +211,15 @@ def scalein_cluster(user, beans, form):
@beans_check @beans_check
def start_cluster(user, beans, form): def start_cluster(user, beans, form):
global G_vclustermgr global G_vclustermgr
global G_ulockmgr
clustername = form.get('clustername', None) clustername = form.get('clustername', None)
if (clustername == None): if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'}) return json.dumps({'success':'false', 'message':'clustername is null'})
G_ulockmgr.acquire(user)
user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")}) user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
logger.info ("handle request : start cluster %s" % clustername) logger.info ("handle request : start cluster %s" % clustername)
[status, result] = G_vclustermgr.start_cluster(clustername, user, user_info) [status, result] = G_vclustermgr.start_cluster(clustername, user, user_info)
G_ulockmgr.release(user)
if status: if status:
return json.dumps({'success':'true', 'action':'start cluster', 'message':result}) return json.dumps({'success':'true', 'action':'start cluster', 'message':result})
else: else:
@ -215,11 +229,14 @@ def start_cluster(user, beans, form):
@login_required @login_required
def stop_cluster(user, beans, form): def stop_cluster(user, beans, form):
global G_vclustermgr global G_vclustermgr
global G_ulockmgr
clustername = form.get('clustername', None) clustername = form.get('clustername', None)
if (clustername == None): if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'}) return json.dumps({'success':'false', 'message':'clustername is null'})
G_ulockmgr.acquire(user)
logger.info ("handle request : stop cluster %s" % clustername) logger.info ("handle request : stop cluster %s" % clustername)
[status, result] = G_vclustermgr.stop_cluster(clustername, user) [status, result] = G_vclustermgr.stop_cluster(clustername, user)
G_ulockmgr.release(user)
if status: if status:
return json.dumps({'success':'true', 'action':'stop cluster', 'message':result}) return json.dumps({'success':'true', 'action':'stop cluster', 'message':result})
else: else:
@ -229,9 +246,11 @@ def stop_cluster(user, beans, form):
@login_required @login_required
def delete_cluster(user, beans, form): def delete_cluster(user, beans, form):
global G_vclustermgr global G_vclustermgr
global G_ulockmgr
clustername = form.get('clustername', None) clustername = form.get('clustername', None)
if (clustername == None): if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'}) return json.dumps({'success':'false', 'message':'clustername is null'})
G_ulockmgr.acquire(user)
logger.info ("handle request : delete cluster %s" % clustername) logger.info ("handle request : delete cluster %s" % clustername)
user_info = post_to_user("/user/selfQuery/" , {'token':form.get("token")}) user_info = post_to_user("/user/selfQuery/" , {'token':form.get("token")})
user_info = json.dumps(user_info) user_info = json.dumps(user_info)
@ -239,6 +258,7 @@ def delete_cluster(user, beans, form):
if status: if status:
post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']}) post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']})
[status, result] = G_vclustermgr.delete_cluster(clustername, user, user_info) [status, result] = G_vclustermgr.delete_cluster(clustername, user, user_info)
G_ulockmgr.release(user)
if status: if status:
return json.dumps({'success':'true', 'action':'delete cluster', 'message':result}) return json.dumps({'success':'true', 'action':'delete cluster', 'message':result})
else: else:
@ -271,7 +291,7 @@ def list_cluster(user, beans, form):
return json.dumps({'success':'false', 'action':'list cluster', 'message':clusterlist}) return json.dumps({'success':'false', 'action':'list cluster', 'message':clusterlist})
@app.route("/cluster/stopall/",methods=['POST']) @app.route("/cluster/stopall/",methods=['POST'])
#@auth_key_required @auth_key_required
def stopall_cluster(): def stopall_cluster():
global G_vclustermgr global G_vclustermgr
user = request.form.get('username',None) user = request.form.get('username',None)
@ -689,6 +709,7 @@ if __name__ == '__main__':
global G_sysmgr global G_sysmgr
global G_historymgr global G_historymgr
global G_applicationmgr global G_applicationmgr
global G_ulockmgr
# move 'tools.loadenv' to the beginning of this file # move 'tools.loadenv' to the beginning of this file
fs_path = env.getenv("FS_PREFIX") fs_path = env.getenv("FS_PREFIX")
@ -770,6 +791,7 @@ if __name__ == '__main__':
#init portcontrol #init portcontrol
portcontrol.init_new() portcontrol.init_new()
G_ulockmgr = lockmgr.LockMgr()
clusternet = env.getenv("CLUSTER_NET") clusternet = env.getenv("CLUSTER_NET")
logger.info("using CLUSTER_NET %s" % clusternet) logger.info("using CLUSTER_NET %s" % clusternet)

33
src/lockmgr.py Normal file
View File

@ -0,0 +1,33 @@
#!/usr/bin/python3
'''
This module is the manager of threadings locks.
A LockMgr manages multiple threadings locks.
'''
import threading
class LockMgr:
def __init__(self):
# self.locks will store multiple locks by their names.
self.locks = {}
# the lock of self.locks, is to ensure that only one thread can update it at the same time
self.locks_lock = threading.Lock()
# acquire a lock by its name
def acquire(self, lock_name):
self.locks_lock.acquire()
if lock_name not in self.locks.keys():
self.locks[lock_name] = threading.Lock()
self.locks_lock.release()
self.locks[lock_name].acquire()
return
# release a lock by its name
def release(self, lock_name):
if lock_name not in self.locks.keys():
return
self.locks[lock_name].release()
return

View File

@ -1,6 +1,6 @@
#!/usr/bin/python3 #!/usr/bin/python3
import subprocess,env import subprocess, env, threading
from log import logger from log import logger
class ipcontrol(object): class ipcontrol(object):
@ -363,6 +363,7 @@ class netcontrol(object):
free_ports = [False]*65536 free_ports = [False]*65536
allocated_ports = {} allocated_ports = {}
ports_lock = threading.Lock()
class portcontrol(object): class portcontrol(object):
@ -396,13 +397,17 @@ class portcontrol(object):
def acquire_port_mapping(container_name, container_ip, container_port, host_port=None): def acquire_port_mapping(container_name, container_ip, container_port, host_port=None):
global free_ports global free_ports
global allocated_ports global allocated_ports
global ports_lock
ports_lock.acquire()
# if container_name in allocated_ports.keys(): # if container_name in allocated_ports.keys():
# return [False, "This container already has a port mapping."] # return [False, "This container already has a port mapping."]
if container_name not in allocated_ports.keys(): if container_name not in allocated_ports.keys():
allocated_ports[container_name] = {} allocated_ports[container_name] = {}
elif container_port in allocated_ports[container_name].keys(): elif container_port in allocated_ports[container_name].keys():
return [False, "This container already has a port mapping."] ports_lock.release()
return [False, "This container port already has a port mapping."]
if container_name == "" or container_ip == "" or container_port == "": if container_name == "" or container_ip == "" or container_port == "":
ports_lock.release()
return [False, "Node Name or Node IP or Node Port can't be null."] return [False, "Node Name or Node IP or Node Port can't be null."]
#print("acquire_port_mapping1") #print("acquire_port_mapping1")
free_port = 1 free_port = 1
@ -416,10 +421,12 @@ class portcontrol(object):
break break
free_port += 1 free_port += 1
if free_port == 65536: if free_port == 65536:
ports_lock.release()
return [False, "No free ports."] return [False, "No free ports."]
free_ports[free_port] = False free_ports[free_port] = False
allocated_ports[container_name][container_port] = free_port allocated_ports[container_name][container_port] = free_port
public_ip = env.getenv("PUBLIC_IP") public_ip = env.getenv("PUBLIC_IP")
ports_lock.release()
try: try:
subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(free_port)] return [True, str(free_port)]
@ -430,6 +437,7 @@ class portcontrol(object):
def release_port_mapping(container_name, container_ip, container_port): def release_port_mapping(container_name, container_ip, container_port):
global free_ports global free_ports
global allocated_ports global allocated_ports
global ports_lock
if container_name not in allocated_ports.keys(): if container_name not in allocated_ports.keys():
return [False, "This container does not have a port mapping."] return [False, "This container does not have a port mapping."]
free_port = allocated_ports[container_name][container_port] free_port = allocated_ports[container_name][container_port]
@ -438,6 +446,8 @@ class portcontrol(object):
subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
except subprocess.CalledProcessError as suberror: except subprocess.CalledProcessError as suberror:
return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')] return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')]
ports_lock.acquire()
free_ports[free_port] = True free_ports[free_port] = True
allocated_ports[container_name].pop(container_port) allocated_ports[container_name].pop(container_port)
ports_lock.release()
return [True, ""] return [True, ""]

View File

@ -279,6 +279,7 @@ class NetworkMgr(object):
def __init__(self, addr_cidr, etcdclient, mode, masterip): def __init__(self, addr_cidr, etcdclient, mode, masterip):
self.etcd = etcdclient self.etcd = etcdclient
self.masterip = masterip self.masterip = masterip
self.user_locks = threading.Lock()
if mode == 'new': if mode == 'new':
logger.info("init network manager with %s" % addr_cidr) logger.info("init network manager with %s" % addr_cidr)
self.center = IntervalPool(addr_cidr=addr_cidr) self.center = IntervalPool(addr_cidr=addr_cidr)
@ -478,11 +479,14 @@ class NetworkMgr(object):
def add_user(self, username, cidr, isshared = False): def add_user(self, username, cidr, isshared = False):
logger.info ("add user %s with cidr=%s" % (username, str(cidr))) logger.info ("add user %s with cidr=%s" % (username, str(cidr)))
self.user_locks.acquire()
if self.has_user(username): if self.has_user(username):
self.user_locks.release()
return [False, "user already exists in users set"] return [False, "user already exists in users set"]
[status, result] = self.center.allocate(cidr) [status, result] = self.center.allocate(cidr)
self.dump_center() self.dump_center()
if status == False: if status == False:
self.user_locks.release()
return [False, result] return [False, result]
'''[status, vlanid] = self.acquire_vlanid(isshared) '''[status, vlanid] = self.acquire_vlanid(isshared)
if status: if status:
@ -496,6 +500,7 @@ class NetworkMgr(object):
#netcontrol.setup_gw('docklet-br', username, self.users[username].get_gateway_cidr(), str(vlanid)) #netcontrol.setup_gw('docklet-br', username, self.users[username].get_gateway_cidr(), str(vlanid))
self.dump_user(username) self.dump_user(username)
del self.users[username] del self.users[username]
self.user_locks.release()
return [True, 'add user success'] return [True, 'add user success']
def del_usrgwbr(self, username, uid, nodemgr): def del_usrgwbr(self, username, uid, nodemgr):
@ -515,7 +520,9 @@ class NetworkMgr(object):
return [True, 'delete user\' gateway success'] return [True, 'delete user\' gateway success']
def del_user(self, username): def del_user(self, username):
self.user_locks.acquire()
if not self.has_user(username): if not self.has_user(username):
self.user_locks.release()
return [False, username+" not in users set"] return [False, username+" not in users set"]
self.load_user(username) self.load_user(username)
[addr, cidr] = self.users[username].info.split('/') [addr, cidr] = self.users[username].info.split('/')
@ -527,6 +534,7 @@ class NetworkMgr(object):
#netcontrol.del_gw('docklet-br', username) #netcontrol.del_gw('docklet-br', username)
self.etcd.deldir("network/users/"+username) self.etcd.deldir("network/users/"+username)
del self.users[username] del self.users[username]
self.user_locks.release()
return [True, 'delete user success'] return [True, 'delete user success']
def check_usergw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, distributedgw=False): def check_usergw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, distributedgw=False):

View File

@ -7,7 +7,7 @@ import xmlrpc.client
from log import logger from log import logger
import env import env
import proxytool import proxytool
import requests import requests, threading
import traceback import traceback
from nettools import portcontrol from nettools import portcontrol
@ -32,6 +32,7 @@ class VclusterMgr(object):
self.etcd = etcdclient self.etcd = etcdclient
self.defaultsize = env.getenv("CLUSTER_SIZE") self.defaultsize = env.getenv("CLUSTER_SIZE")
self.fspath = env.getenv("FS_PREFIX") self.fspath = env.getenv("FS_PREFIX")
self.clusterid_locks = threading.Lock()
logger.info ("vcluster start on %s" % (self.addr)) logger.info ("vcluster start on %s" % (self.addr))
if self.mode == 'new': if self.mode == 'new':
@ -326,7 +327,7 @@ class VclusterMgr(object):
clusterfile.write(json.dumps(clusterinfo)) clusterfile.write(json.dumps(clusterinfo))
clusterfile.close() clusterfile.close()
else: else:
return [False,"No port mapping."] return [True,"No port mapping."]
if error_msg is not None: if error_msg is not None:
return [False,error_msg] return [False,error_msg]
else: else:
@ -778,6 +779,8 @@ class VclusterMgr(object):
# acquire cluster id from etcd # acquire cluster id from etcd
def _acquire_id(self): def _acquire_id(self):
self.clusterid_locks.acquire()
clusterid = self.etcd.getkey("vcluster/nextid")[1] clusterid = self.etcd.getkey("vcluster/nextid")[1]
self.etcd.setkey("vcluster/nextid", str(int(clusterid)+1)) self.etcd.setkey("vcluster/nextid", str(int(clusterid)+1))
self.clusterid_locks.release()
return int(clusterid) return int(clusterid)