From 567955c777246eadc11e063c5e5ba72262ac78a4 Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Tue, 5 Mar 2019 01:59:12 +0800 Subject: [PATCH] update network in taskworker --- conf/lxc-script/lxc-ifdown | 2 +- src/master/taskmgr.py | 32 ++++++++++++++++++-------------- src/master/testTaskWorker.py | 7 ++++--- src/worker/taskworker.py | 12 ++++++++++++ 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/conf/lxc-script/lxc-ifdown b/conf/lxc-script/lxc-ifdown index 7d55723..927c6d5 100755 --- a/conf/lxc-script/lxc-ifdown +++ b/conf/lxc-script/lxc-ifdown @@ -9,7 +9,7 @@ ovs-vsctl --if-exists del-port $Bridge $5 cnt=$(ovs-vsctl list-ports ${Bridge} | wc -l) if [ "$cnt" = "1" ]; then - greport=$(ovs-vsctl list-ports $(Bridge) | grep "^gre-[[:digit:]][[:digit:]]*-[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*\.[[:digit:]][[:digit:]]*$" | wc -l) + greport=$(ovs-vsctl list-ports ${Bridge} | grep "gre" | wc -l) if [ "$greport" = "1" ]; then ovs-vsctl del-br $Bridge fi diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 523dcb5..f96c7e6 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -37,7 +37,8 @@ class Task(): # priority the bigger the better # self.priority the smaller the better self.priority = int(time.time()) / 60 / 60 - priority - self.network = None + self.task_base_ip = None + self.ips = None self.max_size = max_size for i in range(self.vnode_nums): @@ -175,18 +176,18 @@ class TaskMgr(threading.Thread): return [True, ""] @net_lock - def acquire_task_net(self, task): - self.logger.info("[acquire_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network))) - if task.network == None: - task.network = self.free_nets.pop(0) - return task.network + def acquire_task_ips(self, task): + self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip))) + if task.task_base_ip == None: + task.task_base_ip = self.free_nets.pop(0) + return task.task_base_ip @net_lock - def release_task_net(self,task): - self.logger.info("[release_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network))) - if task.network == None: + def release_task_ips(self,task): + self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip))) + if task.task_base_ip == None: return - self.free_nets.append(task.network) + self.free_nets.append(task.task_base_ip) self.logger.error('[release task_net] %s'%str(e)) def setup_tasknet(self, task, workers=None): @@ -194,9 +195,9 @@ class TaskMgr(threading.Thread): username = task.taskinfo.username brname = "docklet-batch-%s-%s"%(username, taskid) gwname = "Batch-%s-%s"%(username, taskid) - if task.network == None: - return [False, "task.network is None!"] - gatewayip = int_to_ip(self.base_ip + task.network + 1) + if task.task_base_ip == None: + return [False, "task.task_base_ip is None!"] + gatewayip = int_to_ip(self.base_ip + task.task_base_ip + 1) gatewayipcidr += "/" + str(32-self.task_cidr) netcontrol.new_bridge(brname) netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0) @@ -218,6 +219,9 @@ class TaskMgr(threading.Thread): # properties for transactio self.acquire_task_net(task) + #task.ips = [] + #for i in + #need to create hosts [success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers]) if not success: return [False, gwip] @@ -242,7 +246,7 @@ class TaskMgr(threading.Thread): #if not username in self.user_containers.keys(): #self.user_containers[username] = [] #self.user_containers[username].append(container_name) - ipaddr = int_to_ip(self.base_ip + task.network + vid%task.max_size + 2) + ipaddr = int_to_ip(self.base_ip + task.task_base_ip + vid%task.max_size + 2) brname = "docklet-batch-%s-%s"%(username, taskid) networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname) vnode.network = networkinfo diff --git a/src/master/testTaskWorker.py b/src/master/testTaskWorker.py index b434b93..d444884 100644 --- a/src/master/testTaskWorker.py +++ b/src/master/testTaskWorker.py @@ -39,7 +39,8 @@ def stop_task(): def stop_vnode(): channel = grpc.insecure_channel('localhost:50051') stub = rpc_pb2_grpc.WorkerStub(channel) - vnodeinfo = rpc_pb2.VNodeInfo(taskid="test",username="root",vnodeid=1) + network = rpc_pb2.Network(brname="batch-root-test") + vnodeinfo = rpc_pb2.VNodeInfo(taskid="test",username="root",vnodeid=1,vnode=rpc_pb2.VNode(network=network)) response = stub.stop_vnode(vnodeinfo) print("Batch client received: " + str(response.status)+" "+response.message) @@ -58,8 +59,8 @@ def start_task(): if __name__ == '__main__': #for i in range(10): - #run() + run() #start_task() - stop_vnode() + #stop_vnode() #time.sleep(4) #stop_task() diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index d99693f..b60a9fd 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -19,6 +19,7 @@ from utils import imagemgr,etcdlib,gputools from utils.lvmtool import sys_run from worker import ossmounter from protos import rpc_pb2, rpc_pb2_grpc +from utils.nettools import netcontrol _ONE_DAY_IN_SECONDS = 60 * 60 * 24 MAX_RUNNING_TIME = _ONE_DAY_IN_SECONDS @@ -171,6 +172,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): ipaddr = request.vnode.network.ipaddr gateway = request.vnode.network.gateway brname = request.vnode.network.brname + masterip = request.vnode.network.masterip #create container [success, msg] = self.create_container(taskid, vnodeid, username, image, lxcname, instance_type, ipaddr, gateway, brname) @@ -195,6 +197,8 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): logger.info('start container %s success' % lxcname) + netcontrol.setup_gre(brname, masterip) + #add GPU [success, msg] = self.add_gpu_device(lxcname,gpu_need) if not success: @@ -206,6 +210,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") def start_task(self, request, context): + logger.info('start task with config: ' + str(request)) taskid = request.taskid username = request.username vnodeid = request.vnodeid @@ -228,6 +233,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") def stop_task(self, request, context): + logger.info('stop task with config: ' + str(request)) taskid = request.taskid username = request.username vnodeid = request.vnodeid @@ -238,9 +244,11 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): # stop and remove container def stop_vnode(self, request, context): + logger.info('stop vnode with config: ' + str(request)) taskid = request.taskid username = request.username vnodeid = request.vnodeid + brname = request.vnode.network.brname mount_list = request.vnode.mount lxcname = '%s-batch-%s-%s' % (username,taskid,str(vnodeid)) @@ -257,6 +265,10 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): else: logger.error("delete container %s failed" % lxcname) + #del ovs bridge + if brname is not None: + netcontrol.del_bridge(brname) + #release gpu self.release_gpu_device(lxcname)