update network in taskworker

This commit is contained in:
zhuyj17 2019-03-05 01:59:12 +08:00
parent 782baee617
commit 567955c777
4 changed files with 35 additions and 18 deletions

View File

@ -9,7 +9,7 @@
ovs-vsctl --if-exists del-port $Bridge $5 ovs-vsctl --if-exists del-port $Bridge $5
cnt=$(ovs-vsctl list-ports ${Bridge} | wc -l) cnt=$(ovs-vsctl list-ports ${Bridge} | wc -l)
if [ "$cnt" = "1" ]; then if [ "$cnt" = "1" ]; then
greport=$(ovs-vsctl list-ports $(Bridge) | grep "^gre-[[:digit:]][[:digit:]]*-[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*$" | wc -l) greport=$(ovs-vsctl list-ports ${Bridge} | grep "gre" | wc -l)
if [ "$greport" = "1" ]; then if [ "$greport" = "1" ]; then
ovs-vsctl del-br $Bridge ovs-vsctl del-br $Bridge
fi fi

View File

@ -37,7 +37,8 @@ class Task():
# priority the bigger the better # priority the bigger the better
# self.priority the smaller the better # self.priority the smaller the better
self.priority = int(time.time()) / 60 / 60 - priority self.priority = int(time.time()) / 60 / 60 - priority
self.network = None self.task_base_ip = None
self.ips = None
self.max_size = max_size self.max_size = max_size
for i in range(self.vnode_nums): for i in range(self.vnode_nums):
@ -175,18 +176,18 @@ class TaskMgr(threading.Thread):
return [True, ""] return [True, ""]
@net_lock @net_lock
def acquire_task_net(self, task): def acquire_task_ips(self, task):
self.logger.info("[acquire_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network))) self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip)))
if task.network == None: if task.task_base_ip == None:
task.network = self.free_nets.pop(0) task.task_base_ip = self.free_nets.pop(0)
return task.network return task.task_base_ip
@net_lock @net_lock
def release_task_net(self,task): def release_task_ips(self,task):
self.logger.info("[release_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network))) self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip)))
if task.network == None: if task.task_base_ip == None:
return return
self.free_nets.append(task.network) self.free_nets.append(task.task_base_ip)
self.logger.error('[release task_net] %s'%str(e)) self.logger.error('[release task_net] %s'%str(e))
def setup_tasknet(self, task, workers=None): def setup_tasknet(self, task, workers=None):
@ -194,9 +195,9 @@ class TaskMgr(threading.Thread):
username = task.taskinfo.username username = task.taskinfo.username
brname = "docklet-batch-%s-%s"%(username, taskid) brname = "docklet-batch-%s-%s"%(username, taskid)
gwname = "Batch-%s-%s"%(username, taskid) gwname = "Batch-%s-%s"%(username, taskid)
if task.network == None: if task.task_base_ip == None:
return [False, "task.network is None!"] return [False, "task.task_base_ip is None!"]
gatewayip = int_to_ip(self.base_ip + task.network + 1) gatewayip = int_to_ip(self.base_ip + task.task_base_ip + 1)
gatewayipcidr += "/" + str(32-self.task_cidr) gatewayipcidr += "/" + str(32-self.task_cidr)
netcontrol.new_bridge(brname) netcontrol.new_bridge(brname)
netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0) netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0)
@ -218,6 +219,9 @@ class TaskMgr(threading.Thread):
# properties for transactio # properties for transactio
self.acquire_task_net(task) self.acquire_task_net(task)
#task.ips = []
#for i in
#need to create hosts
[success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers]) [success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers])
if not success: if not success:
return [False, gwip] return [False, gwip]
@ -242,7 +246,7 @@ class TaskMgr(threading.Thread):
#if not username in self.user_containers.keys(): #if not username in self.user_containers.keys():
#self.user_containers[username] = [] #self.user_containers[username] = []
#self.user_containers[username].append(container_name) #self.user_containers[username].append(container_name)
ipaddr = int_to_ip(self.base_ip + task.network + vid%task.max_size + 2) ipaddr = int_to_ip(self.base_ip + task.task_base_ip + vid%task.max_size + 2)
brname = "docklet-batch-%s-%s"%(username, taskid) brname = "docklet-batch-%s-%s"%(username, taskid)
networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname) networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname)
vnode.network = networkinfo vnode.network = networkinfo

View File

@ -39,7 +39,8 @@ def stop_task():
def stop_vnode(): def stop_vnode():
channel = grpc.insecure_channel('localhost:50051') channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel) stub = rpc_pb2_grpc.WorkerStub(channel)
vnodeinfo = rpc_pb2.VNodeInfo(taskid="test",username="root",vnodeid=1) network = rpc_pb2.Network(brname="batch-root-test")
vnodeinfo = rpc_pb2.VNodeInfo(taskid="test",username="root",vnodeid=1,vnode=rpc_pb2.VNode(network=network))
response = stub.stop_vnode(vnodeinfo) response = stub.stop_vnode(vnodeinfo)
print("Batch client received: " + str(response.status)+" "+response.message) print("Batch client received: " + str(response.status)+" "+response.message)
@ -58,8 +59,8 @@ def start_task():
if __name__ == '__main__': if __name__ == '__main__':
#for i in range(10): #for i in range(10):
#run() run()
#start_task() #start_task()
stop_vnode() #stop_vnode()
#time.sleep(4) #time.sleep(4)
#stop_task() #stop_task()

View File

@ -19,6 +19,7 @@ from utils import imagemgr,etcdlib,gputools
from utils.lvmtool import sys_run from utils.lvmtool import sys_run
from worker import ossmounter from worker import ossmounter
from protos import rpc_pb2, rpc_pb2_grpc from protos import rpc_pb2, rpc_pb2_grpc
from utils.nettools import netcontrol
_ONE_DAY_IN_SECONDS = 60 * 60 * 24 _ONE_DAY_IN_SECONDS = 60 * 60 * 24
MAX_RUNNING_TIME = _ONE_DAY_IN_SECONDS MAX_RUNNING_TIME = _ONE_DAY_IN_SECONDS
@ -171,6 +172,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
ipaddr = request.vnode.network.ipaddr ipaddr = request.vnode.network.ipaddr
gateway = request.vnode.network.gateway gateway = request.vnode.network.gateway
brname = request.vnode.network.brname brname = request.vnode.network.brname
masterip = request.vnode.network.masterip
#create container #create container
[success, msg] = self.create_container(taskid, vnodeid, username, image, lxcname, instance_type, ipaddr, gateway, brname) [success, msg] = self.create_container(taskid, vnodeid, username, image, lxcname, instance_type, ipaddr, gateway, brname)
@ -195,6 +197,8 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
logger.info('start container %s success' % lxcname) logger.info('start container %s success' % lxcname)
netcontrol.setup_gre(brname, masterip)
#add GPU #add GPU
[success, msg] = self.add_gpu_device(lxcname,gpu_need) [success, msg] = self.add_gpu_device(lxcname,gpu_need)
if not success: if not success:
@ -206,6 +210,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
def start_task(self, request, context): def start_task(self, request, context):
logger.info('start task with config: ' + str(request))
taskid = request.taskid taskid = request.taskid
username = request.username username = request.username
vnodeid = request.vnodeid vnodeid = request.vnodeid
@ -228,6 +233,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
def stop_task(self, request, context): def stop_task(self, request, context):
logger.info('stop task with config: ' + str(request))
taskid = request.taskid taskid = request.taskid
username = request.username username = request.username
vnodeid = request.vnodeid vnodeid = request.vnodeid
@ -238,9 +244,11 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
# stop and remove container # stop and remove container
def stop_vnode(self, request, context): def stop_vnode(self, request, context):
logger.info('stop vnode with config: ' + str(request))
taskid = request.taskid taskid = request.taskid
username = request.username username = request.username
vnodeid = request.vnodeid vnodeid = request.vnodeid
brname = request.vnode.network.brname
mount_list = request.vnode.mount mount_list = request.vnode.mount
lxcname = '%s-batch-%s-%s' % (username,taskid,str(vnodeid)) lxcname = '%s-batch-%s-%s' % (username,taskid,str(vnodeid))
@ -257,6 +265,10 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
else: else:
logger.error("delete container %s failed" % lxcname) logger.error("delete container %s failed" % lxcname)
#del ovs bridge
if brname is not None:
netcontrol.del_bridge(brname)
#release gpu #release gpu
self.release_gpu_device(lxcname) self.release_gpu_device(lxcname)