diff --git a/src/httprest.py b/src/httprest.py
index 9b5764c..bbb3dcf 100755
--- a/src/httprest.py
+++ b/src/httprest.py
@@ -23,7 +23,7 @@ import os
 import http.server, cgi, json, sys, shutil
 import xmlrpc.client
 from socketserver import ThreadingMixIn
-import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr
+import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr
 from logs import logs
 import userManager,beansapplicationmgr
 import monitor,traceback
@@ -117,9 +117,11 @@ def logs_get(user, beans, form):
 @beans_check
 def create_cluster(user, beans, form):
     global G_vclustermgr
+    global G_ulockmgr
     clustername = form.get('clustername', None)
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
+    G_ulockmgr.acquire(user)
     image = {}
     image['name'] = form.get("imagename", None)
     image['type'] = form.get("imagetype", None)
@@ -136,8 +138,10 @@ def create_cluster(user, beans, form):
     status = res.get('success')
     result = res.get('result')
     if not status:
+        G_ulockmgr.release(user)
         return json.dumps({'success':'false', 'action':'create cluster', 'message':result})
     [status, result] = G_vclustermgr.create_cluster(clustername, user, image, user_info, setting)
+    G_ulockmgr.release(user)
     if status:
         return json.dumps({'success':'true', 'action':'create cluster', 'message':result})
     else:
@@ -149,10 +153,12 @@ def create_cluster(user, beans, form):
 @beans_check
 def scaleout_cluster(user, beans, form):
     global G_vclustermgr
+    global G_ulockmgr
     clustername = form.get('clustername', None)
     logger.info ("scaleout: %s" % form)
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
+    G_ulockmgr.acquire(user)
     logger.info("handle request : scale out %s" % clustername)
     image = {}
     image['name'] = form.get("imagename", None)
@@ -169,8 +175,10 @@ def scaleout_cluster(user, beans, form):
     status = res.get('success')
     result = res.get('result')
     if not status:
+        G_ulockmgr.release(user)
         return json.dumps({'success':'false', 'action':'scale out', 'message': result})
     [status, result] = G_vclustermgr.scale_out_cluster(clustername, user, image, user_info, setting)
+    G_ulockmgr.release(user)
     if status:
         return json.dumps({'success':'true', 'action':'scale out', 'message':result})
     else:
@@ -181,15 +189,18 @@ def scaleout_cluster(user, beans, form):
 @login_required
 def scalein_cluster(user, beans, form):
     global G_vclustermgr
+    global G_ulockmgr
     clustername = form.get('clustername', None)
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
+    G_ulockmgr.acquire(user)
     logger.info("handle request : scale in %s" % clustername)
     containername = form.get("containername", None)
     [status, usage_info] = G_vclustermgr.get_clustersetting(clustername, user, containername, False)
     if status:
         post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']})
     [status, result] = G_vclustermgr.scale_in_cluster(clustername, user, containername)
+    G_ulockmgr.release(user)
     if status:
         return json.dumps({'success':'true', 'action':'scale in', 'message':result})
     else:
@@ -200,12 +211,15 @@ def scalein_cluster(user, beans, form):
 @beans_check
 def start_cluster(user, beans, form):
     global G_vclustermgr
+    global G_ulockmgr
     clustername = form.get('clustername', None)
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
+    G_ulockmgr.acquire(user)
     user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
     logger.info ("handle request : start cluster %s" % clustername)
     [status, result] = G_vclustermgr.start_cluster(clustername, user, user_info)
+    G_ulockmgr.release(user)
     if status:
         return json.dumps({'success':'true', 'action':'start cluster', 'message':result})
     else:
@@ -215,11 +229,14 @@ def start_cluster(user, beans, form):
 @login_required
 def stop_cluster(user, beans, form):
     global G_vclustermgr
+    global G_ulockmgr
     clustername = form.get('clustername', None)
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
+    G_ulockmgr.acquire(user)
     logger.info ("handle request : stop cluster %s" % clustername)
     [status, result] = G_vclustermgr.stop_cluster(clustername, user)
+    G_ulockmgr.release(user)
     if status:
         return json.dumps({'success':'true', 'action':'stop cluster', 'message':result})
     else:
@@ -229,9 +246,11 @@ def stop_cluster(user, beans, form):
 @login_required
 def delete_cluster(user, beans, form):
     global G_vclustermgr
+    global G_ulockmgr
     clustername = form.get('clustername', None)
     if (clustername == None):
         return json.dumps({'success':'false', 'message':'clustername is null'})
+    G_ulockmgr.acquire(user)
     logger.info ("handle request : delete cluster %s" % clustername)
     user_info = post_to_user("/user/selfQuery/" , {'token':form.get("token")})
     user_info = json.dumps(user_info)
@@ -239,6 +258,7 @@ def delete_cluster(user, beans, form):
     if status:
         post_to_user("/user/usageRelease/", {'token':form.get('token'), 'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']})
     [status, result] = G_vclustermgr.delete_cluster(clustername, user, user_info)
+    G_ulockmgr.release(user)
     if status:
         return json.dumps({'success':'true', 'action':'delete cluster', 'message':result})
     else:
@@ -271,7 +291,7 @@ def list_cluster(user, beans, form):
     return json.dumps({'success':'false', 'action':'list cluster', 'message':clusterlist})
 
 @app.route("/cluster/stopall/",methods=['POST'])
-#@auth_key_required
+@auth_key_required
 def stopall_cluster():
     global G_vclustermgr
     user = request.form.get('username',None)
@@ -689,6 +709,7 @@ if __name__ == '__main__':
     global G_sysmgr
     global G_historymgr
     global G_applicationmgr
+    global G_ulockmgr
 
     # move 'tools.loadenv' to the beginning of this file
     fs_path = env.getenv("FS_PREFIX")
@@ -770,6 +791,7 @@ if __name__ == '__main__':
 
     #init portcontrol
     portcontrol.init_new()
+    G_ulockmgr = lockmgr.LockMgr()
 
     clusternet = env.getenv("CLUSTER_NET")
     logger.info("using CLUSTER_NET %s" % clusternet)
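Note on the hunks above: each handler releases the per-user lock by hand on every return path. If G_vclustermgr raises between acquire() and release(), the user's lock is never released and all later requests from that user block. A minimal exception-safety sketch, not part of this patch: it assumes the G_ulockmgr global from httprest.py, and serialized_by_user is a hypothetical decorator name.

import functools

def serialized_by_user(handler):
    # hypothetical decorator: pairs acquire/release via try/finally
    @functools.wraps(handler)
    def wrapper(user, beans, form):
        G_ulockmgr.acquire(user)
        try:
            return handler(user, beans, form)
        finally:
            G_ulockmgr.release(user)   # runs on return and on exception
    return wrapper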
diff --git a/src/lockmgr.py b/src/lockmgr.py
new file mode 100644
index 0000000..3c887ab
--- /dev/null
+++ b/src/lockmgr.py
@@ -0,0 +1,33 @@
+#!/usr/bin/python3
+
+'''
+This module is the manager of threading locks.
+A LockMgr manages multiple named threading locks.
+'''
+
+import threading
+
+
+class LockMgr:
+
+    def __init__(self):
+        # self.locks stores the managed locks, keyed by their names
+        self.locks = {}
+        # locks_lock protects self.locks, so that only one thread can update it at a time
+        self.locks_lock = threading.Lock()
+
+    # acquire a lock by its name
+    def acquire(self, lock_name):
+        self.locks_lock.acquire()
+        if lock_name not in self.locks.keys():
+            self.locks[lock_name] = threading.Lock()
+        self.locks_lock.release()
+        self.locks[lock_name].acquire()
+        return
+
+    # release a lock by its name
+    def release(self, lock_name):
+        if lock_name not in self.locks.keys():
+            return
+        self.locks[lock_name].release()
+        return
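Usage sketch for the new LockMgr (assumes src/ is on the import path): acquire() creates the named lock on first use, under locks_lock, and then blocks on it; release() silently ignores unknown names. Because entries are added to self.locks but never removed, the unguarded membership check in release() cannot race with a deletion, though the dict grows by one lock per user name for the life of the process.

import lockmgr

mgr = lockmgr.LockMgr()
mgr.acquire("alice")          # first use creates the lock, then holds it
try:
    pass                      # critical section for user "alice"
finally:
    mgr.release("alice")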
diff --git a/src/nettools.py b/src/nettools.py
index 7827674..ee5f8ba 100755
--- a/src/nettools.py
+++ b/src/nettools.py
@@ -1,6 +1,6 @@
 #!/usr/bin/python3
 
-import subprocess,env
+import subprocess, env, threading
 from log import logger
 
 class ipcontrol(object):
@@ -363,6 +363,7 @@ class netcontrol(object):
 
 free_ports = [False]*65536
 allocated_ports = {}
+ports_lock = threading.Lock()
 
 class portcontrol(object):
 
@@ -396,13 +397,17 @@ class portcontrol(object):
     def acquire_port_mapping(container_name, container_ip, container_port, host_port=None):
         global free_ports
         global allocated_ports
+        global ports_lock
+        ports_lock.acquire()
         # if container_name in allocated_ports.keys():
         #     return [False, "This container already has a port mapping."]
         if container_name not in allocated_ports.keys():
             allocated_ports[container_name] = {}
         elif container_port in allocated_ports[container_name].keys():
-            return [False, "This container already has a port mapping."]
+            ports_lock.release()
+            return [False, "This container port already has a port mapping."]
         if container_name == "" or container_ip == "" or container_port == "":
+            ports_lock.release()
             return [False, "Node Name or Node IP or Node Port can't be null."]
         #print("acquire_port_mapping1")
         free_port = 1
@@ -416,10 +421,12 @@ class portcontrol(object):
                 break
             free_port += 1
         if free_port == 65536:
+            ports_lock.release()
             return [False, "No free ports."]
         free_ports[free_port] = False
         allocated_ports[container_name][container_port] = free_port
         public_ip = env.getenv("PUBLIC_IP")
+        ports_lock.release()
         try:
             subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
             return [True, str(free_port)]
@@ -430,6 +437,7 @@ class portcontrol(object):
     def release_port_mapping(container_name, container_ip, container_port):
         global free_ports
         global allocated_ports
+        global ports_lock
         if container_name not in allocated_ports.keys():
             return [False, "This container does not have a port mapping."]
         free_port = allocated_ports[container_name][container_port]
@@ -438,6 +446,8 @@ class portcontrol(object):
             subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
         except subprocess.CalledProcessError as suberror:
             return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')]
+        ports_lock.acquire()
         free_ports[free_port] = True
         allocated_ports[container_name].pop(container_port)
+        ports_lock.release()
        return [True, ""]
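The acquire_port_mapping() hunks hold ports_lock only around the bookkeeping on free_ports and allocated_ports, and release it before the iptables call, so a slow subprocess does not serialize all port operations. A distilled sketch of that reserve-then-do-I/O shape; apply_mapping() and reserve_port() are hypothetical names standing in for the iptables step and the allocation loop.

import threading

free = set(range(20000, 20010))
lock = threading.Lock()

def apply_mapping(port):
    print("would run iptables for port", port)   # placeholder side effect

def reserve_port():
    with lock:                    # hold the lock for bookkeeping only
        if not free:
            return None
        port = free.pop()         # reserve before doing slow work
    apply_mapping(port)           # slow I/O happens outside the lock
    return port

One trade-off of this shape, visible in the patch as well: if the I/O step fails after the reservation, the port stays marked as used unless the caller rolls the reservation back.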
diff --git a/src/network.py b/src/network.py
index db233b0..c6ab468 100755
--- a/src/network.py
+++ b/src/network.py
@@ -279,6 +279,7 @@ class NetworkMgr(object):
     def __init__(self, addr_cidr, etcdclient, mode, masterip):
         self.etcd = etcdclient
         self.masterip = masterip
+        self.user_locks = threading.Lock()
         if mode == 'new':
             logger.info("init network manager with %s" % addr_cidr)
             self.center = IntervalPool(addr_cidr=addr_cidr)
@@ -478,11 +479,14 @@ class NetworkMgr(object):
 
     def add_user(self, username, cidr, isshared = False):
         logger.info ("add user %s with cidr=%s" % (username, str(cidr)))
+        self.user_locks.acquire()
         if self.has_user(username):
+            self.user_locks.release()
             return [False, "user already exists in users set"]
         [status, result] = self.center.allocate(cidr)
         self.dump_center()
         if status == False:
+            self.user_locks.release()
             return [False, result]
         '''[status, vlanid] = self.acquire_vlanid(isshared)
         if status:
@@ -496,6 +500,7 @@ class NetworkMgr(object):
         #netcontrol.setup_gw('docklet-br', username, self.users[username].get_gateway_cidr(), str(vlanid))
         self.dump_user(username)
         del self.users[username]
+        self.user_locks.release()
         return [True, 'add user success']
 
     def del_usrgwbr(self, username, uid, nodemgr):
@@ -515,7 +520,9 @@ class NetworkMgr(object):
         return [True, 'delete user\' gateway success']
 
     def del_user(self, username):
+        self.user_locks.acquire()
         if not self.has_user(username):
+            self.user_locks.release()
             return [False, username+" not in users set"]
         self.load_user(username)
         [addr, cidr] = self.users[username].info.split('/')
@@ -527,6 +534,7 @@ class NetworkMgr(object):
         #netcontrol.del_gw('docklet-br', username)
         self.etcd.deldir("network/users/"+username)
         del self.users[username]
+        self.user_locks.release()
         return [True, 'delete user success']
 
     def check_usergw(self, input_rate_limit, output_rate_limit, username, uid, nodemgr, distributedgw=False):
diff --git a/src/vclustermgr.py b/src/vclustermgr.py
index 6fe09fa..aa77e33 100755
--- a/src/vclustermgr.py
+++ b/src/vclustermgr.py
@@ -7,7 +7,7 @@
 import xmlrpc.client
 from log import logger
 import env
 import proxytool
-import requests
+import requests, threading
 import traceback
 from nettools import portcontrol
@@ -32,6 +32,7 @@ class VclusterMgr(object):
         self.etcd = etcdclient
         self.defaultsize = env.getenv("CLUSTER_SIZE")
         self.fspath = env.getenv("FS_PREFIX")
+        self.clusterid_locks = threading.Lock()
 
         logger.info ("vcluster start on %s" % (self.addr))
         if self.mode == 'new':
@@ -326,7 +327,7 @@ class VclusterMgr(object):
                 clusterfile.write(json.dumps(clusterinfo))
                 clusterfile.close()
             else:
-                return [False,"No port mapping."]
+                return [True,"No port mapping."]
         if error_msg is not None:
             return [False,error_msg]
         else:
@@ -778,6 +779,8 @@ class VclusterMgr(object):
 
     # acquire cluster id from etcd
     def _acquire_id(self):
+        self.clusterid_locks.acquire()
         clusterid = self.etcd.getkey("vcluster/nextid")[1]
         self.etcd.setkey("vcluster/nextid", str(int(clusterid)+1))
+        self.clusterid_locks.release()
         return int(clusterid)
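The _acquire_id() hunk makes the etcd read-increment-write atomic with respect to other threads in the same master process (the network.py hunks likewise assume threading is already imported there, since this patch adds no import). A distilled sketch of the pattern using a with-statement; MemoryStore is a hypothetical stand-in for the etcdlib client, whose getkey/setkey return shape is taken from the patch.

import threading

class MemoryStore:
    # stand-in for the etcdlib client's getkey/setkey pair
    def __init__(self):
        self.data = {"vcluster/nextid": "0"}
    def getkey(self, key):
        return [True, self.data[key]]
    def setkey(self, key, value):
        self.data[key] = value

class IdAllocator:
    def __init__(self, store):
        self.store = store
        self.lock = threading.Lock()

    def acquire_id(self):
        with self.lock:           # make read+increment+write atomic in-process
            clusterid = int(self.store.getkey("vcluster/nextid")[1])
            self.store.setkey("vcluster/nextid", str(clusterid + 1))
            return clusterid

Note the limit of the approach: a threading.Lock only covers threads of one process, so two masters sharing the same etcd would still need an etcd-side compare-and-swap to allocate ids safely.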