From 1315ca0c3a08da9655b120c180b07b208e784c97 Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Sun, 10 Jun 2018 23:53:21 +0800 Subject: [PATCH] Tidy master code --- bin/docklet-master | 2 +- bin/docklet-supermaster | 4 +- src/master/beansapplicationmgr.py | 8 +- src/master/cloudmgr.py | 26 +-- src/master/deploy.py | 10 +- src/master/httprest.py | 25 +-- src/master/monitor.py | 264 ++++++++++++++++++++++++++++++ src/master/nodemgr.py | 8 +- src/master/notificationmgr.py | 10 +- src/master/settings.py | 4 +- src/master/userManager.py | 14 +- src/master/vclustermgr.py | 14 +- src/utils/imagemgr.py | 2 +- src/utils/logs.py | 4 +- src/{worker => utils}/lvmtool.py | 0 src/worker/container.py | 4 +- src/worker/monitor.py | 255 ----------------------------- src/worker/worker.py | 2 +- 18 files changed, 333 insertions(+), 323 deletions(-) create mode 100644 src/master/monitor.py rename src/{worker => utils}/lvmtool.py (100%) diff --git a/bin/docklet-master b/bin/docklet-master index 89c223e..807ba83 100755 --- a/bin/docklet-master +++ b/bin/docklet-master @@ -46,7 +46,7 @@ export FS_PREFIX DAEMON_USER=root # settings for docklet master -DAEMON_MASTER=$DOCKLET_LIB/httprest.py +DAEMON_MASTER=$DOCKLET_LIB/master/httprest.py DAEMON_NAME_MASTER=docklet-master DAEMON_OPTS_MASTER= # The process ID of the script when it runs is stored here: diff --git a/bin/docklet-supermaster b/bin/docklet-supermaster index 629239e..ed93ef1 100755 --- a/bin/docklet-supermaster +++ b/bin/docklet-supermaster @@ -45,7 +45,7 @@ export FS_PREFIX DAEMON_USER=root # settings for docklet master -DAEMON_MASTER=$DOCKLET_LIB/httprest.py +DAEMON_MASTER=$DOCKLET_LIB/master/httprest.py DAEMON_NAME_MASTER=docklet-master DAEMON_OPTS_MASTER= # The process ID of the script when it runs is stored here: @@ -158,7 +158,7 @@ do_start_web () { } do_start_user () { - + log_daemon_msg "Starting $DAEMON_NAME_USER in $FS_PREFIX" DAEMON_OPTS_USER="-p $USER_PORT" diff --git a/src/master/beansapplicationmgr.py b/src/master/beansapplicationmgr.py index b64df00..2ec5f3e 100755 --- a/src/master/beansapplicationmgr.py +++ b/src/master/beansapplicationmgr.py @@ -11,14 +11,14 @@ This module consists of three parts: ''' import threading,datetime,random,time -from model import db,User,ApplyMsg -from userManager import administration_required -import env +from utils.model import db,User,ApplyMsg +from master.userManager import administration_required +from utils import env import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header -from settings import settings +from master.settings import settings # send email to remind users of their beans diff --git a/src/master/cloudmgr.py b/src/master/cloudmgr.py index 2b8957a..a62c8b0 100755 --- a/src/master/cloudmgr.py +++ b/src/master/cloudmgr.py @@ -1,12 +1,12 @@ #!/usr/bin/python3 from io import StringIO import os,sys,subprocess,time,re,datetime,threading,random,shutil -from model import db, Image -from deploy import * +from utils.model import db, Image +from master.deploy import * import json -from log import logger -import env +from utils.log import logger +from utils import env import requests fspath = env.getenv('FS_PREFIX') @@ -42,12 +42,12 @@ class AliyunMgr(): except Exception as e: logger.error(e) return False - + def createInstance(self): request = self.Request.CreateInstanceRequest.CreateInstanceRequest() request.set_accept_format('json') request.add_query_param('RegionId', self.setting['RegionId']) - if 'ZoneId' in self.setting and not self.setting['ZoneId'] == "": + if 'ZoneId' in self.setting and not self.setting['ZoneId'] == "": request.add_query_param('ZoneId', self.setting['ZoneId']) if 'VSwitchId' in self.setting and not self.setting['VSwitchId'] == "": request.add_query_param('VSwitchId', self.setting['VSwitchId']) @@ -60,25 +60,25 @@ class AliyunMgr(): request.add_query_param('Password', self.setting['Password']) response = self.clt.do_action_with_exception(request) logger.info(response) - + instanceid=json.loads(bytes.decode(response))['InstanceId'] return instanceid - + def startInstance(self, instanceid): request = self.Request.StartInstanceRequest.StartInstanceRequest() request.set_accept_format('json') request.add_query_param('InstanceId', instanceid) response = self.clt.do_action_with_exception(request) logger.info(response) - - + + def createEIP(self): request = self.Request.AllocateEipAddressRequest.AllocateEipAddressRequest() request.set_accept_format('json') request.add_query_param('RegionId', self.setting['RegionId']) response = self.clt.do_action_with_exception(request) logger.info(response) - + response=json.loads(bytes.decode(response)) eipid=response['AllocationId'] eipaddr=response['EipAddress'] @@ -94,7 +94,7 @@ class AliyunMgr(): response = self.clt.do_action_with_exception(request) logger.info(response) - + def getInnerIP(self, instanceid): request = self.Request.DescribeInstancesRequest.DescribeInstancesRequest() request.set_accept_format('json') @@ -168,7 +168,7 @@ class EmptyMgr(): return False class CloudMgr(): - + def getSettingFile(self): if not os.path.exists(fspath+"/global/sys/cloudsetting.json"): currentfilepath = os.path.dirname(os.path.abspath(__file__)) diff --git a/src/master/deploy.py b/src/master/deploy.py index 0be22a4..5b5a585 100755 --- a/src/master/deploy.py +++ b/src/master/deploy.py @@ -1,8 +1,8 @@ #!/usr/bin/python3 -import paramiko, time -from log import logger -import env,os +import paramiko, time, os +from utils.log import logger +from utils import env def myexec(ssh,command): stdin,stdout,stderr = ssh.exec_command(command) @@ -12,7 +12,7 @@ def myexec(ssh,command): if time.time() > endtime: stdout.channel.close() logger.error(command + ": fail") - return + return # for line in stdout.readlines(): # if line is None: # time.sleep(5) @@ -35,7 +35,7 @@ def deploy(ipaddr,masterip,account,password,volumename): sftp.put(deployscriptpath,'/root/docklet-deploy.sh') sftp.put('/etc/hosts', '/etc/hosts') transport.close() - + ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) while True: diff --git a/src/master/httprest.py b/src/master/httprest.py index 1bc5d28..f01b972 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -4,10 +4,13 @@ # because some modules need variables when import # for example, userManager/model.py +import sys +if sys.path[0].endswith("master"): + sys.path[0] = sys.path[0][:-6] from flask import Flask, request # must first init loadenv -import tools, env +from utils import tools, env # default CONFIG=/opt/docklet/local/docklet-running.conf config = env.getenv("CONFIG") @@ -15,22 +18,22 @@ tools.loadenv(config) # second init logging # must import logger after initlogging, ugly -from log import initlogging +from utils.log import initlogging initlogging("docklet-master") -from log import logger +from utils.log import logger import os -import http.server, cgi, json, sys, shutil +import http.server, cgi, json, sys, shutil, traceback import xmlrpc.client from socketserver import ThreadingMixIn -import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr, cloudmgr -from logs import logs -import userManager,beansapplicationmgr -import monitor,traceback +from utils import etcdlib, imagemgr +from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr +from utils.logs import logs +from master import userManager, beansapplicationmgr, monitor, sysmgr, network +from worker.monitor import History_Manager import threading -import sysmgr import requests -from nettools import portcontrol +from utils.nettools import portcontrol #default EXTERNAL_LOGIN=False external_login = env.getenv('EXTERNAL_LOGIN') @@ -895,7 +898,7 @@ if __name__ == '__main__': masterport = env.getenv('MASTER_PORT') logger.info("using MASTER_PORT %d", int(masterport)) - G_historymgr = monitor.History_Manager() + G_historymgr = History_Manager() master_collector = monitor.Master_Collector(G_nodemgr,ipaddr+":"+str(masterport)) master_collector.start() logger.info("master_collector started") diff --git a/src/master/monitor.py b/src/master/monitor.py new file mode 100644 index 0000000..2aaaa60 --- /dev/null +++ b/src/master/monitor.py @@ -0,0 +1,264 @@ +import threading, time, traceback +from utils import env +from utils.log import logger +from httplib2 import Http + +# major dict to store the monitoring data +# only use on Master +# monitor_hosts: use workers' ip addresses as first key. +# second key: cpuinfo,diskinfo,meminfo,osinfo,cpuconfig,running,containers,containerslist +# 1.cpuinfo stores the cpu usages data, and it has keys: user,system,idle,iowait +# 2.diskinfo stores the disks usages data, and it has keys: device,mountpoint,total,used,free,percent +# 3.meminfo stores the memory usages data, and it has keys: total,used,free,buffers,cached,percent +# 4.osinfo stores the information of operating system, +# and it has keys: platform,system,node,release,version,machine,processor +# 5.cpuconfig stores the information of processors, and it is a list, each element of list is a dict +# which stores the information of a processor, each element has key: processor,model name, +# core id, cpu MHz, cache size, physical id. +# 6.running indicates the status of worker,and it has two values: True, False. +# 7.containers store the amount of containers on the worker. +# 8.containers store a list which consists of the names of containers on the worker. +monitor_hosts = {} + +# monitor_vnodes: use the owners' names of vnodes(containers) as first key. +# use the names of vnodes(containers) as second key. +# third key: cpu_use,mem_use,disk_use,basic_info,quota +# 1.cpu_use has keys: val,unit,hostpercent +# 2.mem_use has keys: val,unit,usedp +# 3.disk_use has keys: device,mountpoint,total,used,free,percent +# 4.basic_info has keys: Name,State,PID,IP,RunningTime,billing,billing_this_hour +# 5.quota has keys: cpu,memeory +monitor_vnodes = {} + +# get owner name of a container +def get_owner(container_name): + names = container_name.split('-') + return names[0] + +# the thread to collect data from each worker and store them in monitor_hosts and monitor_vnodes +class Master_Collector(threading.Thread): + + def __init__(self,nodemgr,master_ip): + threading.Thread.__init__(self) + self.thread_stop = False + self.nodemgr = nodemgr + self.master_ip = master_ip + self.net_lastbillings = {} + self.bytes_per_beans = 1000000000 + return + + def net_billings(self, username, now_bytes_total): + global monitor_vnodes + if not username in self.net_lastbillings.keys(): + self.net_lastbillings[username] = 0 + elif int(now_bytes_total/self.bytes_per_beans) < self.net_lastbillings[username]: + self.net_lastbillings[username] = 0 + diff = int(now_bytes_total/self.bytes_per_beans) - self.net_lastbillings[username] + if diff > 0: + auth_key = env.getenv('AUTH_KEY') + data = {"owner_name":username,"billing":diff, "auth_key":auth_key} + header = {'Content-Type':'application/x-www-form-urlencoded'} + http = Http() + [resp,content] = http.request("http://"+self.master_ip+"/billing/beans/","POST",urlencode(data),headers = header) + logger.info("response from master:"+content.decode('utf-8')) + self.net_lastbillings[username] += diff + monitor_vnodes[username]['net_stats']['net_billings'] = self.net_lastbillings[username] + + def run(self): + global monitor_hosts + global monitor_vnodes + while not self.thread_stop: + for worker in monitor_hosts.keys(): + monitor_hosts[worker]['running'] = False + workers = self.nodemgr.get_nodeips() + for worker in workers: + try: + ip = worker + workerrpc = self.nodemgr.ip_to_rpc(worker) + # fetch data + info = list(eval(workerrpc.workerFetchInfo(self.master_ip))) + #logger.info(info[0]) + # store data in monitor_hosts and monitor_vnodes + monitor_hosts[ip] = info[0] + for container in info[1].keys(): + owner = get_owner(container) + if not owner in monitor_vnodes.keys(): + monitor_vnodes[owner] = {} + monitor_vnodes[owner][container] = info[1][container] + for user in info[2].keys(): + if not user in monitor_vnodes.keys(): + continue + else: + monitor_vnodes[user]['net_stats'] = info[2][user] + self.net_billings(user, info[2][user]['bytes_total']) + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + time.sleep(2) + #logger.info(History.query.all()) + #logger.info(VNode.query.all()) + return + + def stop(self): + self.thread_stop = True + return + +# master use this class to fetch specific data of containers(vnodes) +class Container_Fetcher: + def __init__(self,container_name): + self.owner = get_owner(container_name) + self.con_id = container_name + return + + def get_cpu_use(self): + global monitor_vnodes + try: + res = monitor_vnodes[self.owner][self.con_id]['cpu_use'] + res['quota'] = monitor_vnodes[self.owner][self.con_id]['quota'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_mem_use(self): + global monitor_vnodes + try: + res = monitor_vnodes[self.owner][self.con_id]['mem_use'] + res['quota'] = monitor_vnodes[self.owner][self.con_id]['quota'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_disk_use(self): + global monitor_vnodes + try: + res = monitor_vnodes[self.owner][self.con_id]['disk_use'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_net_stats(self): + global monitor_vnodes + try: + res = monitor_vnodes[self.owner][self.con_id]['net_stats'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_basic_info(self): + global monitor_vnodes + try: + res = monitor_vnodes[self.owner][self.con_id]['basic_info'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + +# Master use this class to fetch specific data of physical machines(hosts) +class Fetcher: + + def __init__(self,host): + global monitor_hosts + self.info = monitor_hosts[host] + return + + #def get_clcnt(self): + # return DockletMonitor.clcnt + + #def get_nodecnt(self): + # return DockletMonitor.nodecnt + + #def get_meminfo(self): + # return self.get_meminfo_('172.31.0.1') + + def get_meminfo(self): + try: + res = self.info['meminfo'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_cpuinfo(self): + try: + res = self.info['cpuinfo'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_cpuconfig(self): + try: + res = self.info['cpuconfig'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_diskinfo(self): + try: + res = self.info['diskinfo'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_osinfo(self): + try: + res = self.info['osinfo'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_concpuinfo(self): + try: + res = self.info['concpupercent'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_containers(self): + try: + res = self.info['containers'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res + + def get_status(self): + try: + isexist = self.info['running'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + isexist = False + if(isexist): + return 'RUNNING' + else: + return 'STOPPED' + + def get_containerslist(self): + try: + res = self.info['containerslist'] + except Exception as err: + logger.warning(traceback.format_exc()) + logger.warning(err) + res = {} + return res diff --git a/src/master/nodemgr.py b/src/master/nodemgr.py index 9ca819b..d3396a6 100755 --- a/src/master/nodemgr.py +++ b/src/master/nodemgr.py @@ -2,9 +2,9 @@ import threading, random, time, xmlrpc.client, sys #import network -from nettools import netcontrol,ovscontrol -from log import logger -import env +from utils.nettools import netcontrol,ovscontrol +from utils.log import logger +from utils import env ########################################## # NodeMgr @@ -149,7 +149,7 @@ class NodeMgr(object): taskargs = task['args'] logger.info("recover task:%s in worker:%s" % (taskname, ip)) eval('worker.'+taskname)(*taskargs) - + # get all run nodes' IP addr def get_nodeips(self): return self.runnodes diff --git a/src/master/notificationmgr.py b/src/master/notificationmgr.py index 426099d..84e6350 100644 --- a/src/master/notificationmgr.py +++ b/src/master/notificationmgr.py @@ -1,15 +1,15 @@ import json -from log import logger -from model import db, Notification, NotificationGroups, User, UserNotificationPair -from userManager import administration_required, token_required +from utils.log import logger +from utils.model import db, Notification, NotificationGroups, User, UserNotificationPair +from master.userManager import administration_required, token_required import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header from datetime import datetime -import env -from settings import settings +from utils import env +from master.settings import settings class NotificationMgr: def __init__(self): diff --git a/src/master/settings.py b/src/master/settings.py index 67d8111..af9e9df 100644 --- a/src/master/settings.py +++ b/src/master/settings.py @@ -1,9 +1,9 @@ #!/usr/bin/python3 -import env +from utils import env import json, os from functools import wraps -from log import logger +from utils.log import logger class settingsClass: diff --git a/src/master/userManager.py b/src/master/userManager.py index 1dad29b..6d89835 100755 --- a/src/master/userManager.py +++ b/src/master/userManager.py @@ -7,22 +7,22 @@ Warning: in some early versions, "token" stand for the instance of class model.U Original author: Liu Peidong ''' -from model import db, User, UserGroup, Notification, UserUsage +from utils.model import db, User, UserGroup, Notification, UserUsage from functools import wraps import os, subprocess, math import hashlib import pam from base64 import b64encode -import env -from settings import settings +from utils import env +from master.settings import settings import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header from datetime import datetime import json -from log import logger -from lvmtool import * +from utils.log import logger +from utils.lvmtool import * PAM = pam.pam() fspath = env.getenv('FS_PREFIX') @@ -162,7 +162,7 @@ class userManager: sys_admin.auth_method = 'local' db.session.add(sys_admin) path = env.getenv('DOCKLET_LIB') - subprocess.call([path+"/userinit.sh", username]) + subprocess.call([path+"/master/userinit.sh", username]) db.session.commit() if not os.path.exists(fspath+"/global/sys/quota"): groupfile = open(fspath+"/global/sys/quota",'w') @@ -870,7 +870,7 @@ class userManager: # now initialize for all kind of users #if newuser.status == 'normal': path = env.getenv('DOCKLET_LIB') - subprocess.call([path+"/userinit.sh", newuser.username]) + subprocess.call([path+"/master/userinit.sh", newuser.username]) res = self.groupQuery(name=newuser.user_group) if res['success']: self.set_nfs_quota(newuser.username,res['data']['data']) diff --git a/src/master/vclustermgr.py b/src/master/vclustermgr.py index c781feb..b7ef839 100755 --- a/src/master/vclustermgr.py +++ b/src/master/vclustermgr.py @@ -1,15 +1,13 @@ #!/usr/bin/python3 -import os, random, json, sys, imagemgr +import os, random, json, sys import datetime, math -from log import logger -import env -import proxytool -import requests, threading -import traceback -from nettools import portcontrol -from model import db, Container, PortMapping, VCluster +from utils.log import logger +from utils import env, imagemgr, proxytool +import requests, threading, traceback +from utils.nettools import portcontrol +from utils.model import db, Container, PortMapping, VCluster userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT')) def post_to_user(url = '/', data={}): diff --git a/src/utils/imagemgr.py b/src/utils/imagemgr.py index fc6a2bb..fcb8873 100755 --- a/src/utils/imagemgr.py +++ b/src/utils/imagemgr.py @@ -24,7 +24,7 @@ from utils.model import db, Image from utils.log import logger from utils import env, updatebase -from worker.lvmtool import * +from utils.lvmtool import * import requests master_port = str(env.getenv('MASTER_PORT')) diff --git a/src/utils/logs.py b/src/utils/logs.py index 51be483..ac79727 100644 --- a/src/utils/logs.py +++ b/src/utils/logs.py @@ -1,8 +1,8 @@ #!/usr/bin/python3 -import env +from utils import env import json, os -from log import logger +from utils.log import logger from werkzeug.utils import secure_filename logsPath = env.getenv('FS_PREFIX') + '/local/log/' diff --git a/src/worker/lvmtool.py b/src/utils/lvmtool.py similarity index 100% rename from src/worker/lvmtool.py rename to src/utils/lvmtool.py diff --git a/src/worker/container.py b/src/worker/container.py index 7f2d977..9a530fa 100755 --- a/src/worker/container.py +++ b/src/worker/container.py @@ -3,7 +3,7 @@ import subprocess, os, json from utils.log import logger from utils import env, imagemgr -from worker.lvmtool import sys_run, check_volume +from utils.lvmtool import sys_run, check_volume from worker.monitor import Container_Collector, History_Manager import lxc @@ -44,7 +44,7 @@ class Container(object): if not os.path.isdir("%s/global/users/%s" % (self.fspath,username)): path = env.getenv('DOCKLET_LIB') - subprocess.call([path+"/userinit.sh", username]) + subprocess.call([path+"/master/userinit.sh", username]) logger.info("user %s directory not found, create it" % username) sys_run("mkdir -p /var/lib/lxc/%s" % lxc_name) logger.info("generate config file for %s" % lxc_name) diff --git a/src/worker/monitor.py b/src/worker/monitor.py index 0291b1a..de31870 100755 --- a/src/worker/monitor.py +++ b/src/worker/monitor.py @@ -35,33 +35,6 @@ b_mem = 2000000 # MB c_disk = 4000 # MB d_port = 1 -# major dict to store the monitoring data -# only use on Master -# monitor_hosts: use workers' ip addresses as first key. -# second key: cpuinfo,diskinfo,meminfo,osinfo,cpuconfig,running,containers,containerslist -# 1.cpuinfo stores the cpu usages data, and it has keys: user,system,idle,iowait -# 2.diskinfo stores the disks usages data, and it has keys: device,mountpoint,total,used,free,percent -# 3.meminfo stores the memory usages data, and it has keys: total,used,free,buffers,cached,percent -# 4.osinfo stores the information of operating system, -# and it has keys: platform,system,node,release,version,machine,processor -# 5.cpuconfig stores the information of processors, and it is a list, each element of list is a dict -# which stores the information of a processor, each element has key: processor,model name, -# core id, cpu MHz, cache size, physical id. -# 6.running indicates the status of worker,and it has two values: True, False. -# 7.containers store the amount of containers on the worker. -# 8.containers store a list which consists of the names of containers on the worker. -monitor_hosts = {} - -# monitor_vnodes: use the owners' names of vnodes(containers) as first key. -# use the names of vnodes(containers) as second key. -# third key: cpu_use,mem_use,disk_use,basic_info,quota -# 1.cpu_use has keys: val,unit,hostpercent -# 2.mem_use has keys: val,unit,usedp -# 3.disk_use has keys: device,mountpoint,total,used,free,percent -# 4.basic_info has keys: Name,State,PID,IP,RunningTime,billing,billing_this_hour -# 5.quota has keys: cpu,memeory -monitor_vnodes = {} - # major dict to store the monitoring data on Worker # only use on Worker # workerinfo: only store the data collected on current Worker, @@ -627,234 +600,6 @@ def get_billing_history(vnode_name): default['port'] = 0 return default -# the thread to collect data from each worker and store them in monitor_hosts and monitor_vnodes -class Master_Collector(threading.Thread): - - def __init__(self,nodemgr,master_ip): - threading.Thread.__init__(self) - self.thread_stop = False - self.nodemgr = nodemgr - self.master_ip = master_ip - self.net_lastbillings = {} - self.bytes_per_beans = 1000000000 - return - - def net_billings(self, username, now_bytes_total): - global monitor_vnodes - if not username in self.net_lastbillings.keys(): - self.net_lastbillings[username] = 0 - elif int(now_bytes_total/self.bytes_per_beans) < self.net_lastbillings[username]: - self.net_lastbillings[username] = 0 - diff = int(now_bytes_total/self.bytes_per_beans) - self.net_lastbillings[username] - if diff > 0: - auth_key = env.getenv('AUTH_KEY') - data = {"owner_name":username,"billing":diff, "auth_key":auth_key} - header = {'Content-Type':'application/x-www-form-urlencoded'} - http = Http() - [resp,content] = http.request("http://"+self.master_ip+"/billing/beans/","POST",urlencode(data),headers = header) - logger.info("response from master:"+content.decode('utf-8')) - self.net_lastbillings[username] += diff - monitor_vnodes[username]['net_stats']['net_billings'] = self.net_lastbillings[username] - - def run(self): - global monitor_hosts - global monitor_vnodes - while not self.thread_stop: - for worker in monitor_hosts.keys(): - monitor_hosts[worker]['running'] = False - workers = self.nodemgr.get_nodeips() - for worker in workers: - try: - ip = worker - workerrpc = self.nodemgr.ip_to_rpc(worker) - # fetch data - info = list(eval(workerrpc.workerFetchInfo(self.master_ip))) - #logger.info(info[0]) - # store data in monitor_hosts and monitor_vnodes - monitor_hosts[ip] = info[0] - for container in info[1].keys(): - owner = get_owner(container) - if not owner in monitor_vnodes.keys(): - monitor_vnodes[owner] = {} - monitor_vnodes[owner][container] = info[1][container] - for user in info[2].keys(): - if not user in monitor_vnodes.keys(): - continue - else: - monitor_vnodes[user]['net_stats'] = info[2][user] - self.net_billings(user, info[2][user]['bytes_total']) - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - time.sleep(2) - #logger.info(History.query.all()) - #logger.info(VNode.query.all()) - return - - def stop(self): - self.thread_stop = True - return - -# master use this class to fetch specific data of containers(vnodes) -class Container_Fetcher: - def __init__(self,container_name): - self.owner = get_owner(container_name) - self.con_id = container_name - return - - def get_cpu_use(self): - global monitor_vnodes - try: - res = monitor_vnodes[self.owner][self.con_id]['cpu_use'] - res['quota'] = monitor_vnodes[self.owner][self.con_id]['quota'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_mem_use(self): - global monitor_vnodes - try: - res = monitor_vnodes[self.owner][self.con_id]['mem_use'] - res['quota'] = monitor_vnodes[self.owner][self.con_id]['quota'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_disk_use(self): - global monitor_vnodes - try: - res = monitor_vnodes[self.owner][self.con_id]['disk_use'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_net_stats(self): - global monitor_vnodes - try: - res = monitor_vnodes[self.owner][self.con_id]['net_stats'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_basic_info(self): - global monitor_vnodes - try: - res = monitor_vnodes[self.owner][self.con_id]['basic_info'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - -# Master use this class to fetch specific data of physical machines(hosts) -class Fetcher: - - def __init__(self,host): - global monitor_hosts - self.info = monitor_hosts[host] - return - - #def get_clcnt(self): - # return DockletMonitor.clcnt - - #def get_nodecnt(self): - # return DockletMonitor.nodecnt - - #def get_meminfo(self): - # return self.get_meminfo_('172.31.0.1') - - def get_meminfo(self): - try: - res = self.info['meminfo'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_cpuinfo(self): - try: - res = self.info['cpuinfo'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_cpuconfig(self): - try: - res = self.info['cpuconfig'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_diskinfo(self): - try: - res = self.info['diskinfo'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_osinfo(self): - try: - res = self.info['osinfo'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_concpuinfo(self): - try: - res = self.info['concpupercent'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_containers(self): - try: - res = self.info['containers'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - - def get_status(self): - try: - isexist = self.info['running'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - isexist = False - if(isexist): - return 'RUNNING' - else: - return 'STOPPED' - - def get_containerslist(self): - try: - res = self.info['containerslist'] - except Exception as err: - logger.warning(traceback.format_exc()) - logger.warning(err) - res = {} - return res - # To record data when the status of containers change class History_Manager: diff --git a/src/worker/worker.py b/src/worker/worker.py index 7121090..88839c7 100755 --- a/src/worker/worker.py +++ b/src/worker/worker.py @@ -20,7 +20,7 @@ import threading from utils import etcdlib, proxytool from worker import container, monitor from utils.nettools import netcontrol,ovscontrol,portcontrol -from worker.lvmtool import new_group, recover_group +from utils.lvmtool import new_group, recover_group from master import network ##################################################################