commit
98df83c1b2
|
@ -198,16 +198,16 @@ do_stop_user () {
|
|||
|
||||
case "$1" in
|
||||
init)
|
||||
do_start_master "new"
|
||||
do_start_user
|
||||
do_start_proxy
|
||||
do_start_web
|
||||
do_start_user
|
||||
do_start_master "new"
|
||||
;;
|
||||
start)
|
||||
do_start_master "recovery"
|
||||
do_start_user
|
||||
do_start_proxy
|
||||
do_start_web
|
||||
do_start_user
|
||||
do_start_master "recovery"
|
||||
;;
|
||||
|
||||
stop)
|
||||
|
@ -218,14 +218,14 @@ case "$1" in
|
|||
;;
|
||||
|
||||
restart)
|
||||
do_stop_user
|
||||
do_stop_web
|
||||
do_stop_proxy
|
||||
do_stop_master
|
||||
do_stop_user
|
||||
do_start_master "recovery"
|
||||
do_start_user
|
||||
do_start_proxy
|
||||
do_start_web
|
||||
do_start_user
|
||||
do_start_master "recovery"
|
||||
;;
|
||||
|
||||
start_proxy)
|
||||
|
@ -249,10 +249,10 @@ case "$1" in
|
|||
do_stop_proxy
|
||||
do_stop_master
|
||||
do_stop_user
|
||||
do_start_master "new"
|
||||
do_start_user
|
||||
do_start_proxy
|
||||
do_start_web
|
||||
do_start_user
|
||||
do_start_master "new"
|
||||
;;
|
||||
|
||||
status)
|
||||
|
|
|
@ -25,8 +25,8 @@ lxc.network.type = veth
|
|||
lxc.network.name = eth0
|
||||
# veth.pair is limited in 16 bytes
|
||||
lxc.network.veth.pair = %VETHPAIR%
|
||||
lxc.network.script.up = Bridge=docklet-br VLANID=%VLANID% %LXCSCRIPT%/lxc-ifup
|
||||
lxc.network.script.down = Bridge=docklet-br %LXCSCRIPT%/lxc-ifdown
|
||||
lxc.network.script.up = Bridge=docklet-br-%UserID% %LXCSCRIPT%/lxc-ifup
|
||||
lxc.network.script.down = Bridge=docklet-br-%UserID% %LXCSCRIPT%/lxc-ifdown
|
||||
lxc.network.ipv4 = %IP%
|
||||
lxc.network.ipv4.gateway = %GATEWAY%
|
||||
lxc.network.flags = up
|
||||
|
|
|
@ -7,3 +7,7 @@
|
|||
# $5 : value of lxc.network.veth.pair
|
||||
|
||||
ovs-vsctl --if-exists del-port $Bridge $5
|
||||
cnt=$(ovs-vsctl list-ports ${Bridge} | wc -l)
|
||||
if [ "$cnt" = "1" ]; then
|
||||
ovs-vsctl del-br $Bridge
|
||||
fi
|
||||
|
|
|
@ -7,4 +7,5 @@
|
|||
# $4 : network type, for example, veth
|
||||
# $5 : value of lxc.network.veth.pair
|
||||
|
||||
ovs-vsctl --may-exist add-port $Bridge $5 tag=$VLANID
|
||||
ovs-vsctl --may-exist add-br $Bridge
|
||||
ovs-vsctl --may-exist add-port $Bridge $5
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import subprocess, os, json
|
||||
import imagemgr
|
||||
import network
|
||||
from log import logger
|
||||
import env
|
||||
from lvmtool import sys_run, check_volume
|
||||
|
@ -23,7 +24,7 @@ class Container(object):
|
|||
self.imgmgr = imagemgr.ImageMgr()
|
||||
self.historymgr = History_Manager()
|
||||
|
||||
def create_container(self, lxc_name, proxy_server_ip, username, setting, clustername, clusterid, containerid, hostname, ip, gateway, vlanid, image):
|
||||
def create_container(self, lxc_name, proxy_server_ip, username, uid, setting, clustername, clusterid, containerid, hostname, ip, gateway, image):
|
||||
logger.info("create container %s of %s for %s" %(lxc_name, clustername, username))
|
||||
try:
|
||||
setting = json.loads(setting)
|
||||
|
@ -37,7 +38,7 @@ class Container(object):
|
|||
|
||||
#Ret = subprocess.run([self.libpath+"/lxc_control.sh",
|
||||
# "create", lxc_name, username, str(clusterid), hostname,
|
||||
# ip, gateway, str(vlanid), str(cpu), str(memory)], stdout=subprocess.PIPE,
|
||||
# ip, gateway, str(cpu), str(memory)], stdout=subprocess.PIPE,
|
||||
# stderr=subprocess.STDOUT,shell=False, check=True)
|
||||
|
||||
rootfs = "/var/lib/lxc/%s/rootfs" % lxc_name
|
||||
|
@ -61,7 +62,7 @@ class Container(object):
|
|||
content = content.replace("%CLUSTERID%",str(clusterid))
|
||||
content = content.replace("%LXCSCRIPT%",env.getenv("LXC_SCRIPT"))
|
||||
content = content.replace("%LXCNAME%",lxc_name)
|
||||
content = content.replace("%VLANID%",str(vlanid))
|
||||
content = content.replace("%UserID%",str(uid))
|
||||
content = content.replace("%CLUSTERNAME%", clustername)
|
||||
content = content.replace("%VETHPAIR%", str(clusterid)+'-'+str(containerid))
|
||||
return content
|
||||
|
|
|
@ -217,8 +217,10 @@ def start_cluster(user, beans, form):
|
|||
clustername = form.get('clustername', None)
|
||||
if (clustername == None):
|
||||
return json.dumps({'success':'false', 'message':'clustername is null'})
|
||||
user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
|
||||
uid = user_info['data']['id']
|
||||
logger.info ("handle request : start cluster %s" % clustername)
|
||||
[status, result] = G_vclustermgr.start_cluster(clustername, user)
|
||||
[status, result] = G_vclustermgr.start_cluster(clustername, user, uid)
|
||||
if status:
|
||||
return json.dumps({'success':'true', 'action':'start cluster', 'message':result})
|
||||
else:
|
||||
|
|
|
@ -153,7 +153,7 @@ class ovscontrol(object):
|
|||
@staticmethod
|
||||
def add_bridge(bridge):
|
||||
try:
|
||||
subprocess.run(['ovs-vsctl', 'add-br', str(bridge)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
|
||||
subprocess.run(['ovs-vsctl', '--may-exist', 'add-br', str(bridge)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
|
||||
return [True, str(bridge)]
|
||||
except subprocess.CalledProcessError as suberror:
|
||||
return [False, "add bridge failed : %s" % suberror.stdout.decode('utf-8')]
|
||||
|
@ -182,6 +182,14 @@ class ovscontrol(object):
|
|||
except subprocess.CalledProcessError as suberror:
|
||||
return [False, "delete port failed : %s" % suberror.stdout.decode('utf-8')]
|
||||
|
||||
@staticmethod
|
||||
def add_port(bridge, port):
|
||||
try:
|
||||
subprocess.run(['ovs-vsctl', '--may-exist', 'add-port', str(bridge), str(port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
|
||||
return [True, str(port)]
|
||||
except subprocess.CalledProcessError as suberror:
|
||||
return [False, "add port failed : %s" % suberror.stdout.decode('utf-8')]
|
||||
|
||||
@staticmethod
|
||||
def add_port_internal(bridge, port):
|
||||
try:
|
||||
|
@ -206,6 +214,14 @@ class ovscontrol(object):
|
|||
except subprocess.CalledProcessError as suberror:
|
||||
return [False, "add port failed : %s" % suberror.stdout.decode('utf-8')]
|
||||
|
||||
@staticmethod
|
||||
def add_port_gre_withkey(bridge, port, remote, key):
|
||||
try:
|
||||
subprocess.run(['ovs-vsctl', '--may-exist', 'add-port', str(bridge), str(port), '--', 'set', 'interface', str(port), 'type=gre', 'options:remote_ip='+str(remote), 'options:key='+str(key)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
|
||||
return [True, str(port)]
|
||||
except subprocess.CalledProcessError as suberror:
|
||||
return [False, "add port failed : %s" % suberror.stdout.decode('utf-8')]
|
||||
|
||||
@staticmethod
|
||||
def set_port_tag(port, tag):
|
||||
try:
|
||||
|
@ -242,8 +258,8 @@ class netcontrol(object):
|
|||
return ovscontrol.port_exists(gwport)
|
||||
|
||||
@staticmethod
|
||||
def setup_gw(bridge, gwport, addr, tag):
|
||||
[status, result] = ovscontrol.add_port_internal_withtag(bridge, gwport, tag)
|
||||
def setup_gw(bridge, gwport, addr):
|
||||
[status, result] = ovscontrol.add_port_internal(bridge, gwport)
|
||||
if not status:
|
||||
return [status, result]
|
||||
[status, result] = ipcontrol.add_addr(gwport, addr)
|
||||
|
@ -256,9 +272,10 @@ class netcontrol(object):
|
|||
return ovscontrol.del_port(bridge, gwport)
|
||||
|
||||
@staticmethod
|
||||
def check_gw(bridge, gwport, addr, tag):
|
||||
def check_gw(bridge, gwport, uid, addr):
|
||||
ovscontrol.add_bridge(bridge)
|
||||
if not netcontrol.gw_exists(bridge, gwport):
|
||||
return netcontrol.setup_gw(bridge, gwport, addr, tag)
|
||||
return netcontrol.setup_gw(bridge, gwport, addr)
|
||||
[status, info] = ipcontrol.link_info(gwport)
|
||||
if not status:
|
||||
return [False, "get gateway info failed"]
|
||||
|
@ -268,9 +285,18 @@ class netcontrol(object):
|
|||
info['inet'].remove(addr)
|
||||
for otheraddr in info['inet']:
|
||||
ipcontrol.del_addr(gwport, otheraddr)
|
||||
ovscontrol.set_port_tag(gwport, tag)
|
||||
if info['state'] == 'DOWN':
|
||||
ipcontrol.up_link(gwport)
|
||||
return [True, "check gateway port %s" % gwport]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def recover_usernet(portname, uid, GatewayHost, isGatewayHost):
|
||||
ovscontrol.add_bridge("docklet-br-"+str(uid))
|
||||
[success, ports] = ovscontrol.list_ports("docklet-br-"+str(uid))
|
||||
if success:
|
||||
for port in ports:
|
||||
if port.startswith("gre"):
|
||||
ovscontrol.del_port("docklet-br-"+str(uid),port)
|
||||
if not isGatewayHost:
|
||||
ovscontrol.add_port_gre_withkey("docklet-br-"+str(uid), "gre-"+str(uid)+"-"+GatewayHost, GatewayHost, str(uid))
|
||||
ovscontrol.add_port("docklet-br-"+str(uid), portname)
|
||||
|
|
101
src/network.py
101
src/network.py
|
@ -1,7 +1,7 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import json, sys, netifaces
|
||||
from nettools import netcontrol
|
||||
from nettools import netcontrol,ovscontrol
|
||||
|
||||
from log import logger
|
||||
|
||||
|
@ -239,16 +239,16 @@ class EnumPool(object):
|
|||
|
||||
# wrap EnumPool with vlanid and gateway
|
||||
class UserPool(EnumPool):
|
||||
def __init__(self, addr_cidr=None, vlanid=None, copy=None):
|
||||
if addr_cidr and vlanid:
|
||||
def __init__(self, addr_cidr=None, copy=None):
|
||||
if addr_cidr:
|
||||
EnumPool.__init__(self, addr_cidr = addr_cidr)
|
||||
self.vlanid=vlanid
|
||||
#self.vlanid=vlanid
|
||||
self.pool.sort(key=ip_to_int)
|
||||
self.gateway = self.pool[0]
|
||||
self.pool.remove(self.gateway)
|
||||
elif copy:
|
||||
EnumPool.__init__(self, copy = copy)
|
||||
self.vlanid = int(copy['vlanid'])
|
||||
#self.vlanid = int(copy['vlanid'])
|
||||
self.gateway = copy['gateway']
|
||||
else:
|
||||
logger.error("UserPool init failed with no addr_cidr or copy")
|
||||
|
@ -268,7 +268,7 @@ class UserPool(EnumPool):
|
|||
return False
|
||||
|
||||
def printpool(self):
|
||||
print("users ID:"+str(self.vlanid)+", net info:"+self.info+", gateway:"+self.gateway)
|
||||
print("net info:"+self.info+", gateway:"+self.gateway)
|
||||
print (str(self.pool))
|
||||
|
||||
# NetworkMgr : mange docklet network ip address
|
||||
|
@ -293,9 +293,9 @@ class NetworkMgr(object):
|
|||
self.system = EnumPool(sysaddr+"/"+str(syscidr))
|
||||
self.usrgws = {}
|
||||
self.users = {}
|
||||
self.vlanids = {}
|
||||
self.init_vlanids(4095, 60)
|
||||
self.init_shared_vlanids()
|
||||
#self.vlanids = {}
|
||||
#self.init_vlanids(4095, 60)
|
||||
#self.init_shared_vlanids()
|
||||
self.dump_center()
|
||||
self.dump_system()
|
||||
elif mode == 'recovery':
|
||||
|
@ -304,15 +304,15 @@ class NetworkMgr(object):
|
|||
self.system = None
|
||||
self.usrgws = {}
|
||||
self.users = {}
|
||||
self.vlanids = {}
|
||||
#self.vlanids = {}
|
||||
self.load_center()
|
||||
self.load_system()
|
||||
self.load_vlanids()
|
||||
self.load_shared_vlanids()
|
||||
#self.load_vlanids()
|
||||
#self.load_shared_vlanids()
|
||||
else:
|
||||
logger.error("mode: %s not supported" % mode)
|
||||
|
||||
def init_vlanids(self, total, block):
|
||||
'''def init_vlanids(self, total, block):
|
||||
self.vlanids['block'] = block
|
||||
self.etcd.setkey("network/vlanids/info", str(total)+"/"+str(block))
|
||||
for i in range(1, int((total-1)/block)):
|
||||
|
@ -320,11 +320,11 @@ class NetworkMgr(object):
|
|||
self.vlanids['currentpool'] = list(range(1+block*i, total+1))
|
||||
self.vlanids['currentindex'] = i+1
|
||||
self.etcd.setkey("network/vlanids/"+str(i+1), json.dumps(self.vlanids['currentpool']))
|
||||
self.etcd.setkey("network/vlanids/current", str(i+1))
|
||||
self.etcd.setkey("network/vlanids/current", str(i+1))'''
|
||||
|
||||
# Data Structure:
|
||||
# shared_vlanids = [{vlanid = ..., sharenum = ...}, {vlanid = ..., sharenum = ...}, ...]
|
||||
def init_shared_vlanids(self, vlannum = 128, sharenum = 128):
|
||||
'''def init_shared_vlanids(self, vlannum = 128, sharenum = 128):
|
||||
self.shared_vlanids = []
|
||||
for i in range(vlannum):
|
||||
shared_vlanid = {}
|
||||
|
@ -364,7 +364,7 @@ class NetworkMgr(object):
|
|||
self.shared_vlanids = json.loads(shared_vlanids)
|
||||
|
||||
def dump_shared_vlanids(self):
|
||||
self.etcd.setkey("network/shared_vlanids", json.dumps(self.shared_vlanids))
|
||||
self.etcd.setkey("network/shared_vlanids", json.dumps(self.shared_vlanids))'''
|
||||
|
||||
def load_center(self):
|
||||
[status, centerdata] = self.etcd.getkey("network/center")
|
||||
|
@ -389,7 +389,7 @@ class NetworkMgr(object):
|
|||
self.users[username] = user
|
||||
|
||||
def dump_user(self, username):
|
||||
self.etcd.setkey("network/users/"+username, json.dumps({'info':self.users[username].info, 'vlanid':self.users[username].vlanid, 'gateway':self.users[username].gateway, 'pool':self.users[username].pool}))
|
||||
self.etcd.setkey("network/users/"+username, json.dumps({'info':self.users[username].info, 'gateway':self.users[username].gateway, 'pool':self.users[username].pool}))
|
||||
|
||||
def load_usrgw(self,username):
|
||||
[status, data] = self.etcd.getkey("network/usrgws/"+username)
|
||||
|
@ -406,10 +406,10 @@ class NetworkMgr(object):
|
|||
self.system.printpool()
|
||||
print ("<users>")
|
||||
print (" users in users is in etcd, not in memory")
|
||||
print ("<vlanids>")
|
||||
print (str(self.vlanids['currentindex'])+":"+str(self.vlanids['currentpool']))
|
||||
#print ("<vlanids>")
|
||||
#print (str(self.vlanids['currentindex'])+":"+str(self.vlanids['currentpool']))
|
||||
|
||||
def acquire_vlanid(self, isshared = False):
|
||||
'''def acquire_vlanid(self, isshared = False):
|
||||
if isshared:
|
||||
# only share vlanid of the front entry
|
||||
# if sharenum is reduced to 0, move the front entry to the back
|
||||
|
@ -444,13 +444,13 @@ class NetworkMgr(object):
|
|||
else:
|
||||
self.vlanids['currentpool'].append(vlanid)
|
||||
self.dump_vlanids()
|
||||
return [True, "Release VLAN ID success"]
|
||||
return [True, "Release VLAN ID success"]'''
|
||||
|
||||
def has_usrgw(self, username):
|
||||
self.load_usrgw(username)
|
||||
return username in self.usrgws.keys()
|
||||
|
||||
def setup_usrgw(self, username, nodemgr, workerip=None):
|
||||
def setup_usrgw(self, username, uid, nodemgr, workerip=None):
|
||||
if not self.has_user(username):
|
||||
return [False,"user doesn't exist."]
|
||||
self.load_usrgw(username)
|
||||
|
@ -461,15 +461,15 @@ class NetworkMgr(object):
|
|||
if(workerip is not None):
|
||||
ip = workerip
|
||||
worker = nodemgr.ip_to_rpc(workerip)
|
||||
logger.info("setup gateway for %s with %s and vlan=%s on %s" % (username, usrpools.get_gateway_cidr(), str(usrpools.vlanid), ip))
|
||||
logger.info("setup gateway for %s with %s on %s" % (username, usrpools.get_gateway_cidr(), ip))
|
||||
self.usrgws[username] = ip
|
||||
self.dump_usrgw(username)
|
||||
worker.setup_gw('docklet-br', username, usrpools.get_gateway_cidr(), str(usrpools.vlanid))
|
||||
worker.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr())
|
||||
else:
|
||||
logger.info("setup gateway for %s with %s and vlan=%s on master" % (username, usrpools.get_gateway_cidr(), str(usrpools.vlanid)))
|
||||
logger.info("setup gateway for %s with %s on master" % (username, usrpools.get_gateway_cidr() ))
|
||||
self.usrgws[username] = self.masterip
|
||||
self.dump_usrgw(username)
|
||||
netcontrol.setup_gw('docklet-br', username, usrpools.get_gateway_cidr(), str(usrpools.vlanid))
|
||||
netcontrol.setup_gw('docklet-br-'+str(uid), username, usrpools.get_gateway_cidr())
|
||||
self.dump_user(username)
|
||||
del self.users[username]
|
||||
return [True, "set up gateway success"]
|
||||
|
@ -482,34 +482,37 @@ class NetworkMgr(object):
|
|||
self.dump_center()
|
||||
if status == False:
|
||||
return [False, result]
|
||||
[status, vlanid] = self.acquire_vlanid(isshared)
|
||||
'''[status, vlanid] = self.acquire_vlanid(isshared)
|
||||
if status:
|
||||
vlanid = int(vlanid)
|
||||
else:
|
||||
self.center.free(result, cidr)
|
||||
self.dump_center()
|
||||
return [False, vlanid]
|
||||
self.users[username] = UserPool(addr_cidr = result+"/"+str(cidr), vlanid=vlanid)
|
||||
return [False, vlanid]'''
|
||||
self.users[username] = UserPool(addr_cidr = result+"/"+str(cidr))
|
||||
#logger.info("setup gateway for %s with %s and vlan=%s" % (username, self.users[username].get_gateway_cidr(), str(vlanid)))
|
||||
#netcontrol.setup_gw('docklet-br', username, self.users[username].get_gateway_cidr(), str(vlanid))
|
||||
self.dump_user(username)
|
||||
del self.users[username]
|
||||
return [True, 'add user success']
|
||||
|
||||
def del_usrgw(self, username, nodemgr):
|
||||
def del_usrgwbr(self, username, uid, nodemgr):
|
||||
if username not in self.usrgws.keys():
|
||||
return [False, "user does't have gateway or user doesn't exist."]
|
||||
ip = self.usrgws[username]
|
||||
logger.info("Delete user %s(%s) gateway on %s" %(username, str(uid), ip))
|
||||
if ip == self.masterip:
|
||||
netcontrol.del_gw('docklet-br', username)
|
||||
netcontrol.del_gw('docklet-br-'+str(uid), username)
|
||||
netcontrol.del_bridge('docklet-br-'+str(uid))
|
||||
else:
|
||||
worker = nodemgr.ip_to_rpc(ip)
|
||||
worker.del_gw('docklet-br', username)
|
||||
worker.del_gw('docklet-br-'+str(uid), username)
|
||||
worker.del_bridge('docklet-br-'+str(uid))
|
||||
del self.usrgws[username]
|
||||
self.etcd.delkey("network/usrgws/"+username)
|
||||
return [True, 'delete user\' gateway success']
|
||||
|
||||
def del_user(self, username, isshared = False):
|
||||
def del_user(self, username):
|
||||
if not self.has_user(username):
|
||||
return [False, username+" not in users set"]
|
||||
self.load_user(username)
|
||||
|
@ -517,14 +520,15 @@ class NetworkMgr(object):
|
|||
logger.info ("delete user %s with cidr=%s" % (username, int(cidr)))
|
||||
self.center.free(addr, int(cidr))
|
||||
self.dump_center()
|
||||
if not isshared:
|
||||
self.release_vlanid(self.users[username].vlanid)
|
||||
#if not isshared:
|
||||
#self.release_vlanid(self.users[username].vlanid)
|
||||
#netcontrol.del_gw('docklet-br', username)
|
||||
self.etcd.deldir("network/users/"+username)
|
||||
del self.users[username]
|
||||
return [True, 'delete user success']
|
||||
|
||||
def check_usergw(self, username, nodemgr, distributedgw=False):
|
||||
def check_usergw(self, username, uid, nodemgr, distributedgw=False):
|
||||
logger.info("Check %s(%s) user gateway."%(username, str(uid)))
|
||||
if not self.has_user(username):
|
||||
return [False,"user doesn't exist."]
|
||||
self.load_usrgw(username)
|
||||
|
@ -535,16 +539,31 @@ class NetworkMgr(object):
|
|||
self.load_user(username)
|
||||
if not distributedgw:
|
||||
if not ip == self.masterip:
|
||||
self.del_usrgw(username,nodemgr)
|
||||
self.del_usrgwbr(username,uid,nodemgr)
|
||||
self.usrgws[username] = self.masterip
|
||||
self.dump_usrgw(username)
|
||||
netcontrol.check_gw('docklet-br', username, self.users[username].get_gateway_cidr(), str(self.users[username].vlanid))
|
||||
netcontrol.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr())
|
||||
else:
|
||||
worker = nodemgr.ip_to_rpc(ip)
|
||||
worker.check_gw('docklet-br', username, self.users[username].get_gateway_cidr(), str(self.users[username].vlanid))
|
||||
worker.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr())
|
||||
del self.users[username]
|
||||
return [True, 'check gw ok']
|
||||
|
||||
def check_usergre(self, username, uid, remote, nodemgr, distributedgw=False):
|
||||
logger.info("Check %s(%s) gre from gateway host to %s." % (username, str(uid), remote))
|
||||
self.load_usrgw(username)
|
||||
if username not in self.usrgws.keys():
|
||||
return [False, 'user does not exist.']
|
||||
ip = self.usrgws[username]
|
||||
if not distributedgw:
|
||||
if not remote == self.masterip:
|
||||
ovscontrol.add_port_gre_withkey('docklet-br-'+str(uid), 'gre-'+str(uid)+'-'+remote, remote, uid)
|
||||
else:
|
||||
if not remote == ip:
|
||||
worker = nodemgr.ip_to_rpc(ip)
|
||||
worker.add_port_gre_withkey('docklet-br-'+str(uid), 'gre-'+str(uid)+'-'+remote, remote, uid)
|
||||
return [True, 'check gre ok']
|
||||
|
||||
def has_user(self, username):
|
||||
[status, _value] = self.etcd.getkey("network/users/"+username)
|
||||
return status
|
||||
|
@ -598,14 +617,14 @@ class NetworkMgr(object):
|
|||
del self.users[username]
|
||||
return result
|
||||
|
||||
def get_uservlanid(self, username):
|
||||
'''def get_uservlanid(self, username):
|
||||
if not self.has_user(username):
|
||||
return [False, 'username not exists in users set']
|
||||
self.load_user(username)
|
||||
result = self.users[username].vlanid
|
||||
self.dump_user(username)
|
||||
del self.users[username]
|
||||
return result
|
||||
return result'''
|
||||
|
||||
def acquire_sysips(self, num=1):
|
||||
logger.info ("acquire system ips")
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import threading, random, time, xmlrpc.client, sys
|
||||
#import network
|
||||
from nettools import netcontrol
|
||||
from nettools import netcontrol,ovscontrol
|
||||
from log import logger
|
||||
import env
|
||||
|
||||
|
@ -24,27 +24,24 @@ class NodeMgr(object):
|
|||
self.mode = mode
|
||||
self.workerport = env.getenv('WORKER_PORT')
|
||||
|
||||
# initialize the network
|
||||
logger.info ("initialize network")
|
||||
# delete the existing network
|
||||
logger.info ("delete the existing network")
|
||||
[success, bridges] = ovscontrol.list_bridges()
|
||||
if success:
|
||||
for bridge in bridges:
|
||||
if bridge.startswith("docklet-br"):
|
||||
ovscontrol.del_bridge(bridge)
|
||||
else:
|
||||
logger.error(bridges)
|
||||
|
||||
# 'docklet-br' not need ip address. Because every user has gateway
|
||||
#[status, result] = self.networkmgr.acquire_sysips_cidr()
|
||||
#self.networkmgr.printpools()
|
||||
#if not status:
|
||||
# logger.info ("initialize network failed, no IP for system bridge")
|
||||
# sys.exit(1)
|
||||
#self.bridgeip = result[0]
|
||||
#logger.info ("initialize bridge wih ip %s" % self.bridgeip)
|
||||
#network.netsetup("init", self.bridgeip)
|
||||
|
||||
if self.mode == 'new':
|
||||
'''if self.mode == 'new':
|
||||
if netcontrol.bridge_exists('docklet-br'):
|
||||
netcontrol.del_bridge('docklet-br')
|
||||
netcontrol.new_bridge('docklet-br')
|
||||
else:
|
||||
if not netcontrol.bridge_exists('docklet-br'):
|
||||
logger.error("docklet-br not found")
|
||||
sys.exit(1)
|
||||
sys.exit(1)'''
|
||||
|
||||
# get allnodes
|
||||
self.allnodes = self._nodelist_etcd("allnodes")
|
||||
|
@ -109,14 +106,14 @@ class NodeMgr(object):
|
|||
logger.info ("new node %s joins" % nodeip)
|
||||
etcd_runip.append(nodeip)
|
||||
# setup GRE tunnels for new nodes
|
||||
if self.addr == nodeip:
|
||||
'''if self.addr == nodeip:
|
||||
logger.debug ("worker start on master node. not need to setup GRE")
|
||||
else:
|
||||
logger.debug ("setup GRE for %s" % nodeip)
|
||||
if netcontrol.gre_exists('docklet-br', nodeip):
|
||||
logger.debug("GRE for %s already exists, reuse it" % nodeip)
|
||||
else:
|
||||
netcontrol.setup_gre('docklet-br', nodeip)
|
||||
netcontrol.setup_gre('docklet-br', nodeip)'''
|
||||
self.etcd.setkey("machines/runnodes/"+nodeip, "ok")
|
||||
if nodeip not in self.runnodes:
|
||||
self.runnodes.append(nodeip)
|
||||
|
|
|
@ -440,6 +440,7 @@ class userManager:
|
|||
"success": 'true',
|
||||
"data":{
|
||||
"username" : user.username,
|
||||
"id": user.id,
|
||||
"password" : user.password,
|
||||
"avatar" : user.avatar,
|
||||
"nickname" : user.nickname,
|
||||
|
|
|
@ -7,6 +7,11 @@ import xmlrpc.client
|
|||
from log import logger
|
||||
import env
|
||||
import proxytool
|
||||
import requests
|
||||
|
||||
userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
|
||||
def post_to_user(url = '/', data={}):
|
||||
return requests.post(userpoint+url,data=data).json()
|
||||
|
||||
##################################################
|
||||
# VclusterMgr
|
||||
|
@ -48,10 +53,12 @@ class VclusterMgr(object):
|
|||
def recover_allclusters(self):
|
||||
logger.info("recovering all vclusters for all users...")
|
||||
usersdir = self.fspath+"/global/users/"
|
||||
auth_key = env.getenv('AUTH_KEY')
|
||||
for user in os.listdir(usersdir):
|
||||
for cluster in self.list_clusters(user)[1]:
|
||||
logger.info ("recovering cluster:%s for user:%s ..." % (cluster, user))
|
||||
self.recover_cluster(cluster, user)
|
||||
res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
|
||||
self.recover_cluster(cluster, user, res['uid'])
|
||||
logger.info("recovered all vclusters for all users")
|
||||
|
||||
def mount_allclusters(self):
|
||||
|
@ -89,6 +96,7 @@ class VclusterMgr(object):
|
|||
workers = self.nodemgr.get_nodeips()
|
||||
image_json = json.dumps(image)
|
||||
groupname = json.loads(user_info)["data"]["group"]
|
||||
uid = json.loads(user_info)["data"]["id"]
|
||||
if (len(workers) == 0):
|
||||
logger.warning ("no workers to start containers, start cluster failed")
|
||||
return [False, "no workers are running"]
|
||||
|
@ -96,7 +104,7 @@ class VclusterMgr(object):
|
|||
if not self.networkmgr.has_user(username):
|
||||
self.networkmgr.add_user(username, cidr=29, isshared = True if str(groupname) == "fundation" else False)
|
||||
if self.distributedgw == "False":
|
||||
[success,message] = self.networkmgr.setup_usrgw(username, self.nodemgr)
|
||||
[success,message] = self.networkmgr.setup_usrgw(username, uid, self.nodemgr)
|
||||
if not success:
|
||||
return [False, message]
|
||||
elif not self.networkmgr.has_usrgw(username):
|
||||
|
@ -104,7 +112,7 @@ class VclusterMgr(object):
|
|||
self.networkmgr.dump_usrgw(username)
|
||||
[status, result] = self.networkmgr.acquire_userips_cidr(username, clustersize)
|
||||
gateway = self.networkmgr.get_usergw(username)
|
||||
vlanid = self.networkmgr.get_uservlanid(username)
|
||||
#vlanid = self.networkmgr.get_uservlanid(username)
|
||||
logger.info ("create cluster with gateway : %s" % gateway)
|
||||
self.networkmgr.printpools()
|
||||
if not status:
|
||||
|
@ -121,7 +129,7 @@ class VclusterMgr(object):
|
|||
workerip = workers[random.randint(0, len(workers)-1)]
|
||||
oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
|
||||
if self.distributedgw == "True" and i == 0 and not self.networkmgr.has_usrgw(username):
|
||||
[success,message] = self.networkmgr.setup_usrgw(username, self.nodemgr, workerip)
|
||||
[success,message] = self.networkmgr.setup_usrgw(username, uid, self.nodemgr, workerip)
|
||||
if not success:
|
||||
return [False, message]
|
||||
if i == 0:
|
||||
|
@ -130,7 +138,7 @@ class VclusterMgr(object):
|
|||
lxc_name = username + "-" + str(clusterid) + "-" + str(i)
|
||||
hostname = "host-"+str(i)
|
||||
logger.info ("create container with : name-%s, username-%s, clustername-%s, clusterid-%s, hostname-%s, ip-%s, gateway-%s, image-%s" % (lxc_name, username, clustername, str(clusterid), hostname, ips[i], gateway, image_json))
|
||||
[success,message] = oneworker.create_container(lxc_name, proxy_server_ip, username, json.dumps(setting) , clustername, str(clusterid), str(i), hostname, ips[i], gateway, str(vlanid), image_json)
|
||||
[success,message] = oneworker.create_container(lxc_name, proxy_server_ip, username, uid, json.dumps(setting) , clustername, str(clusterid), str(i), hostname, ips[i], gateway, image_json)
|
||||
if success is False:
|
||||
logger.info("container create failed, so vcluster create failed")
|
||||
return [False, message]
|
||||
|
@ -147,7 +155,7 @@ class VclusterMgr(object):
|
|||
clusterfile.close()
|
||||
return [True, info]
|
||||
|
||||
def scale_out_cluster(self,clustername,username,image,user_info, setting):
|
||||
def scale_out_cluster(self,clustername,username, image,user_info, setting):
|
||||
if not self.is_cluster(clustername,username):
|
||||
return [False, "cluster:%s not found" % clustername]
|
||||
workers = self.nodemgr.get_nodeips()
|
||||
|
@ -157,7 +165,7 @@ class VclusterMgr(object):
|
|||
image_json = json.dumps(image)
|
||||
[status, result] = self.networkmgr.acquire_userips_cidr(username)
|
||||
gateway = self.networkmgr.get_usergw(username)
|
||||
vlanid = self.networkmgr.get_uservlanid(username)
|
||||
#vlanid = self.networkmgr.get_uservlanid(username)
|
||||
self.networkmgr.printpools()
|
||||
if not status:
|
||||
return [False, result]
|
||||
|
@ -172,13 +180,18 @@ class VclusterMgr(object):
|
|||
lxc_name = username + "-" + str(clusterid) + "-" + str(cid)
|
||||
hostname = "host-" + str(cid)
|
||||
proxy_server_ip = clusterinfo['proxy_server_ip']
|
||||
[success, message] = oneworker.create_container(lxc_name, proxy_server_ip, username, json.dumps(setting), clustername, clusterid, str(cid), hostname, ip, gateway, str(vlanid), image_json)
|
||||
uid = json.loads(user_info)["data"]["id"]
|
||||
[success, message] = oneworker.create_container(lxc_name, proxy_server_ip, username, uid, json.dumps(setting), clustername, clusterid, str(cid), hostname, ip, gateway, image_json)
|
||||
if success is False:
|
||||
logger.info("create container failed, so scale out failed")
|
||||
return [False, message]
|
||||
if clusterinfo['status'] == "running":
|
||||
self.networkmgr.check_usergre(username, uid, workerip, self.nodemgr, self.distributedgw=='True')
|
||||
oneworker.start_container(lxc_name)
|
||||
oneworker.start_services(lxc_name, ["ssh"]) # TODO: need fix
|
||||
oneworker.start_services(lxc_name, ["ssh"]) # TODO: need fix
|
||||
namesplit = lxc_name.split('-')
|
||||
portname = namesplit[1] + '-' + namesplit[2]
|
||||
oneworker.recover_usernet(portname, uid, proxy_server_ip, workerip==proxy_server_ip)
|
||||
logger.info("scale out success")
|
||||
hostfile = open(hostpath, 'a')
|
||||
hostfile.write(ip.split("/")[0] + "\t" + hostname + "\t" + hostname + "." + clustername + "\n")
|
||||
|
@ -311,11 +324,12 @@ class VclusterMgr(object):
|
|||
os.remove(self.fspath+"/global/users/"+username+"/hosts/"+str(info['clusterid'])+".hosts")
|
||||
|
||||
groupname = json.loads(user_info)["data"]["group"]
|
||||
uid = json.loads(user_info)["data"]["id"]
|
||||
[status, clusters] = self.list_clusters(username)
|
||||
if len(clusters) == 0:
|
||||
self.networkmgr.del_user(username, isshared = True if str(groupname) == "fundation" else False)
|
||||
self.networkmgr.del_usrgw(username, self.nodemgr)
|
||||
logger.info("vlanid release triggered")
|
||||
self.networkmgr.del_user(username)
|
||||
self.networkmgr.del_usrgwbr(username, uid, self.nodemgr)
|
||||
#logger.info("vlanid release triggered")
|
||||
|
||||
return [True, "cluster delete"]
|
||||
|
||||
|
@ -386,7 +400,7 @@ class VclusterMgr(object):
|
|||
return [True, {'cpu':cpu, 'memory':memory, 'disk':disk}]
|
||||
|
||||
|
||||
def start_cluster(self, clustername, username):
|
||||
def start_cluster(self, clustername, username, uid):
|
||||
[status, info] = self.get_clusterinfo(clustername, username)
|
||||
if not status:
|
||||
return [False, "cluster not found"]
|
||||
|
@ -395,7 +409,7 @@ class VclusterMgr(object):
|
|||
# check gateway for user
|
||||
# after reboot, user gateway goes down and lose its configuration
|
||||
# so, check is necessary
|
||||
self.networkmgr.check_usergw(username, self.nodemgr,self.distributedgw=='True')
|
||||
self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True')
|
||||
# set proxy
|
||||
if not "proxy_server_ip" in info.keys():
|
||||
info['proxy_server_ip'] = self.addr
|
||||
|
@ -421,11 +435,16 @@ class VclusterMgr(object):
|
|||
except:
|
||||
return [False, "start cluster failed with setting proxy failed"]
|
||||
for container in info['containers']:
|
||||
# set up gre from user's gateway host to container's host.
|
||||
self.networkmgr.check_usergre(username, uid, container['host'], self.nodemgr, self.distributedgw=='True')
|
||||
worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
|
||||
if worker is None:
|
||||
return [False, "The worker can't be found or has been stopped."]
|
||||
worker.start_container(container['containername'])
|
||||
worker.start_services(container['containername'])
|
||||
namesplit = container['containername'].split('-')
|
||||
portname = namesplit[1] + '-' + namesplit[2]
|
||||
worker.recover_usernet(portname, uid, info['proxy_server_ip'], container['host']==info['proxy_server_ip'])
|
||||
info['status']='running'
|
||||
info['start_time']=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.write_clusterinfo(info,clustername,username)
|
||||
|
@ -442,12 +461,12 @@ class VclusterMgr(object):
|
|||
worker.mount_container(container['containername'])
|
||||
return [True, "mount cluster"]
|
||||
|
||||
def recover_cluster(self, clustername, username):
|
||||
def recover_cluster(self, clustername, username, uid):
|
||||
[status, info] = self.get_clusterinfo(clustername, username)
|
||||
if not status:
|
||||
return [False, "cluster not found"]
|
||||
# need to check and recover gateway of this user
|
||||
self.networkmgr.check_usergw(username, self.nodemgr,self.distributedgw=='True')
|
||||
self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True')
|
||||
# recover proxy of cluster
|
||||
if not "proxy_server_ip" in info.keys():
|
||||
info['proxy_server_ip'] = self.addr
|
||||
|
@ -477,10 +496,15 @@ class VclusterMgr(object):
|
|||
return [False, "start cluster failed with setting proxy failed"]
|
||||
# recover containers of this cluster
|
||||
for container in info['containers']:
|
||||
# set up gre from user's gateway host to container's host.
|
||||
self.networkmgr.check_usergre(username, uid, container['host'], self.nodemgr, self.distributedgw=='True')
|
||||
worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
|
||||
if worker is None:
|
||||
return [False, "The worker can't be found or has been stopped."]
|
||||
worker.recover_container(container['containername'])
|
||||
namesplit = container['containername'].split('-')
|
||||
portname = namesplit[1] + '-' + namesplit[2]
|
||||
worker.recover_usernet(portname, uid, info['proxy_server_ip'], container['host']==info['proxy_server_ip'])
|
||||
return [True, "start cluster"]
|
||||
|
||||
# maybe here should use cluster id
|
||||
|
|
|
@ -14,7 +14,7 @@ import xmlrpc.server, sys, time
|
|||
from socketserver import ThreadingMixIn
|
||||
import threading
|
||||
import etcdlib, network, container
|
||||
from nettools import netcontrol
|
||||
from nettools import netcontrol,ovscontrol
|
||||
import monitor, proxytool
|
||||
from lvmtool import new_group, recover_group
|
||||
|
||||
|
@ -127,7 +127,10 @@ class Worker(object):
|
|||
self.rpcserver.register_function(monitor.workerFetchInfo)
|
||||
self.rpcserver.register_function(netcontrol.setup_gw)
|
||||
self.rpcserver.register_function(netcontrol.del_gw)
|
||||
self.rpcserver.register_function(netcontrol.del_bridge)
|
||||
self.rpcserver.register_function(ovscontrol.add_port_gre_withkey)
|
||||
self.rpcserver.register_function(netcontrol.check_gw)
|
||||
self.rpcserver.register_function(netcontrol.recover_usernet)
|
||||
self.rpcserver.register_function(proxytool.set_route)
|
||||
self.rpcserver.register_function(proxytool.delete_route)
|
||||
# register functions or instances to server for rpc
|
||||
|
@ -137,10 +140,15 @@ class Worker(object):
|
|||
self.con_collector = monitor.Container_Collector()
|
||||
self.hosts_collector = monitor.Collector()
|
||||
|
||||
# initialize the network
|
||||
# if worker and master run on the same node, reuse bridges
|
||||
# don't need to create new bridges
|
||||
if (self.addr == self.master):
|
||||
# delete the existing network
|
||||
[success, bridges] = ovscontrol.list_bridges()
|
||||
if success:
|
||||
for bridge in bridges:
|
||||
if bridge.startswith("docklet-br"):
|
||||
ovscontrol.del_bridge(bridge)
|
||||
else:
|
||||
logger.error(bridges)
|
||||
'''if (self.addr == self.master):
|
||||
logger.info ("master also on this node. reuse master's network")
|
||||
else:
|
||||
logger.info ("initialize network")
|
||||
|
@ -161,8 +169,8 @@ class Worker(object):
|
|||
sys.exit(1)
|
||||
logger.info ("setup GRE tunnel to master %s" % self.master)
|
||||
#network.netsetup("gre", self.master)
|
||||
if not netcontrol.gre_exists('docklet-br', self.master):
|
||||
netcontrol.setup_gre('docklet-br', self.master)
|
||||
#if not netcontrol.gre_exists('docklet-br', self.master):
|
||||
#netcontrol.setup_gre('docklet-br', self.master)'''
|
||||
|
||||
# start service of worker
|
||||
def start(self):
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
import sys,os
|
||||
sys.path.append("../src/")
|
||||
import env,requests
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print("Please enter USER_IP")
|
||||
exit()
|
||||
|
||||
userpoint = "http://" + sys.argv[1] + ":" + str(env.getenv('USER_PORT'))
|
||||
auth_key = env.getenv('AUTH_KEY')
|
||||
|
||||
def post_to_user(url = '/', data={}):
|
||||
return requests.post(userpoint+url,data=data).json()
|
||||
|
||||
cons = os.listdir('/var/lib/lxc')
|
||||
for con in cons:
|
||||
print("Update %s..."%(con))
|
||||
namesplit = con.split('-')
|
||||
user = namesplit[0]
|
||||
res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
|
||||
try:
|
||||
configfile = open('/var/lib/lxc/'+con+'/config','r')
|
||||
except:
|
||||
continue
|
||||
context = configfile.read()
|
||||
configfile.close()
|
||||
#print(context)
|
||||
#print(res['uid'])
|
||||
context = context.replace("docklet-br","docklet-br-"+str(res['uid']))
|
||||
newfile = open('/var/lib/lxc/'+con+'/config','w')
|
||||
newfile.write(context)
|
||||
newfile.close()
|
|
@ -323,6 +323,15 @@ def selfQuery_user(cur_user, user, form):
|
|||
result = G_usermgr.selfQuery(cur_user = cur_user)
|
||||
return json.dumps(result)
|
||||
|
||||
@app.route("/user/uid/", methods=['POST'])
|
||||
@auth_key_required
|
||||
def get_userid():
|
||||
username = request.form.get("username",None)
|
||||
if username is None:
|
||||
return json.dumps({'success':'false', 'message':'username field is required.'})
|
||||
else:
|
||||
user = User.query.filter_by(username=username).first()
|
||||
return json.dumps({'success':'true', 'uid':user.id})
|
||||
|
||||
@app.route("/user/selfModify/", methods=['POST'])
|
||||
@login_required
|
||||
|
|
Loading…
Reference in New Issue