Merge remote-tracking branch 'upstream/master'

This commit is contained in:
zhongyehong 2016-05-09 12:49:19 +08:00
commit 1bdd11ea5a
7 changed files with 241 additions and 151 deletions

View File

@ -365,7 +365,7 @@ def hosts_monitor(cur_user, user, form, com_id, issue):
logger.info("handle request: monitor/hosts")
res = {}
fetcher = monitor.Fetcher(etcdaddr,G_clustername,com_id)
fetcher = monitor.Fetcher(com_id)
if issue == 'meminfo':
res['meminfo'] = fetcher.get_meminfo()
elif issue == 'cpuinfo':
@ -404,7 +404,7 @@ def vnodes_monitor(cur_user, user, form, con_id, issue):
global G_clustername
logger.info("handle request: monitor/vnodes")
res = {}
fetcher = monitor.Container_Fetcher(etcdaddr,G_clustername)
fetcher = monitor.Container_Fetcher()
if issue == 'cpu_use':
res['cpu_use'] = fetcher.get_cpu_use(con_id)
elif issue == 'mem_use':
@ -700,6 +700,8 @@ if __name__ == '__main__':
Guest_control = guest_control.Guest(G_vclustermgr,G_nodemgr)
logger.info("guest control started")
threading.Thread(target=Guest_control.work, args=()).start()
master_collector = monitor.Master_Collector(G_nodemgr)
master_collector.start()
logger.info("startting to listen on: ")
masterip = env.getenv('MASTER_IP')

View File

@ -5,13 +5,17 @@ import time,threading,json,traceback,platform
from log import logger
monitor_hosts = {}
monitor_vnodes = {}
workerinfo = {}
workercinfo = {}
class Container_Collector(threading.Thread):
def __init__(self,etcdaddr,cluster_name,host,test=False):
def __init__(self,test=False):
threading.Thread.__init__(self)
self.thread_stop = False
self.host = host
self.etcdser = etcdlib.Client(etcdaddr,"/%s/monitor" % (cluster_name))
self.interval = 2
self.test = test
self.cpu_last = {}
@ -27,6 +31,7 @@ class Container_Collector(threading.Thread):
return containers
def collect_containerinfo(self,container_name):
global workercinfo
output = subprocess.check_output("sudo lxc-info -n %s" % (container_name),shell=True)
output = output.decode('utf-8')
parts = re.split('\n',output)
@ -41,11 +46,11 @@ class Container_Collector(threading.Thread):
basic_info['Name'] = info['Name']
basic_info['State'] = info['State']
if(info['State'] == 'STOPPED'):
self.etcdser.setkey('/vnodes/%s/basic_info'%(container_name), basic_info)
workercinfo[container_name]['basic_info'] = basic_info
return False
basic_info['PID'] = info['PID']
basic_info['IP'] = info['IP']
self.etcdser.setkey('/vnodes/%s/basic_info'%(container_name), basic_info)
workercinfo[container_name]['basic_info'] = basic_info
cpu_parts = re.split(' +',info['CPU use'])
cpu_val = cpu_parts[0].strip()
@ -69,7 +74,7 @@ class Container_Collector(threading.Thread):
self.cpu_quota[container_name] = tmp/100000.0
quota = {'cpu':self.cpu_quota[container_name],'memory':self.mem_quota[container_name]}
#logger.info(quota)
self.etcdser.setkey('/vnodes/%s/quota'%(container_name),quota)
workercinfo[container_name]['quota'] = quota
else:
logger.error("Cant't find config file %s"%(confpath))
return False
@ -82,7 +87,7 @@ class Container_Collector(threading.Thread):
cpu_usedp = 1
cpu_use['usedp'] = cpu_usedp
self.cpu_last[container_name] = cpu_val;
self.etcdser.setkey('/vnodes/%s/cpu_use'%(container_name), cpu_use)
workercinfo[container_name]['cpu_use'] = cpu_use
mem_parts = re.split(' +',info['Memory use'])
mem_val = mem_parts[0].strip()
@ -96,12 +101,14 @@ class Container_Collector(threading.Thread):
mem_val = float(mem_val) * 1024 * 1024
mem_usedp = float(mem_val) / self.mem_quota[container_name]
mem_use['usedp'] = mem_usedp
self.etcdser.setkey('/vnodes/%s/mem_use'%(container_name), mem_use)
workercinfo[container_name]['mem_use'] = mem_use
#print(output)
#print(parts)
return True
def run(self):
global workercinfo
global workerinfo
cnt = 0
while not self.thread_stop:
containers = self.list_container()
@ -110,8 +117,11 @@ class Container_Collector(threading.Thread):
for container in containers:
if not container == '':
conlist.append(container)
if not container in workercinfo.keys():
workercinfo[container] = {}
try:
if(self.collect_containerinfo(container)):
success= self.collect_containerinfo(container)
if(success):
countR += 1
except Exception as err:
logger.warning(traceback.format_exc())
@ -120,10 +130,10 @@ class Container_Collector(threading.Thread):
concnt = {}
concnt['total'] = containers_num
concnt['running'] = countR
self.etcdser.setkey('/hosts/%s/containers'%(self.host), concnt)
workerinfo['containers'] = concnt
time.sleep(self.interval)
if cnt == 0:
self.etcdser.setkey('/hosts/%s/containerslist'%(self.host), conlist)
workerinfo['containerslist'] = conlist
cnt = (cnt+1)%5
if self.test:
break
@ -135,12 +145,9 @@ class Container_Collector(threading.Thread):
class Collector(threading.Thread):
def __init__(self,etcdaddr,cluster_name,host,test=False):
def __init__(self,test=False):
threading.Thread.__init__(self)
self.host = host
self.thread_stop = False
self.etcdser = etcdlib.Client(etcdaddr,"/%s/monitor/hosts/%s" % (cluster_name,host))
self.vetcdser = etcdlib.Client(etcdaddr,"/%s/monitor/vnodes" % (cluster_name))
self.interval = 1
self.test=test
return
@ -154,10 +161,9 @@ class Collector(threading.Thread):
memdict['buffers'] = meminfo.buffers/1024
memdict['cached'] = meminfo.cached/1024
memdict['percent'] = meminfo.percent
self.etcdser.setkey('/meminfo',memdict)
#print(output)
#print(memparts)
return
return memdict
def collect_cpuinfo(self):
cpuinfo = psutil.cpu_times_percent(interval=1,percpu=False)
@ -166,7 +172,6 @@ class Collector(threading.Thread):
cpuset['system'] = cpuinfo.system
cpuset['idle'] = cpuinfo.idle
cpuset['iowait'] = cpuinfo.iowait
self.etcdser.setkey('/cpuinfo',cpuset)
output = subprocess.check_output(["cat /proc/cpuinfo"],shell=True)
output = output.decode('utf-8')
parts = output.split('\n')
@ -182,10 +187,10 @@ class Collector(threading.Thread):
val = key_val[1].lstrip()
if key=='processor' or key=='model name' or key=='core id' or key=='cpu MHz' or key=='cache size' or key=='physical id':
info[idx][key] = val
self.etcdser.setkey('/cpuconfig',info)
return
return [cpuset, info]
def collect_diskinfo(self):
global workercinfo
parts = psutil.disk_partitions()
setval = []
devices = {}
@ -204,15 +209,16 @@ class Collector(threading.Thread):
if(part.mountpoint.startswith('/opt/docklet/local/volume')):
names = re.split('/',part.mountpoint)
container = names[len(names)-1]
self.vetcdser.setkey('/%s/disk_use'%(container), diskval)
if not container in workercinfo.keys():
workercinfo[container] = {}
workercinfo[container]['disk_use'] = diskval
setval.append(diskval)
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
self.etcdser.setkey('/diskinfo', setval)
#print(output)
#print(diskparts)
return
return setval
def collect_osinfo(self):
uname = platform.uname()
@ -224,16 +230,18 @@ class Collector(threading.Thread):
osinfo['version'] = uname.version
osinfo['machine'] = uname.machine
osinfo['processor'] = uname.processor
self.etcdser.setkey('/osinfo',osinfo)
return
return osinfo
def run(self):
self.collect_osinfo()
global workerinfo
workerinfo['osinfo'] = self.collect_osinfo()
while not self.thread_stop:
self.collect_meminfo()
self.collect_cpuinfo()
self.collect_diskinfo()
self.etcdser.setkey('/running','True',6)
workerinfo['meminfo'] = self.collect_meminfo()
[cpuinfo,cpuconfig] = self.collect_cpuinfo()
workerinfo['cpuinfo'] = cpuinfo
workerinfo['cpuconfig'] = cpuconfig
workerinfo['diskinfo'] = self.collect_diskinfo()
workerinfo['running'] = True
time.sleep(self.interval)
if self.test:
break
@ -243,61 +251,96 @@ class Collector(threading.Thread):
def stop(self):
    """Request termination: run() loops only while ``thread_stop`` is false."""
    self.thread_stop = True
def workerFetchInfo():
    """Return this worker's monitoring data as a single string.

    The result is ``str()`` of a two-element list: the module-level
    ``workerinfo`` dict followed by the module-level ``workercinfo`` dict.
    """
    # Both dicts are only read here, so no ``global`` declaration is required.
    snapshot = [workerinfo, workercinfo]
    return str(snapshot)
class Master_Collector(threading.Thread):
    """Thread that periodically pulls monitoring reports from every worker.

    On each pass it calls ``workerFetchInfo()`` on every RPC proxy returned by
    ``nodemgr.get_rpcs()`` and stores the decoded report in the module-level
    ``monitor_hosts`` (keyed by the worker's IP) and ``monitor_vnodes``
    (keyed by container name) dictionaries.
    """

    def __init__(self,nodemgr):
        """Remember *nodemgr*, which must provide get_rpcs() and rpc_to_ip()."""
        threading.Thread.__init__(self)
        self.thread_stop = False
        self.nodemgr = nodemgr
        return

    @staticmethod
    def _parse_report(raw):
        """Decode the string produced by a worker's workerFetchInfo().

        Returns a ``(host_info, container_info)`` tuple taken from the first
        two elements of the decoded sequence.

        Raises ValueError/SyntaxError when *raw* is not a plain Python literal.
        """
        # SECURITY: this string arrives over RPC from another machine. The
        # previous code passed it to eval(), which would execute arbitrary
        # expressions on the master. ast.literal_eval only accepts literals
        # (dict/list/str/number/bool/None), which is all str() of the report
        # is expected to contain.
        import ast
        report = list(ast.literal_eval(raw))
        return report[0], report[1]

    def run(self):
        """Poll all workers every two seconds until stop() is called."""
        while not self.thread_stop:
            # Mark every known host as not running first; a successful fetch
            # below replaces the whole entry with the worker's fresh report.
            for host in monitor_hosts.keys():
                monitor_hosts[host]['running'] = False
            for worker in self.nodemgr.get_rpcs():
                try:
                    ip = self.nodemgr.rpc_to_ip(worker)
                    host_info, container_info = self._parse_report(worker.workerFetchInfo())
                    logger.info(container_info)
                    monitor_hosts[ip] = host_info
                    for container in container_info.keys():
                        monitor_vnodes[container] = container_info[container]
                except Exception as err:
                    # A failing worker is logged and skipped so the remaining
                    # workers are still polled in this pass.
                    logger.warning(traceback.format_exc())
                    logger.warning(err)
            time.sleep(2)
        return

    def stop(self):
        """Ask run() to exit after its current pass."""
        self.thread_stop = True
        return
class Container_Fetcher:
def __init__(self,etcdaddr,cluster_name):
self.etcdser = etcdlib.Client(etcdaddr,"/%s/monitor/vnodes" % (cluster_name))
def __init__(self):
return
def get_cpu_use(self,container_name):
res = {}
[ret, ans] = self.etcdser.getkey('/%s/cpu_use'%(container_name))
if ret == True :
res = dict(eval(ans))
[ret,quota] = self.etcdser.getkey('/%s/quota'%(container_name))
if ret == False:
res['quota'] = {'cpu':0}
logger.warning(quota)
res['quota'] = dict(eval(quota))
return res
else:
logger.warning(ans)
return res
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['cpu_use']
res['quota'] = monitor_vnodes[container_name]['quota']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_mem_use(self,container_name):
res = {}
[ret, ans] = self.etcdser.getkey('/%s/mem_use'%(container_name))
if ret == True :
res = dict(eval(ans))
[ret,quota] = self.etcdser.getkey('/%s/quota'%(container_name))
if ret == False:
res['quota'] = {'memory':0}
logger.warning(quota)
res['quota'] = dict(eval(quota))
return res
else:
logger.warning(ans)
return res
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['mem_use']
res['quota'] = monitor_vnodes[container_name]['quota']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_disk_use(self,container_name):
res = {}
[ret, ans] = self.etcdser.getkey('/%s/disk_use'%(container_name))
if ret == True :
res = dict(eval(ans))
else:
logger.warning(ans)
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['disk_use']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_basic_info(self,container_name):
res = self.etcdser.getkey("/%s/basic_info"%(container_name))
if res[0] == False:
return {}
res = dict(eval(res[1]))
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['basic_info']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
class Fetcher:
def __init__(self,etcdaddr,cluster_name,host):
self.etcdser = etcdlib.Client(etcdaddr,"/%s/monitor/hosts/%s" % (cluster_name,host))
def __init__(self,host):
global monitor_hosts
self.info = monitor_hosts[host]
return
#def get_clcnt(self):
@ -310,72 +353,76 @@ class Fetcher:
# return self.get_meminfo_('172.31.0.1')
def get_meminfo(self):
res = {}
[ret, ans] = self.etcdser.getkey('/meminfo')
if ret == True :
res = dict(eval(ans))
return res
else:
logger.warning(ans)
return res
try:
res = self.info['meminfo']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_cpuinfo(self):
res = {}
[ret, ans] = self.etcdser.getkey('/cpuinfo')
if ret == True :
res = dict(eval(ans))
return res
else:
logger.warning(ans)
return res
try:
res = self.info['cpuinfo']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_cpuconfig(self):
res = {}
[ret, ans] = self.etcdser.getkey('/cpuconfig')
if ret == True :
res = list(eval(ans))
return res
else:
logger.warning(ans)
return res
try:
res = self.info['cpuconfig']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_diskinfo(self):
res = []
[ret, ans] = self.etcdser.getkey('/diskinfo')
if ret == True :
res = list(eval(ans))
return res
else:
logger.warning(ans)
return res
try:
res = self.info['diskinfo']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_osinfo(self):
res = {}
[ret, ans] = self.etcdser.getkey('/osinfo')
if ret == True:
res = dict(eval(ans))
return res
else:
logger.warning(ans)
return res
try:
res = self.info['osinfo']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_containers(self):
res = {}
[ret, ans] = self.etcdser.getkey('/containers')
if ret == True:
res = dict(eval(ans))
return res
else:
logger.warning(ans)
return res
try:
res = self.info['containers']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_status(self):
isexist = self.etcdser.getkey('/running')[0]
try:
isexist = self.info['running']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
isexist = False
if(isexist):
return 'RUNNING'
else:
return 'STOPPED'
def get_containerslist(self):
res = list(eval(self.etcdser.getkey('/containerslist')[1]))
try:
res = self.info['containerslist']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res

View File

@ -108,10 +108,38 @@ class IntervalPool(object):
#self.pool[str(i)].sort(key=ip_to_int) # cidr between thiscidr and upcidr are null, no need to sort
return [True, upinterval]
# check whether the addr/cidr overlaps the self.pool
# for example, addr/cidr=172.16.0.48/29 overlaps self.pool['24']=[172.16.0.0]
def overlap(self, addr, cidr):
    """Return True when addr/cidr shares addresses with an interval stored in self.pool.

    Every prefix length from the pool's own prefix (taken from self.info)
    up to 32 is examined. For each non-empty bucket, the candidate
    addresses inside addr/cidr are stepped through at that bucket's
    granularity and looked up in the bucket.
    """
    wanted_prefix = int(cidr)
    first_prefix = int(self.info.split('/')[1])
    for pool_prefix in range(first_prefix, 33):
        bucket = self.pool[str(pool_prefix)]
        if bucket:
            # Start from the block (at the coarser of the two prefixes) that
            # contains addr, and stop at the first address past addr/cidr.
            candidate = fix_ip(addr, min(wanted_prefix, pool_prefix))
            limit = next_interval(addr, wanted_prefix)
            while ip_to_int(candidate) < ip_to_int(limit):
                if candidate in bucket:
                    return True
                candidate = next_interval(candidate, pool_prefix)
    return False
# whether addr/cidr is in the range of self.pool
def inrange(self, addr, cidr):
    """Return True when addr/cidr falls inside the network given by self.info.

    Two conditions must hold: cidr is at least the pool's prefix length, and
    fix_ip(addr, pool prefix) equals the pool's base address.
    """
    base_addr, base_prefix = self.info.split('/')
    # Guard first: a shorter prefix than the pool's can never be contained,
    # and fix_ip() is not consulted in that case.
    if int(cidr) < int(base_prefix):
        return False
    return fix_ip(addr, base_prefix) == base_addr
# deallocate an interval with IP/CIDR
# ToDo : when freeing IP/CIDR, we do not check whether IP/CIDR is in pool
# maybe we check this later
def free(self, addr, cidr):
if not self.inrange(addr, cidr):
return [False, '%s/%s not in range of %s' % (addr, str(cidr), self.info)]
if self.overlap(addr, cidr):
return [False, '%s/%s overlaps the center pool:%s' % (addr, str(cidr), self.__str__())]
cidr = int(cidr)
# cidr not in pool means CIDR out of pool range
if str(cidr) not in self.pool:
@ -185,14 +213,24 @@ class EnumPool(object):
return [status, result]
return [True, list(map(lambda x:x+"/"+self.info.split('/')[1], result))]
# ToDo : when release :
# not check whether IP is in the range of pool
# not check whether IP is already in the pool
def inrange(self, ip):
    """Return True when ip lies between base+1 and base+2**(32-prefix)-2 inclusive.

    base and prefix come from self.info ("address/prefix"); addresses are
    compared as integers via ip_to_int().
    """
    pieces = self.info.split('/')
    base = ip_to_int(pieces[0])
    prefix = int(pieces[1])
    lowest = base + 1
    highest = base + 2 ** (32 - prefix) - 2
    return lowest <= ip_to_int(ip) <= highest
def release(self, ip_or_ips):
if type(ip_or_ips) == str:
ips = [ ip_or_ips ]
else:
ips = ip_or_ips
# check whether all IPs are not in the pool but in the range of pool
for ip in ips:
ip = ip.split('/')[0]
if (ip in self.pool) or (not self.inrange(ip)):
return [False, 'release IPs failed for ip already existing or ip exceeding the network pool, ips to be released: %s, ip pool is: %s and content is : %s' % (ips, self.info, self.pool)]
for ip in ips:
# maybe ip is in format IP/CIDR
ip = ip.split('/')[0]
@ -221,6 +259,14 @@ class UserPool(EnumPool):
def get_gateway_cidr(self):
    """Return "<gateway>/<prefix>", with the prefix taken from self.info."""
    gateway = self.gateway
    prefix = self.info.split('/')[1]
    return "/".join((gateway, prefix))
def inrange(self, ip):
    """Return True when ip lies between base+2 and base+2**(32-prefix)-2 inclusive.

    base and prefix come from self.info ("address/prefix"); addresses are
    compared as integers via ip_to_int().
    """
    pieces = self.info.split('/')
    base = ip_to_int(pieces[0])
    prefix = int(pieces[1])
    lowest = base + 2
    highest = base + 2 ** (32 - prefix) - 2
    return lowest <= ip_to_int(ip) <= highest
def printpool(self):
print("users ID:"+str(self.vlanid)+", net info:"+self.info+", gateway:"+self.gateway)
print (str(self.pool))

View File

@ -309,14 +309,14 @@ class userManager:
'''
user = User.verify_auth_token(token)
return user
def set_nfs_quota_bygroup(self,groupname, quota):
    """Call set_nfs_quota(username, quota) for every user whose user_group is groupname.

    Nothing happens unless the module-level ``data_quota`` equals the
    string "True".
    """
    if data_quota == "True":
        members = User.query.filter_by(user_group = groupname).all()
        for member in members:
            self.set_nfs_quota(member.username, quota)
def set_nfs_quota(self, username, quota):
if not data_quota == "True":
return
@ -326,7 +326,7 @@ class userManager:
sys_run(cmd.strip('"'))
except Exception as e:
logger.error(e)
@administration_required
def query(*args, **kwargs):
@ -607,8 +607,8 @@ class userManager:
if (user_modify.status == 'applying' and form.get('status', '') == 'normal'):
send_activated_email(user_modify.e_mail, user_modify.username)
user_modify.status = form.get('status', '')
if (form.get('Chpassword', '') == 'Yes'):
new_password = form.get('password','no_password')
if (form.get('password', '') != ''):
new_password = form.get('password','')
new_password = hashlib.sha512(new_password.encode('utf-8')).hexdigest()
user_modify.password = new_password
#self.chpassword(cur_user = user_modify, password = form.get('password','no_password'))
@ -629,8 +629,8 @@ class userManager:
cur_user = kwargs['cur_user']
cur_user.password = hashlib.sha512(kwargs['password'].encode('utf-8')).hexdigest()
def newuser(*args, **kwargs):
'''
Usage : newuser()

View File

@ -120,6 +120,7 @@ class Worker(object):
self.rpcserver = ThreadXMLRPCServer((self.addr, int(self.port)), allow_none=True)
self.rpcserver.register_introspection_functions()
self.rpcserver.register_instance(Containers)
self.rpcserver.register_function(monitor.workerFetchInfo)
# register functions or instances to server for rpc
#self.rpcserver.register_function(function_name)
@ -199,10 +200,6 @@ if __name__ == '__main__':
sys.exit(1)
else:
logger.info("etcd connected")
# init collector to collect monitor information
collector = monitor.Collector(etcdaddr,clustername,ipaddr)
collector.start()
cpu_quota = env.getenv('CONTAINER_CPU')
logger.info ("using CONTAINER_CPU %s" % cpu_quota )
@ -213,9 +210,11 @@ if __name__ == '__main__':
worker_port = env.getenv('WORKER_PORT')
logger.info ("using WORKER_PORT %s" % worker_port )
con_collector = monitor.Container_Collector(etcdaddr, clustername,
ipaddr)
# init collector to collect monitor information
con_collector = monitor.Container_Collector()
con_collector.start()
collector = monitor.Collector()
collector.start()
logger.info("CPU and Memory usage monitor started")
logger.info("Starting worker")

View File

@ -135,6 +135,7 @@
</div>
</div>
</div>
<div class="table table-responsive">
<table id="myGroupTable" class="table table-striped table-bordered">
<thead>
<tr>
@ -197,6 +198,7 @@
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>

View File

@ -71,6 +71,7 @@
</div>
</div>
</div>
<div class="table table-responsive">
<table id="myDataTable" class="table table-striped table-bordered">
<thead>
<tr>
@ -131,16 +132,10 @@
<label>Telephone Number</label>
<input type = "text" placeholder="Enter Telephone Number" class="form-control" name="tel" id="mTel">
</div>
<div class="form-group">
<label>Change Password?</label>
<select class="form-control" name="Chpassword" id="mChpassword">
<option>Yes</option>
<option>No</option>
</select>
</div>
<div class="form-group">
<label>Password</label>
<input type = "text" placeholder="Enter Password" class="form-control" name="password" id="mPassword">
<input type="password" placeholder="Enter Password" class="form-control" name="password" id="mPassword">
</div>
<div class="form-group">
@ -176,7 +171,8 @@
</div>
</div>
</table>
</div>
</div>
</div>
</div>
@ -247,8 +243,6 @@
$("#mDepartment").val(result.department);
$("#mStudentNumber").val(result.student_number);
$("#mTel").val(result.tel);
$("#mChpassword").val('No');
$("#mPassword").val(result.password);
$("#mStatus").val(result.status);
$("#mUserGroup").val(result.group);
$("#mAuthMethod").val(result.auth_method);