Merge pull request #386 from FirmlyReality/releasecluster

Releasecluster
This commit is contained in:
Yujian Zhu 2019-05-08 18:39:35 +08:00 committed by GitHub
commit 2eb187c066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 199 additions and 10 deletions

View File

@ -183,6 +183,16 @@
# Only when you deploy docklet on the cloud can you set it to True # Only when you deploy docklet on the cloud can you set it to True
# ALLOW_SCALE_OUT=False # ALLOW_SCALE_OUT=False
# WARNING_DAYS: user will receive a warning email for releasing
# when his/her vcluster has been stopped for more then the days.
# Default: 7
# WARNING_DAYS=7
# RELEASE_DAYS: the vcluster will be released when it has been
# stopped for more then the days. Needs to be larger then WARNING_DAYS.
# Default: 14
# RELEASE_DAYS=14
# ================================================== # ==================================================
# #
# Batch Config # Batch Config

View File

@ -29,7 +29,7 @@ from socketserver import ThreadingMixIn
from utils import etcdlib, imagemgr from utils import etcdlib, imagemgr
from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr, jobmgr, taskmgr from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr, jobmgr, taskmgr
from utils.logs import logs from utils.logs import logs
from master import userManager, beansapplicationmgr, monitor, sysmgr, network from master import userManager, beansapplicationmgr, monitor, sysmgr, network, releasemgr
from worker.monitor import History_Manager from worker.monitor import History_Manager
import threading import threading
import requests import requests
@ -1162,6 +1162,10 @@ if __name__ == '__main__':
G_imagemgr = imagemgr.ImageMgr() G_imagemgr = imagemgr.ImageMgr()
logger.info("imagemgr started") logger.info("imagemgr started")
G_releasemgr = releasemgr.ReleaseMgr(G_vclustermgr,G_ulockmgr)
G_releasemgr.start()
logger.info("releasemgr started")
logger.info("startting to listen on: ") logger.info("startting to listen on: ")
masterip = env.getenv('MASTER_IP') masterip = env.getenv('MASTER_IP')
logger.info("using MASTER_IP %s", masterip) logger.info("using MASTER_IP %s", masterip)

View File

@ -1,6 +1,6 @@
#!/usr/bin/python3 #!/usr/bin/python3
import json, sys, netifaces, threading import json, sys, netifaces, threading, traceback
from utils.nettools import netcontrol,ovscontrol from utils.nettools import netcontrol,ovscontrol
from utils.log import logger from utils.log import logger
@ -507,6 +507,7 @@ class NetworkMgr(object):
self.user_locks.release() self.user_locks.release()
def del_usrgwbr(self, username, uid, nodemgr): def del_usrgwbr(self, username, uid, nodemgr):
self.load_usrgw(username)
if username not in self.usrgws.keys(): if username not in self.usrgws.keys():
return [False, "user does't have gateway or user doesn't exist."] return [False, "user does't have gateway or user doesn't exist."]
ip = self.usrgws[username] ip = self.usrgws[username]
@ -539,7 +540,7 @@ class NetworkMgr(object):
del self.users[username] del self.users[username]
return [True, 'delete user success'] return [True, 'delete user success']
except Exception as ex: except Exception as ex:
logger.error(str(ex)) logger.error(traceback.format_exc())
return [False, str(ex)] return [False, str(ex)]
finally: finally:
self.user_locks.release() self.user_locks.release()

137
src/master/releasemgr.py Normal file
View File

@ -0,0 +1,137 @@
import threading, time, requests, json, traceback
from utils import env
from utils.log import logger
from utils.model import db, VCluster, Container
import smtplib, datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from master.settings import settings
userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
def post_to_user(url = '/', data={}):
return requests.post(userpoint+url,data=data).json()
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class ReleaseMgr(threading.Thread):
def __init__(self, vclustermgr, ulockmgr, check_interval=_ONE_DAY_IN_SECONDS):
threading.Thread.__init__(self)
self.thread_stop = False
self.vclustermgr = vclustermgr
self.ulockmgr = ulockmgr
self.check_interval = check_interval
self.warning_days = int(env.getenv("WARNING_DAYS"))
self.release_days = int(env.getenv("RELEASE_DAYS"))
if self.release_days <= self.warning_days:
self.release_days = self.warning_days+1
logger.info("[ReleaseMgr] start withe warning_days=%d release_days=%d"%(self.warning_days, self.release_days))
def _send_email(self, to_address, username, vcluster, days, is_released=True):
email_from_address = settings.get('EMAIL_FROM_ADDRESS')
if (email_from_address in ['\'\'', '\"\"', '']):
return
text = '<html><h4>Dear '+ username + ':</h4>'
st_str = vcluster.stop_time.strftime("%Y-%m-%d %H:%M:%S")
text += '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Your workspace/vcluster(name:%s id:%d) in <a href='%s'>%s</a>
has been stopped more than %d days now(stopped at:%s). </p>
''' % (vcluster.clustername, vcluster.clusterid, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL"), days, st_str)
if is_released:
text += '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Therefore, the workspace/vcluster has been released now.</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<b>And the data in it couldn't be recoverd</b> unless you save it.</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;You can create new workspace/vcluster if you need.</p>
'''
else:
#day_d = self.release_days - (datetime.datetime.now() - vcluster.stop_time).days
release_date = vcluster.stop_time + datetime.timedelta(days=self.release_days)
day_d = (release_date - datetime.datetime.now()).days
rd_str = release_date.strftime("%Y-%m-%d %H:%M:%S")
text += '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;It will be released after <b>%s(in about %d days)</b>.</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<b>And the data in it couldn't be recoverd after releasing.</b></p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Please start or save it before <b>%s(in about %d days)</b> if you want to keep the data.</p>
''' % (rd_str, day_d, rd_str, day_d)
text += '''<br>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Note: DO NOT reply to this email!</p>
<br><br>
<p> <a href='http://docklet.unias.org'>Docklet Team</a>, SEI, PKU</p>
'''
subject = 'Docklet workspace/vcluster releasing alert'
msg = MIMEMultipart()
textmsg = MIMEText(text,'html','utf-8')
msg['Subject'] = Header(subject, 'utf-8')
msg['From'] = email_from_address
msg['To'] = to_address
msg.attach(textmsg)
s = smtplib.SMTP()
s.connect()
try:
s.sendmail(email_from_address, to_address, msg.as_string())
except Exception as err:
logger.error(traceback.format_exc())
s.close()
def run(self):
while not self.thread_stop:
logger.info("[ReleaseMgr] Begin checking each vcluster if it needs to be released...")
auth_key = env.getenv('AUTH_KEY')
res = post_to_user("/master/user/groupinfo/", {'auth_key':auth_key})
groups = json.loads(res['groups'])
quotas = {}
for group in groups:
quotas[group['name']] = group['quotas']
vcs = VCluster.query.filter_by(status='stopped').all()
#logger.info(str(vcs))
for vc in vcs:
if vc.stop_time is None:
continue
days = (datetime.datetime.now() - vc.stop_time).days
if days >= self.release_days:
logger.info("[ReleaseMgr] VCluster(id:%d,user:%s) has been stopped(%s) for more than %d days, it will be released."
% (vc.clusterid, vc.ownername, vc.stop_time.strftime("%Y-%m-%d %H:%M:%S"), self.release_days))
rc_info = post_to_user("/master/user/recoverinfo/", {'username':vc.ownername,'auth_key':auth_key})
logger.info("[ReleaseMgr] %s"%str(rc_info))
groupname = rc_info['groupname']
user_info = {"data":{"id":rc_info['uid'],"group":groupname,"groupinfo":quotas[groupname]}}
self.ulockmgr.acquire(vc.ownername)
try:
[status, usage_info] = self.vclustermgr.get_clustersetting(vc.clustername, vc.ownername, "all", True)
success, msg = self.vclustermgr.delete_cluster(vc.clustername, vc.ownername, json.dumps(user_info))
if not success:
logger.error("[ReleaseMgr] Can't release VCluster(id:%d,user:%s) for %s"%(vc.clusterid, vc.ownername, msg))
else:
if status:
logger.info("[ReleaseMgr] Release Quota.")
post_to_user("/master/user/usageRelease/", {'auth_key':auth_key,'username':vc.ownername,
'cpu':usage_info['cpu'], 'memory':usage_info['memory'],'disk':usage_info['disk']})
self._send_email(rc_info['email'], vc.ownername, vc, self.release_days)
logger.info("[ReleaseMgr] Succeed to releasing VCluster(id:%d,user:%s) for %s. Send mail to info."%(vc.clusterid, vc.ownername, msg))
except Exception as err:
logger.error(traceback.format_exc())
finally:
self.ulockmgr.release(vc.ownername)
elif days >= self.warning_days:
logger.info("[ReleaseMgr] VCluster(id:%d,user:%s) has been stopped(%s) for more than %d days. A warning email will be sent to the user."
% (vc.clusterid, vc.ownername, vc.stop_time.strftime("%Y-%m-%d %H:%M:%S"), self.warning_days))
if vc.is_warned:
logger.info("[ReleaseMgr] VCluster(id:%d,user:%s) has been warned before. Skip it."% (vc.clusterid, vc.ownername))
continue
rc_info = post_to_user("/master/user/recoverinfo/", {'username':vc.ownername,'auth_key':auth_key})
logger.info("[ReleaseMgr] %s"%str(rc_info))
self._send_email(rc_info['email'], vc.ownername, vc, self.warning_days, False)
vc.is_warned = True
try:
db.session.commit()
except Exception as err:
db.session.rollback()
logger.warning(traceback.format_exc())
time.sleep(self.check_interval)
def stop(self):
self.thread_stop = True
return

View File

@ -179,7 +179,7 @@ class TaskMgr(threading.Thread):
self.free_nets = [] self.free_nets = []
for i in range(0, (1 << (32-self.batch_cidr)) - 1, (1 << self.task_cidr)): for i in range(0, (1 << (32-self.batch_cidr)) - 1, (1 << self.task_cidr)):
self.free_nets.append(i) self.free_nets.append(i)
self.logger.info("Free nets addresses pool %s" % str(self.free_nets)) #self.logger.info("Free nets addresses pool %s" % str(self.free_nets))
self.logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr))) self.logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr)))
def data_lock(lockname): def data_lock(lockname):

View File

@ -624,6 +624,7 @@ class VclusterMgr(object):
[status,vcluster] = self.get_vcluster(clustername,username) [status,vcluster] = self.get_vcluster(clustername,username)
vcluster.status ='running' vcluster.status ='running'
vcluster.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") vcluster.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
vcluster.is_warned = False
if not db_commit(): if not db_commit():
return [False, "Commit Errror"] return [False, "Commit Errror"]
return [True, "start cluster"] return [True, "start cluster"]
@ -707,6 +708,7 @@ class VclusterMgr(object):
[status, vcluster] = self.get_vcluster(clustername, username) [status, vcluster] = self.get_vcluster(clustername, username)
vcluster.status = 'stopped' vcluster.status = 'stopped'
vcluster.start_time ="------" vcluster.start_time ="------"
vcluster.stop_time = datetime.datetime.now()
if not db_commit(): if not db_commit():
return [False, "Commit Errror"] return [False, "Commit Errror"]
return [True, "stop cluster"] return [True, "stop cluster"]

View File

@ -79,6 +79,10 @@ def getenv(key):
return os.environ.get("ALLOCATED_PORTS","10000-65535") return os.environ.get("ALLOCATED_PORTS","10000-65535")
elif key =="ALLOW_SCALE_OUT": elif key =="ALLOW_SCALE_OUT":
return os.environ.get("ALLOW_SCALE_OUT", "False") return os.environ.get("ALLOW_SCALE_OUT", "False")
elif key == "WARNING_DAYS":
return os.environ.get("WARNING_DAYS", "7")
elif key == "RELEASE_DAYS":
return os.environ.get("RELEASE_DAYS", "14")
elif key == "BATCH_ON": elif key == "BATCH_ON":
return os.environ.get("BATCH_ON","True") return os.environ.get("BATCH_ON","True")
elif key == "BATCH_MASTER_PORT": elif key == "BATCH_MASTER_PORT":

View File

@ -1,5 +1,8 @@
import sys
if sys.path[0].endswith("utils"):
sys.path[0] = sys.path[0][:-5]
from flask_migrate import Migrate,MigrateCommand from flask_migrate import Migrate,MigrateCommand
from model import * from utils.model import *
from flask_script import Manager from flask_script import Manager
from flask import Flask from flask import Flask

View File

@ -160,7 +160,7 @@ class UserUsage(db.Model):
self.disk = '0' self.disk = '0'
def __repr__(self): def __repr__(self):
return '<UserUsage %r>' % self.name return '<UserUsage %s>cpu:%s memory:%s disk:%s' % (self.username,self.cpu,self.memory,self.disk)
class Notification(db.Model): class Notification(db.Model):
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
@ -378,6 +378,8 @@ class VCluster(db.Model):
nextcid = db.Column(db.Integer) nextcid = db.Column(db.Integer)
create_time = db.Column(db.DateTime) create_time = db.Column(db.DateTime)
start_time = db.Column(db.String(20)) start_time = db.Column(db.String(20))
stop_time = db.Column(db.DateTime)
is_warned = db.Column(db.Boolean)
proxy_server_ip = db.Column(db.String(20)) proxy_server_ip = db.Column(db.String(20))
proxy_public_ip = db.Column(db.String(20)) proxy_public_ip = db.Column(db.String(20))
port_mapping = db.relationship('PortMapping', backref='v_cluster', lazy='dynamic') port_mapping = db.relationship('PortMapping', backref='v_cluster', lazy='dynamic')
@ -397,6 +399,8 @@ class VCluster(db.Model):
self.billing_history = [] self.billing_history = []
self.create_time = datetime.now() self.create_time = datetime.now()
self.start_time = "------" self.start_time = "------"
self.stop_time = datetime.now()
self.is_warned = False
def __repr__(self): def __repr__(self):
info = {} info = {}
@ -410,6 +414,11 @@ class VCluster(db.Model):
info["nextcid"] = self.nextcid info["nextcid"] = self.nextcid
info["create_time"] = self.create_time.strftime("%Y-%m-%d %H:%M:%S") info["create_time"] = self.create_time.strftime("%Y-%m-%d %H:%M:%S")
info["start_time"] = self.start_time info["start_time"] = self.start_time
if self.stop_time is None:
info['stop_time'] = "------"
else:
info['stop_time'] = self.stop_time.strftime("%Y-%m-%d %H:%M:%S")
info["is_warned"] = self.is_warned
info["containers"] = [dict(eval(str(con))) for con in self.containers] info["containers"] = [dict(eval(str(con))) for con in self.containers]
info["port_mapping"] = [dict(eval(str(pm))) for pm in self.port_mapping] info["port_mapping"] = [dict(eval(str(pm))) for pm in self.port_mapping]
info["billing_history"] = [dict(eval(str(bh))) for bh in self.billing_history] info["billing_history"] = [dict(eval(str(bh))) for bh in self.billing_history]
@ -517,7 +526,7 @@ class Batchtask(db.Model):
self.running_time = 0 self.running_time = 0
self.billing = 0 self.billing = 0
self.tried_times = 0 self.tried_times = 0
def __repr__(self): def __repr__(self):
info = {} info = {}
info['id'] = self.id info['id'] = self.id

View File

@ -372,7 +372,7 @@ class Container_Collector(threading.Thread):
# deal with network used data # deal with network used data
containerids = re.split("-",container_name) containerids = re.split("-",container_name)
if not is_batch and len(containerids) >= 3: if not is_batch and len(containerids) >= 3 and (containerids[1] + "-" + containerids[2]) in self.net_stats.keys():
workercinfo[container_name]['net_stats'] = self.net_stats[containerids[1] + '-' + containerids[2]] workercinfo[container_name]['net_stats'] = self.net_stats[containerids[1] + '-' + containerids[2]]
#logger.info(workercinfo[container_name]['net_stats']) #logger.info(workercinfo[container_name]['net_stats'])
@ -658,7 +658,11 @@ class History_Manager:
laststopruntime[vnode_name] = runtime laststopruntime[vnode_name] = runtime
vnode.laststopruntime = runtime vnode.laststopruntime = runtime
db.session.add(history) db.session.add(history)
db.session.commit() try:
db.session.commit()
except Exception as err:
db.session.rollback()
logger.warning(traceback.format_exc())
def getHistory(self,vnode_name): def getHistory(self,vnode_name):
vnode = VNode.query.filter_by(name=vnode_name).first() vnode = VNode.query.filter_by(name=vnode_name).first()

View File

@ -314,7 +314,7 @@ def get_master_recoverinfo():
return json.dumps({'success':'false', 'message':'username field is required.'}) return json.dumps({'success':'false', 'message':'username field is required.'})
else: else:
user = User.query.filter_by(username=username).first() user = User.query.filter_by(username=username).first()
return json.dumps({'success':'true', 'uid':user.id, 'groupname':user.user_group}) return json.dumps({'success':'true', 'uid':user.id, 'email':user.e_mail, 'groupname':user.user_group})
@app.route("/master/user/groupinfo/", methods=['POST']) @app.route("/master/user/groupinfo/", methods=['POST'])
@auth_key_required @auth_key_required
@ -325,6 +325,21 @@ def get_master_groupinfo():
groupfile.close() groupfile.close()
return json.dumps({'success':'true', 'groups':json.dumps(groups)}) return json.dumps({'success':'true', 'groups':json.dumps(groups)})
@app.route("/master/user/usageRelease/", methods=['POST'])
@auth_key_required
def usageRelease_master():
global G_usermgr
logger.info("handle request: /master/user/usageRelease/")
form = request.form
user = form.get("username",None)
cur_user = User.query.filter_by(username=user).first()
if user is None or cur_user is None:
return json.dumps({'success':'false', 'message':'Null username field or user does not exist.'})
G_lockmgr.acquire('__usage_'+str(user))
result = G_usermgr.usageRelease(cur_user = cur_user, cpu = form.get('cpu'), memory = form.get('memory'), disk = form.get('disk'))
G_lockmgr.release('__usage_'+str(user))
return json.dumps(result)
@app.route("/user/selfModify/", methods=['POST']) @app.route("/user/selfModify/", methods=['POST'])
@login_required @login_required
def selfModify_user(cur_user, user, form): def selfModify_user(cur_user, user, form):