Merge remote-tracking branch 'upstream/master'

zhongyehong committed on 2018-04-22 14:06:43 +08:00
commit d5bc0e583d
7 changed files with 397 additions and 216 deletions

.gitignore (vendored), 1 line changed

@ -6,3 +6,4 @@ __temp
.DS_Store
docklet.conf
home.html
src/migrations/


@ -16,7 +16,7 @@ fi
# some packages' names may be different in Debian
apt-get install -y cgmanager lxc lxcfs lxc-templates lvm2 bridge-utils curl exim4 openssh-server openvswitch-switch
apt-get install -y python3 python3-netifaces python3-flask python3-flask-sqlalchemy python3-pampy python3-httplib2
apt-get install -y python3-psutil
apt-get install -y python3-psutil python3-flask-migrate
apt-get install -y python3-lxc
apt-get install -y python3-requests python3-suds
apt-get install -y nodejs nodejs-legacy npm
@ -60,7 +60,7 @@ if [ ! -d /opt/docklet/local/basefs ]; then
echo "Generating basefs"
wget -P /opt/docklet/local http://iwork.pku.edu.cn:1616/basefs-0.11.tar.bz2 && tar xvf /opt/docklet/local/basefs-0.11.tar.bz2 -C /opt/docklet/local/ > /dev/null
[ $? != "0" ] && echo "Generate basefs failed, please download it from http://unias.github.io/docklet/download to FS_PREFIX/local and then extract it using root. (default FS_PREFIX is /opt/docklet)"
fi
echo "Some packagefs can be downloaded from http://unias.github.io/docklet.download"
echo "you can download the packagefs and extract it to FS_PREFIX/local using root. (default FS_PREFIX is /opt/docklet"


@ -412,11 +412,31 @@ def delete_image(user, beans, form):
@login_required
def copy_image(user, beans, form):
global G_imagemgr
global G_ulockmgr
image = form.get('image', None)
target = form.get('target',None)
res = G_imagemgr.copyImage(user,image,target)
token = form.get('token',None)
G_ulockmgr.acquire(user)
res = G_imagemgr.copyImage(user,image,token,target)
G_ulockmgr.release(user)
return json.dumps(res)
@app.route("/image/copytarget/", methods=['POST'])
@login_required
@auth_key_required
def copytarget_image(user, beans, form):
global G_imagemgr
global G_ulockmgr
imagename = form.get('imagename',None)
description = form.get('description',None)
try:
G_ulockmgr.acquire(user)
res = G_imagemgr.updateinfo(user,imagename,description)
G_ulockmgr.release(user)
except Exception as e:
logger.error(e)
return json.dumps({'success':'false', 'message':str(e)})
return json.dumps({'success':'true', 'action':'copy image to target.'})
@app.route("/addproxy/", methods=['POST'])
@login_required


@ -10,7 +10,7 @@ design:
out of time.
4. We can show every user their own images and the images shared by others. A user can create a new
cluster or scale out a new node from them, and can remove his own images.
5. When a remove operation occurs, the image server deletes the image. Some physical hosts may
still keep a local copy; that is acceptable.
6. The management of LVM is included in this module.
"""
@ -20,47 +20,57 @@ from configparser import ConfigParser
from io import StringIO
import os,sys,subprocess,time,re,datetime,threading,random
import xmlrpc.client
from model import db, Image
from log import logger
import env
from lvmtool import *
import updatebase
import requests
master_port = str(env.getenv('MASTER_PORT'))
class ImageMgr():
#def sys_call(self,command):
# output = subprocess.getoutput(command).strip()
# return None if output == '' else output
def sys_return(self,command):
return_value = subprocess.call(command,shell=True)
return return_value
def __init__(self):
self.NFS_PREFIX = env.getenv('FS_PREFIX')
self.imgpath = self.NFS_PREFIX + "/global/images/"
self.srcpath = env.getenv('DOCKLET_LIB') + "/"
self.imageserver = "192.168.6.249"
def datetime_toString(self,dt):
return dt.strftime("%Y-%m-%d %H:%M:%S")
def string_toDatetime(self,string):
return datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S")
def updateinfo(self,imgpath,image,description):
image_info_file = open(imgpath+"."+image+".info",'w')
def updateinfo(self,user,imagename,description):
'''image_info_file = open(imgpath+"."+image+".info",'w')
image_info_file.writelines([self.datetime_toString(datetime.datetime.now()) + "\n", "unshare"])
image_info_file.close()
image_description_file = open(imgpath+"."+image+".description", 'w')
image_description_file.write(description)
image_description_file.close()
image_description_file.close()'''
image = Image.query.filter_by(ownername=user,imagename=imagename).first()
if image is None:
newimage = Image(imagename,True,False,user,description)
db.session.add(newimage)
db.session.commit()
def dealpath(self,fspath):
if fspath[-1:] == "/":
return self.dealpath(fspath[:-1])
else:
return fspath
def createImage(self,user,image,lxc,description="Not thing", imagenum=10):
fspath = self.NFS_PREFIX + "/local/volume/" + lxc
imgpath = self.imgpath + "private/" + user + "/"
@ -80,14 +90,14 @@ class ImageMgr():
sys_run("tar -cvf %s -C %s ." % (imgpath+image+".tz",self.dealpath(fspath)), True)
except Exception as e:
logger.error(e)
#try:
#sys_run("cp %s %s" % (tmppath+tmpimage, imgpath+image+".tz"), True)
#sys_run("rsync -a --delete --exclude=lost+found/ --exclude=root/nfs/ --exclude=dev/ --exclude=mnt/ --exclude=tmp/ --exclude=media/ --exclude=proc/ --exclude=sys/ %s/ %s/" % (self.dealpath(fspath),imgpath+image),True)
#except Exception as e:
# logger.error(e)
#sys_run("rm -f %s" % tmppath+tmpimage, True)
#sys_run("rm -f %s" % tmppath+tmpimage, True)
#sys_run("rm -f %s" % (imgpath+"."+image+"_docklet_share"),True)
self.updateinfo(imgpath,image,description)
self.updateinfo(user,image,description)
logger.info("image:%s from LXC:%s create success" % (image,lxc))
return [True, "create image success"]
@ -116,8 +126,8 @@ class ImageMgr():
#self.sys_call("rsync -a --delete --exclude=nfs/ %s/ %s/" % (imgpath+image,self.dealpath(fspath)))
#self.updatetime(imgpath,image)
return
def prepareFS(self,user,image,lxc,size="1000",vgname="docklet-group"):
rootfs = "/var/lib/lxc/%s/rootfs" % lxc
layer = self.NFS_PREFIX + "/local/volume/" + lxc
@ -130,14 +140,14 @@ class ImageMgr():
if Ret.returncode == 0:
logger.info("%s not clean" % layer)
sys_run("umount -l %s" % layer)
try:
sys_run("rm -rf %s %s" % (rootfs, layer))
sys_run("mkdir -p %s %s" % (rootfs, layer))
except Exception as e:
logger.error(e)
#prepare volume
if check_volume(vgname,lxc):
logger.info("volume %s already exists, delete it")
@ -145,7 +155,7 @@ class ImageMgr():
if not new_volume(vgname,lxc,size):
logger.error("volume %s create failed" % lxc)
return False
try:
sys_run("mkfs.ext4 /dev/%s/%s" % (vgname,lxc),True)
sys_run("mount /dev/%s/%s %s" %(vgname,lxc,layer),True)
@ -184,7 +194,7 @@ class ImageMgr():
logger.error(e)
return True
def detachFS(self, lxc, vgname="docklet-group"):
rootfs = "/var/lib/lxc/%s/rootfs" % lxc
Ret = sys_run("umount %s" % rootfs)
@ -192,7 +202,7 @@ class ImageMgr():
logger.error("cannot umount rootfs:%s" % rootfs)
return False
return True
def checkFS(self, lxc, vgname="docklet-group"):
rootfs = "/var/lib/lxc/%s/rootfs" % lxc
layer = self.NFS_PREFIX + "/local/volume/" + lxc
@ -208,90 +218,111 @@ class ImageMgr():
return True
def removeImage(self,user,image):
def removeImage(self,user,imagename):
imgpath = self.imgpath + "private/" + user + "/"
try:
sys_run("rm -rf %s/" % imgpath+image+".tz", True)
sys_run("rm -f %s" % imgpath+"."+image+".info", True)
sys_run("rm -f %s" % (imgpath+"."+image+".description"), True)
image = Image.query.filter_by(imagename=imagename,ownername=user).first()
image.hasPrivate = False
if image.hasPublic == False:
db.session.delete(image)
db.session.commit()
sys_run("rm -rf %s/" % imgpath+imagename+".tz", True)
#sys_run("rm -f %s" % imgpath+"."+image+".info", True)
#sys_run("rm -f %s" % (imgpath+"."+image+".description"), True)
except Exception as e:
logger.error(e)
def shareImage(self,user,image):
def shareImage(self,user,imagename):
imgpath = self.imgpath + "private/" + user + "/"
share_imgpath = self.imgpath + "public/" + user + "/"
image_info_file = open(imgpath+"."+image+".info", 'r')
'''image_info_file = open(imgpath+"."+image+".info", 'r')
[createtime, isshare] = image_info_file.readlines()
isshare = "shared"
image_info_file.close()
image_info_file = open(imgpath+"."+image+".info", 'w')
image_info_file.writelines([createtime, isshare])
image_info_file.close()
sys_run("mkdir -p %s" % share_imgpath, True)
image_info_file.close()'''
try:
sys_run("cp %s %s" % (imgpath+image+".tz", share_imgpath+image+".tz"), True)
image = Image.query.filter_by(imagename=imagename,ownername=user).first()
if image.hasPublic == True:
return
image.hasPublic = True
db.session.commit()
sys_run("mkdir -p %s" % share_imgpath, True)
sys_run("cp %s %s" % (imgpath+imagename+".tz", share_imgpath+imagename+".tz"), True)
#sys_run("rsync -a --delete %s/ %s/" % (imgpath+image,share_imgpath+image), True)
except Exception as e:
logger.error(e)
sys_run("cp %s %s" % (imgpath+"."+image+".info",share_imgpath+"."+image+".info"), True)
sys_run("cp %s %s" % (imgpath+"."+image+".description",share_imgpath+"."+image+".description"), True)
#$sys_run("cp %s %s" % (imgpath+"."+image+".info",share_imgpath+"."+image+".info"), True)
#sys_run("cp %s %s" % (imgpath+"."+image+".description",share_imgpath+"."+image+".description"), True)
def unshareImage(self,user,image):
def unshareImage(self,user,imagename):
public_imgpath = self.imgpath + "public/" + user + "/"
imgpath = self.imgpath + "private/" + user + "/"
if os.path.isfile(imgpath + image + ".tz"):
'''if os.path.isfile(imgpath + image + ".tz"):
image_info_file = open(imgpath+"."+image+".info", 'r')
[createtime, isshare] = image_info_file.readlines()
isshare = "unshare"
image_info_file.close()
image_info_file = open(imgpath+"."+image+".info", 'w')
image_info_file.writelines([createtime, isshare])
image_info_file.close()
image_info_file.close()'''
try:
#sys_run("rm -rf %s/" % public_imgpath+image, True)
sys_run("rm -f %s" % public_imgpath+image+".tz", True)
sys_run("rm -f %s" % public_imgpath+"."+image+".info", True)
sys_run("rm -f %s" % public_imgpath+"."+image+".description", True)
image = Image.query.filter_by(imagename=imagename,ownername=user).first()
image.hasPublic = False
if image.hasPrivate == False:
db.session.delete(image)
db.session.commit()
sys_run("rm -f %s" % public_imgpath+imagename+".tz", True)
#sys_run("rm -f %s" % public_imgpath+"."+image+".info", True)
#sys_run("rm -f %s" % public_imgpath+"."+image+".description", True)
except Exception as e:
logger.error(e)
def copyImage(self,user,image,target):
def copyImage(self,user,image,token,target):
path = "/opt/docklet/global/images/private/"+user+"/"
image_info_file = open(path+"."+image+".info", 'r')
'''image_info_file = open(path+"."+image+".info", 'r')
[createtime, isshare] = image_info_file.readlines()
recordshare = isshare
isshare = "unshared"
image_info_file.close()
image_info_file = open(path+"."+image+".info", 'w')
image_info_file.writelines([createtime, isshare])
image_info_file.close()
image_info_file.close()'''
try:
sys_run('ssh root@%s "mkdir -p %s"' % (target,path))
sys_run('scp %s%s.tz root@%s:%s' % (path,image,target,path))
sys_run('scp %s.%s.description root@%s:%s' % (path,image,target,path))
sys_run('scp %s.%s.info root@%s:%s' % (path,image,target,path))
#sys_run('scp %s.%s.description root@%s:%s' % (path,image,target,path))
#sys_run('scp %s.%s.info root@%s:%s' % (path,image,target,path))
resimage = Image.query.filter_by(ownername=user,imagename=image).first()
auth_key = env.getenv('AUTH_KEY')
url = "http://" + target + ":" + master_port + "/image/copytarget/"
data = {"token":token,"auth_key":auth_key,"user":user,"imagename":image,"description":resimage.description}
result = requests.post(url, data=data).json()
logger.info("Response from target master: " + str(result))
except Exception as e:
logger.error(e)
image_info_file = open(path+"."+image+".info", 'w')
'''image_info_file = open(path+"."+image+".info", 'w')
image_info_file.writelines([createtime, recordshare])
image_info_file.close()
image_info_file.close()'''
return {'success':'false', 'message':str(e)}
image_info_file = open(path+"."+image+".info", 'w')
'''image_info_file = open(path+"."+image+".info", 'w')
image_info_file.writelines([createtime, recordshare])
image_info_file.close()
image_info_file.close()'''
logger.info("copy image %s of %s to %s success" % (image,user,target))
return {'success':'true', 'action':'copy image'}
def update_basefs(self,image):
def update_basefs(self,imagename):
imgpath = self.imgpath + "private/root/"
basefs = self.NFS_PREFIX+"/local/packagefs/"
tmppath = self.NFS_PREFIX + "/local/tmpimg/"
tmpimage = str(random.randint(0,10000000))
try:
sys_run("mkdir -p %s" % tmppath+tmpimage)
sys_run("tar -C %s -xvf %s" % (tmppath+tmpimage,imgpath+image+".tz"),True)
sys_run("tar -C %s -xvf %s" % (tmppath+tmpimage,imgpath+imagename+".tz"),True)
logger.info("start updating base image")
updatebase.aufs_update_base(tmppath+tmpimage, basefs)
logger.info("update base image success")
@ -299,7 +330,7 @@ class ImageMgr():
logger.error(e)
sys_run("rm -rf %s" % tmppath+tmpimage)
return True
def update_base_image(self, user, vclustermgr, image):
if not user == "root":
logger.info("only root can update base image")
@ -317,8 +348,8 @@ class ImageMgr():
#logger.info("recover all cluster success")
return [True, "update base image"]
def get_image_info(self, user, image, imagetype):
if imagetype == "private":
def get_image_info(self, user, imagename, imagetype):
'''if imagetype == "private":
imgpath = self.imgpath + "private/" + user + "/"
else:
imgpath = self.imgpath + "public/" + user + "/"
@ -327,20 +358,28 @@ class ImageMgr():
image_info_file.close()
image_description_file = open(imgpath+"."+image+".description",'r')
description = image_description_file.read()
image_description_file.close()
image_description_file.close()'''
image = Image.query.filter_by(imagename=imagename,ownername=user).first()
if image is None:
return ["", ""]
time = image.create_time.strftime("%Y-%m-%d %H:%M:%S")
description = image.description
if len(description) > 15:
description = description[:15] + "......"
return [time, description]
def get_image_description(self, user, image):
if image['type'] == "private":
'''if image['type'] == "private":
imgpath = self.imgpath + "private/" + user + "/"
else:
imgpath = self.imgpath + "public/" + image['owner'] + "/"
image_description_file = open(imgpath+"."+image['name']+".description", 'r')
description = image_description_file.read()
image_description_file.close()
return description
image_description_file.close()'''
image = Image.query.filter_by(imagename=image['name'],ownername=image['owner']).first()
if image is None:
return ""
return image.description
def list_images(self,user):
images = {}
@ -392,13 +431,16 @@ class ImageMgr():
logger.error(e)
return images
def isshared(self,user,image):
imgpath = self.imgpath + "private/" + user + "/"
def isshared(self,user,imagename):
'''imgpath = self.imgpath + "private/" + user + "/"
image_info_file = open(imgpath+"."+image+".info",'r')
[time, isshare] = image_info_file.readlines()
image_info_file.close()
if isshare == "shared":
image_info_file.close()'''
image = Image.query.filter_by(imagename=imagename,ownername=user).first()
if image is None:
return ""
if image.hasPublic == True:
return "true"
else:
return "false"

src/manage.py (new file), 11 lines added

@ -0,0 +1,11 @@
from flask_migrate import Migrate,MigrateCommand
from model import *
from flask_script import Manager
from flask import Flask
migrate = Migrate(app,db)
manager = Manager(app)
manager.add_command('db',MigrateCommand)
if __name__ == '__main__':
    manager.run()
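
For context, this file wires up the standard Flask-Script plus Flask-Migrate workflow: assuming the usual MigrateCommand sub-commands, the schema would be initialised and upgraded from src/ with python3 manage.py db init, python3 manage.py db migrate and python3 manage.py db upgrade, and the generated src/migrations/ directory is exactly what the new .gitignore entry above excludes.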


@ -28,7 +28,7 @@ from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
from datetime import datetime
from base64 import b64encode, b64decode
import os
import os, json
#this class from itsdangerous implements token<->user
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
@ -42,8 +42,10 @@ app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///'+fsdir+'/global/sys/UserTable.db'
app.config['SQLALCHEMY_BINDS'] = {
'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db',
'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db'
'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db',
'system': 'sqlite:///'+fsdir+'/global/sys/System.db'
}
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
try:
secret_key_file = open(env.getenv('FS_PREFIX') + '/local/token_secret_key.txt')
app.secret_key = secret_key_file.read()
@ -268,3 +270,115 @@ class ApplyMsg(db.Model):
def __repr__(self):
return "{\"id\":\"%d\", \"username\":\"%s\", \"number\": \"%d\", \"reason\":\"%s\", \"status\":\"%s\", \"time\":\"%s\"}" % (self.id, self.username, self.number, self.reason, self.status, self.time.strftime("%Y-%m-%d %H:%M:%S"))
class Container(db.Model):
__bind_key__ = 'system'
containername = db.Column(db.String(100), primary_key=True)
hostname = db.Column(db.String(30))
ip = db.Column(db.String(20))
host = db.Column(db.String(20))
image = db.Column(db.String(50))
lastsave = db.Column(db.DateTime)
setting_cpu = db.Column(db.Integer)
setting_mem = db.Column(db.Integer)
setting_disk = db.Column(db.Integer)
vclusterid = db.Column(db.Integer, db.ForeignKey('v_cluster.clusterid'))
def __init__(self, containername, hostname, ip, host, image, lastsave, setting):
self.containername = containername
self.hostname = hostname
self.ip = ip
self.host = host
self.image = image
self.lastsave = lastsave
self.setting_cpu = int(setting['cpu'])
self.setting_mem = int(setting['memory'])
self.setting_disk = int(setting['disk'])
def __repr__(self):
return "{\"containername\":\"%s\", \"hostname\":\"%s\", \"ip\": \"%s\", \"host\":\"%s\", \"image\":\"%s\", \"lastsave\":\"%s\", \"setting\":{\"cpu\":\"%d\",\"memory\":\"%d\",\"disk\":\"%d\"}}" % (self.containername, self.hostname, self.ip, self.host, self.image, self.lastsave.strftime("%Y-%m-%d %H:%M:%S"), self.setting_cpu, self.setting_mem, self.setting_disk)
class PortMapping(db.Model):
__bind_key__ = 'system'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
node_name = db.Column(db.String(100))
node_ip = db.Column(db.String(20))
node_port = db.Column(db.Integer)
host_port= db.Column(db.Integer)
vclusterid = db.Column(db.Integer, db.ForeignKey('v_cluster.clusterid'))
def __init__(self, node_name, node_ip, node_port, host_port):
self.node_name = node_name
self.node_ip = node_ip
self.node_port = node_port
self.host_port = host_port
def __repr__(self):
return "{\"id\":\"%d\", \"node_name\":\"%s\", \"node_ip\": \"%s\", \"node_port\":\"%s\", \"host_port\":\"%s\"}" % (self.id, self.node_name, self.node_ip, self.node_port, self.host_port)
class VCluster(db.Model):
__bind_key__ = 'system'
clusterid = db.Column(db.BigInteger, primary_key=True, autoincrement=False)
clustername = db.Column(db.String(50))
ownername = db.Column(db.String(20))
status = db.Column(db.String(10))
size = db.Column(db.Integer)
containers = db.relationship('Container', backref='v_cluster', lazy='dynamic')
nextcid = db.Column(db.Integer)
create_time = db.Column(db.DateTime)
start_time = db.Column(db.String(20))
proxy_server_ip = db.Column(db.String(20))
proxy_public_ip = db.Column(db.String(20))
port_mapping = db.relationship('PortMapping', backref='v_cluster', lazy='dynamic')
def __init__(self, clusterid, clustername, ownername, status, size, nextcid, proxy_server_ip, proxy_public_ip):
self.clusterid = clusterid
self.clustername = clustername
self.ownername = ownername
self.status = status
self.size = size
self.nextcid = nextcid
self.proxy_server_ip = proxy_server_ip
self.proxy_public_ip = proxy_public_ip
self.containers = []
self.port_mapping = []
self.create_time = datetime.now()
self.start_time = "------"
def __repr__(self):
info = {}
info["clusterid"] = self.clusterid
info["clustername"] = self.clustername
info["ownername"] = self.ownername
info["status"] = self.status
info["size"] = self.size
info["proxy_server_ip"] = self.proxy_server_ip
info["proxy_public_ip"] = self.proxy_public_ip
info["nextcid"] = self.nextcid
info["create_time"] = self.create_time.strftime("%Y-%m-%d %H:%M:%S")
info["start_time"] = self.start_time
info["containers"] = [dict(eval(str(con))) for con in self.containers]
info["port_mapping"] = [dict(eval(str(pm))) for pm in self.port_mapping]
#return "{\"clusterid\":\"%d\", \"clustername\":\"%s\", \"ownername\": \"%s\", \"status\":\"%s\", \"size\":\"%d\", \"proxy_server_ip\":\"%s\", \"create_time\":\"%s\"}" % (self.clusterid, self.clustername, self.ownername, self.status, self.size, self.proxy_server_ip, self.create_time.strftime("%Y-%m-%d %H:%M:%S"))
return json.dumps(info)
class Image(db.Model):
__bind_key__ = 'system'
imagename = db.Column(db.String(50))
id = db.Column(db.Integer, primary_key=True)
hasPrivate = db.Column(db.Boolean)
hasPublic = db.Column(db.Boolean)
ownername = db.Column(db.String(20))
create_time = db.Column(db.DateTime)
description = db.Column(db.Text)
def __init__(self,imagename,hasPrivate,hasPublic,ownername,description):
self.imagename = imagename
self.hasPrivate = hasPrivate
self.hasPublic = hasPublic
self.ownername = ownername
self.description = description
self.create_time = datetime.now()
def __repr__(self):
return "{\"id\":\"%d\",\"imagename\":\"%s\",\"hasPrivate\":\"%s\",\"hasPublic\":\"%s\",\"ownername\":\"%s\",\"updatetime\":\"%s\",\"description\":\"%s\"}" % (self.id,self.imagename,str(self.hasPrivate),str(self.hasPublic),self.create_time.strftime("%Y-%m-%d %H:%M:%S"),self.ownername,self.description)


@ -9,6 +9,7 @@ import proxytool
import requests, threading
import traceback
from nettools import portcontrol
from model import db, Container, PortMapping, VCluster
userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
def post_to_user(url = '/', data={}):
@ -33,14 +34,26 @@ class VclusterMgr(object):
self.fspath = env.getenv("FS_PREFIX")
self.clusterid_locks = threading.Lock()
# check database
try:
Container.query.all()
PortMapping.query.all()
VCluster.query.all()
except:
# create database
db.create_all()
logger.info ("vcluster start on %s" % (self.addr))
if self.mode == 'new':
logger.info ("starting in new mode on %s" % (self.addr))
# check if all clusters data are deleted in httprest.py
clean = True
usersdir = self.fspath+"/global/users/"
vclusters = VCluster.query.all()
if len(vclusters) != 0:
clean = False
for user in os.listdir(usersdir):
if len(os.listdir(usersdir+user+"/clusters")) > 0 or len(os.listdir(usersdir+user+"/hosts")) > 0:
if len(os.listdir(usersdir+user+"/hosts")) > 0:
clean = False
if not clean:
logger.error ("clusters files not clean, start failed")
@ -167,20 +180,26 @@ class VclusterMgr(object):
return [False, message]
logger.info("container create success")
hosts = hosts + ips[i].split("/")[0] + "\t" + hostname + "\t" + hostname + "."+clustername + "\n"
containers.append({ 'containername':lxc_name, 'hostname':hostname, 'ip':ips[i], 'host':workerip, 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting })
containers.append(Container(lxc_name,hostname,ips[i],workerip,image['name'],datetime.datetime.now(),setting))
#containers.append({ 'containername':lxc_name, 'hostname':hostname, 'ip':ips[i], 'host':workerip, 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting })
hostfile = open(hostpath, 'w')
hostfile.write(hosts)
hostfile.close()
clusterfile = open(clusterpath, 'w')
proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_public_ip +"/_web/" + username + "/" + clustername
info = {'clusterid':clusterid, 'status':'stopped', 'size':clustersize, 'containers':containers, 'nextcid': clustersize, 'create_time':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'start_time':"------"}
info['proxy_url'] = proxy_url
info['proxy_server_ip'] = proxy_server_ip
info['proxy_public_ip'] = proxy_public_ip
info['port_mapping'] = []
clusterfile.write(json.dumps(info))
clusterfile.close()
return [True, info]
#clusterfile = open(clusterpath, 'w')
vcluster = VCluster(clusterid,clustername,username,'stopped',clustersize,clustersize,proxy_server_ip,proxy_public_ip)
for con in containers:
vcluster.containers.append(con)
db.session.add(vcluster)
db.session.commit()
#proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_public_ip +"/_web/" + username + "/" + clustername
#info = {'clusterid':clusterid, 'status':'stopped', 'size':clustersize, 'containers':containers, 'nextcid': clustersize, 'create_time':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'start_time':"------"}
#info['proxy_url'] = proxy_url
#info['proxy_server_ip'] = proxy_server_ip
#info['proxy_public_ip'] = proxy_public_ip
#info['port_mapping'] = []
#clusterfile.write(json.dumps(info))
#clusterfile.close()
return [True, str(vcluster)]
def scale_out_cluster(self,clustername,username, image,user_info, setting):
if not self.is_cluster(clustername,username):
@ -225,12 +244,15 @@ class VclusterMgr(object):
hostfile = open(hostpath, 'a')
hostfile.write(ip.split("/")[0] + "\t" + hostname + "\t" + hostname + "." + clustername + "\n")
hostfile.close()
clusterinfo['nextcid'] = int(clusterinfo['nextcid']) + 1
clusterinfo['size'] = int(clusterinfo['size']) + 1
clusterinfo['containers'].append({'containername':lxc_name, 'hostname':hostname, 'ip':ip, 'host':workerip, 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting})
clusterfile = open(clusterpath, 'w')
clusterfile.write(json.dumps(clusterinfo))
clusterfile.close()
[success,vcluster] = self.get_vcluster(clustername,username)
if not success:
return [False, "Fail to write info."]
vcluster.nextcid = int(clusterinfo['nextcid']) + 1
vcluster.size = int(clusterinfo['size']) + 1
vcluster.containers.append(Container(lxc_name,hostname,ip,workerip,image['name'],datetime.datetime.now(),setting))
#{'containername':lxc_name, 'hostname':hostname, 'ip':ip, 'host':workerip, 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting})
db.session.add(vcluster)
db.session.commit()
return [True, clusterinfo]
def addproxy(self,username,clustername,ip,port):
@ -282,13 +304,13 @@ class VclusterMgr(object):
[success, host_port] = portcontrol.acquire_port_mapping(node_name, node_ip, port)
if not success:
return [False, host_port]
if 'port_mapping' not in clusterinfo.keys():
clusterinfo['port_mapping'] = []
clusterinfo['port_mapping'].append({'node_name':node_name, 'node_ip':node_ip, 'node_port':port, 'host_port':host_port})
clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w')
clusterfile.write(json.dumps(clusterinfo))
clusterfile.close()
return [True, clusterinfo]
[status,vcluster] = self.get_vcluster(clustername,username)
if not status:
return [False,"VCluster not found."]
vcluster.port_mapping.append(PortMapping(node_name,node_ip,port,host_port))
db.session.add(vcluster)
db.session.commit()
return [True, json.loads(str(vcluster))]
def recover_port_mapping(self,username,clustername):
[status, clusterinfo] = self.get_clusterinfo(clustername, username)
@ -303,28 +325,28 @@ class VclusterMgr(object):
return [True, clusterinfo]
def delete_all_port_mapping(self, username, clustername, node_name):
[status, clusterinfo] = self.get_clusterinfo(clustername, username)
[status, vcluster] = self.get_vcluster(clustername, username)
if not status:
return [False,"VCluster not found."]
error_msg = None
delete_list = []
for item in clusterinfo['port_mapping']:
if item['node_name'] == node_name:
node_ip = item['node_ip']
node_port = item['node_port']
for item in vcluster.port_mapping:
if item.node_name == node_name:
node_ip = item.node_ip
node_port = item.node_port
if self.distributedgw == 'True':
worker = self.nodemgr.ip_to_rpc(clusterinfo['proxy_server_ip'])
[success,msg] = worker.release_port_mapping(node_name, node_ip, node_port)
worker = self.nodemgr.ip_to_rpc(vcluster.proxy_server_ip)
[success,msg] = worker.release_port_mapping(node_name, node_ip, str(node_port))
else:
[success,msg] = portcontrol.release_port_mapping(node_name, node_ip, node_port)
[success,msg] = portcontrol.release_port_mapping(node_name, node_ip, str(node_port))
if not success:
error_msg = msg
else:
delete_list.append(item)
if len(delete_list) > 0:
for item in delete_list:
clusterinfo['port_mapping'].remove(item)
clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w')
clusterfile.write(json.dumps(clusterinfo))
clusterfile.close()
db.session.delete(item)
db.session.commit()
else:
return [True,"No port mapping."]
if error_msg is not None:
@ -333,28 +355,27 @@ class VclusterMgr(object):
return [True,"Success"]
def delete_port_mapping(self, username, clustername, node_name, node_port):
[status, clusterinfo] = self.get_clusterinfo(clustername, username)
idx = 0
for item in clusterinfo['port_mapping']:
if item['node_name'] == node_name and item['node_port'] == node_port:
[status, vcluster] = self.get_vcluster(clustername, username)
if not status:
return [False,"VCluster not found."]
for item in vcluster.port_mapping:
if item.node_name == node_name and str(item.node_port) == str(node_port):
node_ip = item.node_ip
node_port = item.node_port
if self.distributedgw == 'True':
worker = self.nodemgr.ip_to_rpc(vcluster.proxy_server_ip)
[success,msg] = worker.release_port_mapping(node_name, node_ip, str(node_port))
else:
[success,msg] = portcontrol.release_port_mapping(node_name, node_ip, str(node_port))
if not success:
return [False,msg]
db.session.delete(item)
print("HHH")
break
idx += 1
if idx == len(clusterinfo['port_mapping']):
return [False,"No port mapping."]
node_ip = clusterinfo['port_mapping'][idx]['node_ip']
node_port = clusterinfo['port_mapping'][idx]['node_port']
if self.distributedgw == 'True':
worker = self.nodemgr.ip_to_rpc(clusterinfo['proxy_server_ip'])
[success,msg] = worker.release_port_mapping(node_name, node_ip, node_port)
else:
[success,msg] = portcontrol.release_port_mapping(node_name, node_ip, node_port)
if not success:
return [False,msg]
clusterinfo['port_mapping'].pop(idx)
clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w')
clusterfile.write(json.dumps(clusterinfo))
clusterfile.close()
return [True, clusterinfo]
return [False,"No port mapping."]
db.session.commit()
return [True, json.loads(str(vcluster))]
def flush_cluster(self,username,clustername,containername):
begintime = datetime.datetime.now()
@ -402,47 +423,47 @@ class VclusterMgr(object):
return [True, "image not exists"]
def create_image(self,username,clustername,containername,imagename,description,imagenum=10):
[status, info] = self.get_clusterinfo(clustername,username)
[status, vcluster] = self.get_vcluster(clustername,username)
if not status:
return [False, "cluster not found"]
containers = info['containers']
containers = vcluster.containers
for container in containers:
if container['containername'] == containername:
if container.containername == containername:
logger.info("container: %s found" % containername)
worker = self.nodemgr.ip_to_rpc(container['host'])
worker = self.nodemgr.ip_to_rpc(container.host)
if worker is None:
return [False, "The worker can't be found or has been stopped."]
res = worker.create_image(username,imagename,containername,description,imagenum)
container['lastsave'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
container['image'] = imagename
container.lastsave = datetime.datetime.now()
container.image = imagename
break
else:
res = [False, "container not found"]
logger.error("container: %s not found" % containername)
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
infofile = open(clusterpath, 'w')
infofile.write(json.dumps(info))
infofile.close()
db.session.commit()
return res
def delete_cluster(self, clustername, username, user_info):
[status, info] = self.get_clusterinfo(clustername, username)
[status, vcluster] = self.get_vcluster(clustername, username)
if not status:
return [False, "cluster not found"]
if info['status']=='running':
if vcluster.status =='running':
return [False, "cluster is still running, you need to stop it and then delete"]
ips = []
for container in info['containers']:
worker = self.nodemgr.ip_to_rpc(container['host'])
worker = self.nodemgr.ip_to_rpc(container.host)
if worker is None:
return [False, "The worker can't be found or has been stopped."]
worker.delete_container(container['containername'])
ips.append(container['ip'])
worker.delete_container(container.containername)
db.session.delete(container)
ips.append(container.ip)
logger.info("delete vcluster and release vcluster ips")
self.networkmgr.release_userips(username, ips)
self.networkmgr.printpools()
os.remove(self.fspath+"/global/users/"+username+"/clusters/"+clustername)
os.remove(self.fspath+"/global/users/"+username+"/hosts/"+str(info['clusterid'])+".hosts")
#os.remove(self.fspath+"/global/users/"+username+"/clusters/"+clustername)
db.session.delete(vcluster)
db.session.commit()
os.remove(self.fspath+"/global/users/"+username+"/hosts/"+str(vcluster.clusterid)+".hosts")
groupname = json.loads(user_info)["data"]["group"]
uid = json.loads(user_info)["data"]["id"]
@ -455,29 +476,24 @@ class VclusterMgr(object):
return [True, "cluster delete"]
def scale_in_cluster(self, clustername, username, containername):
[status, info] = self.get_clusterinfo(clustername, username)
[status, vcluster] = self.get_vcluster(clustername, username)
if not status:
return [False, "cluster not found"]
new_containers = []
for container in info['containers']:
if container['containername'] == containername:
worker = self.nodemgr.ip_to_rpc(container['host'])
worker = self.nodemgr.ip_to_rpc(container.host)
if worker is None:
return [False, "The worker can't be found or has been stopped."]
worker.delete_container(containername)
self.networkmgr.release_userips(username, container['ip'])
db.session.delete(container)
self.networkmgr.release_userips(username, container.ip)
self.networkmgr.printpools()
else:
new_containers.append(container)
info['containers'] = new_containers
info['size'] -= 1
vcluster.size -= 1
cid = containername[containername.rindex("-")+1:]
clusterid = info['clusterid']
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
clusterid = vcluster.clusterid
hostpath = self.fspath + "/global/users/" + username + "/hosts/" + str(clusterid) + ".hosts"
clusterfile = open(clusterpath, 'w')
clusterfile.write(json.dumps(info))
clusterfile.close()
db.session.commit()
hostfile = open(hostpath, 'r')
hostinfo = hostfile.readlines()
hostfile.close()
@ -499,29 +515,24 @@ class VclusterMgr(object):
return [True, info]
def get_clustersetting(self, clustername, username, containername, allcontainer):
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
if not os.path.isfile(clusterpath):
[status,vcluster] = self.get_vcluster(clustername,username)
if vcluster is None:
logger.error("cluster file: %s not found" % clustername)
return [False, "cluster file not found"]
infofile = open(clusterpath, 'r')
info = json.loads(infofile.read())
infofile.close()
cpu = 0
memory = 0
disk = 0
if allcontainer:
for container in info['containers']:
if 'setting' in container:
cpu += int(container['setting']['cpu'])
memory += int(container['setting']['memory'])
disk += int(container['setting']['disk'])
for container in vcluster.containers:
cpu += int(container.setting_cpu)
memory += int(container.setting_mem)
disk += int(container.setting_disk)
else:
for container in info['containers']:
if container['containername'] == containername:
if 'setting' in container:
cpu += int(container['setting']['cpu'])
memory += int(container['setting']['memory'])
disk += int(container['setting']['disk'])
for container in vcluster.containers:
if container.containername == containername:
cpu += int(container.setting_cpu)
memory += int(container.setting_mem)
disk += int(container.setting_disk)
return [True, {'cpu':cpu, 'memory':memory, 'disk':disk}]
def update_cluster_baseurl(self, clustername, username, oldip, newip):
@ -558,8 +569,6 @@ class VclusterMgr(object):
if info['status'] == 'running':
return [False, "cluster is already running"]
# set proxy
if not "proxy_server_ip" in info.keys():
info['proxy_server_ip'] = self.addr
try:
target = 'http://'+info['containers'][0]['ip'].split('/')[0]+":10000"
if self.distributedgw == 'True':
@ -598,9 +607,10 @@ class VclusterMgr(object):
namesplit = container['containername'].split('-')
portname = namesplit[1] + '-' + namesplit[2]
worker.recover_usernet(portname, uid, info['proxy_server_ip'], container['host']==info['proxy_server_ip'])
info['status']='running'
info['start_time']=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.write_clusterinfo(info,clustername,username)
[status,vcluster] = self.get_vcluster(clustername,username)
vcluster.status ='running'
vcluster.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
db.session.commit()
return [True, "start cluster"]
def mount_cluster(self, clustername, username):
@ -618,17 +628,6 @@ class VclusterMgr(object):
[status, info] = self.get_clusterinfo(clustername, username)
if not status:
return [False, "cluster not found"]
if not "proxy_server_ip" in info.keys():
info['proxy_server_ip'] = self.addr
self.write_clusterinfo(info,clustername,username)
[status, info] = self.get_clusterinfo(clustername, username)
if not "proxy_public_ip" in info.keys():
self.update_proxy_ipAndurl(clustername,username,info['proxy_server_ip'])
[status, info] = self.get_clusterinfo(clustername, username)
self.update_cluster_baseurl(clustername,username,info['proxy_server_ip'],info['proxy_public_ip'])
if not 'port_mapping' in info.keys():
info['port_mapping'] = []
self.write_clusterinfo(info,clustername,username)
if info['status'] == 'stopped':
return [True, "cluster no need to start"]
# recover proxy of cluster
@ -690,12 +689,10 @@ class VclusterMgr(object):
if worker is None:
return [False, "The worker can't be found or has been stopped."]
worker.stop_container(container['containername'])
[status, info] = self.get_clusterinfo(clustername, username)
info['status']='stopped'
info['start_time']="------"
infofile = open(self.fspath+"/global/users/"+username+"/clusters/"+clustername, 'w')
infofile.write(json.dumps(info))
infofile.close()
[status, vcluster] = self.get_vcluster(clustername, username)
vcluster.status = 'stopped'
vcluster.start_time ="------"
db.session.commit()
return [True, "stop cluster"]
def detach_cluster(self, clustername, username):
@ -712,10 +709,9 @@ class VclusterMgr(object):
return [True, "detach cluster"]
def list_clusters(self, user):
if not os.path.exists(self.fspath+"/global/users/"+user+"/clusters"):
return [True, []]
clusters = os.listdir(self.fspath+"/global/users/"+user+"/clusters")
full_clusters = []
clusters = VCluster.query.filter_by(ownername = user).all()
clusters = [clu.clustername for clu in clusters]
'''full_clusters = []
for cluster in clusters:
single_cluster = {}
single_cluster['name'] = cluster
@ -724,7 +720,7 @@ class VclusterMgr(object):
single_cluster['status'] = 'running'
else:
single_cluster['status'] = 'stopping'
full_clusters.append(single_cluster)
full_clusters.append(single_cluster)'''
return [True, clusters]
def is_cluster(self, clustername, username):
@ -745,36 +741,33 @@ class VclusterMgr(object):
return -1
def update_proxy_ipAndurl(self, clustername, username, proxy_server_ip):
[status, info] = self.get_clusterinfo(clustername, username)
[status, vcluster] = self.get_vcluster(clustername, username)
if not status:
return [False, "cluster not found"]
info['proxy_server_ip'] = proxy_server_ip
vcluster.proxy_server_ip = proxy_server_ip
[status, proxy_public_ip] = self.etcd.getkey("machines/publicIP/"+proxy_server_ip)
if not status:
logger.error("Fail to get proxy_public_ip %s."%(proxy_server_ip))
proxy_public_ip = proxy_server_ip
info['proxy_public_ip'] = proxy_public_ip
proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_public_ip +"/_web/" + username + "/" + clustername
info['proxy_url'] = proxy_url
self.write_clusterinfo(info,clustername,username)
vcluster.proxy_public_ip = proxy_public_ip
#proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_public_ip +"/_web/" + username + "/" + clustername
#info['proxy_url'] = proxy_url
db.session.commit()
return proxy_public_ip
def get_clusterinfo(self, clustername, username):
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
if not os.path.isfile(clusterpath):
[success,vcluster] = self.get_vcluster(clustername,username)
if vcluster is None:
return [False, "cluster not found"]
infofile = open(clusterpath, 'r')
info = json.loads(infofile.read())
return [True, info]
vcluster = json.loads(str(vcluster))
return [True, vcluster]
def write_clusterinfo(self, info, clustername, username):
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
if not os.path.isfile(clusterpath):
return [False, "cluster not found"]
infofile = open(clusterpath, 'w')
infofile.write(json.dumps(info))
infofile.close()
return [True, info]
def get_vcluster(self, clustername, username):
vcluster = VCluster.query.filter_by(ownername=username,clustername=clustername).first()
if vcluster is None:
return [False, None]
else:
return [True, vcluster]
# acquire cluster id from etcd
def _acquire_id(self):