From 782baee617a9e31e0108946248e8b671f1a35c23 Mon Sep 17 00:00:00 2001
From: zhuyj17
Date: Tue, 5 Mar 2019 00:15:23 +0800
Subject: [PATCH] add network into taskmgr

---
 conf/docklet.conf.template   |  11 ++--
 src/master/taskmgr.py        | 112 ++++++++++++++++++++++++++++++++---
 src/master/testTaskWorker.py |   4 +-
 src/protos/rpc.proto         |   2 +-
 src/protos/rpc_pb2.py        |   6 +-
 src/protos/rpc_pb2_grpc.py   |   4 +-
 src/utils/env.py             |   6 +-
 src/utils/nettools.py        |   2 +-
 src/worker/taskworker.py     |  10 ++--
 9 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/conf/docklet.conf.template b/conf/docklet.conf.template
index 912b6e5..be38f12 100644
--- a/conf/docklet.conf.template
+++ b/conf/docklet.conf.template
@@ -201,13 +201,12 @@
 # default: 50051
 # BATCH_WORKER_PORT=50051
 
-# BATCH_GATEWAY: the ip address of gateway for the containers processing
-# batch jobs. default: 10.0.3.1
-# BATCH_GATEWAY=10.0.3.1
+# BATCH_NET: ip addresses range of containers for batch job, default is 10.16.0.0/16
+# BATCH_NET=10.16.0.0/16
 
-# BATCH_NET: ip addresses range of containers for batch job, default is 10.0.3.0/24
-# BATCH_NET=10.0.3.0/24
+# BATCH_TASK_CIDR: 2^(BATCH_TASK_CIDR) is the number of ip addresses for a task, default is 4
+# BATCH_TASK_CIDR=4
 
 # BATCH_MAX_THREAD_WORKER: the maximun number of threads of the rpc server on
 # the batch job worker. default:5
-# BATCH_MAX_THREAD_WORKER=5
+# BATCH_MAX_THREAD_WORKER=5

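The two settings above work together: BATCH_NET is the whole address range reserved for
batch containers, and BATCH_TASK_CIDR says how many of those addresses each task's private
subnet gets (2^BATCH_TASK_CIDR). The following standalone sketch shows that split with the
documented defaults; it reuses the same ip_to_int/int_to_ip arithmetic that the taskmgr.py
changes below introduce, and is only an illustration, not code taken from the patch.

# Illustration only: carve BATCH_NET into per-task blocks of 2^BATCH_TASK_CIDR addresses.
def ip_to_int(addr):
    a, b, c, d = [int(x) for x in addr.split('.')]
    return (a << 24) + (b << 16) + (c << 8) + d

def int_to_ip(num):
    return '.'.join(str((num >> s) & 255) for s in (24, 16, 8, 0))

batch_net = '10.16.0.0/16'    # BATCH_NET default
task_cidr = 4                 # BATCH_TASK_CIDR default -> 16 addresses per task
base_str, batch_cidr = batch_net.split('/')
base_ip = ip_to_int(base_str)

# Offsets of every per-task block inside BATCH_NET, stepping by the block size.
free_nets = list(range(0, 1 << (32 - int(batch_cidr)), 1 << task_cidr))
print(len(free_nets))                       # 4096 tasks can hold a block at once
print(int_to_ip(base_ip + free_nets[1]))    # 10.16.0.16, the second block
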
diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index 647ad33..523dcb5 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -13,11 +13,18 @@ from concurrent import futures
 import grpc
 from protos.rpc_pb2 import *
 from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub
-
+from utils.nettools import netcontrol
 from utils import env
 
+def ip_to_int(addr):
+    [a, b, c, d] = addr.split('.')
+    return (int(a)<<24) + (int(b)<<16) + (int(c)<<8) + int(d)
+
+def int_to_ip(num):
+    return str((num>>24)&255)+"."+str((num>>16)&255)+"."+str((num>>8)&255)+"."+str(num&255)
+
 class Task():
-    def __init__(self, configinfo, vnodeinfo, taskinfo, priority):
+    def __init__(self, configinfo, vnodeinfo, taskinfo, priority, max_size):
         self.vnodeinfo = vnodeinfo
         self.taskinfo = taskinfo
         self.status = WAITING
@@ -30,6 +37,8 @@ class Task():
         # priority the bigger the better
         # self.priority the smaller the better
         self.priority = int(time.time()) / 60 / 60 - priority
+        self.network = None
+        self.max_size = max_size
         for i in range(self.vnode_nums):
             self.subtask_list.append({'status':'WAITING','try_count':0})
 
@@ -59,10 +68,11 @@ class TaskMgr(threading.Thread):
     # load task information from etcd
     # initial a task queue and task schedueler
     # taskmgr: a taskmgr instance
-    def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None):
+    def __init__(self, nodemgr, monitor_fetcher, master_ip, scheduler_interval=2, external_logger=None):
         threading.Thread.__init__(self)
         self.thread_stop = False
         self.jobmgr = None
+        self.master_ip = master_ip
         self.task_queue = []
         self.lazy_append_list = []
         self.lazy_delete_list = []
@@ -84,6 +94,19 @@ class TaskMgr(threading.Thread):
         # self.last_nodes_info_update_time = 0
         # self.nodes_info_update_interval = 30 # (s)
 
+        self.network_lock = threading.Lock()
+        batch_net = env.getenv('BATCH_NET')
+        self.batch_cidr = int(batch_net.split('/')[1])
+        batch_net = batch_net.split('/')[0]
+        task_cidr = int(env.getenv('BATCH_TASK_CIDR'))
+        task_cidr = min(task_cidr,31-self.batch_cidr)
+        self.task_cidr = max(task_cidr,2)
+        self.base_ip = ip_to_int(batch_net)
+        self.free_nets = []
+        for i in range(0, (1 << (32-self.batch_cidr)) - 1, (1 << self.task_cidr)):
+            self.free_nets.append(i)
+        logger.info("Free nets addresses pool %s" % str(self.free_nets))
+        logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr)))
 
     def queue_lock(f):
         @wraps(f)
@@ -94,6 +117,14 @@ class TaskMgr(threading.Thread):
             return result
         return new_f
 
+    def net_lock(f):
+        @wraps(f)
+        def new_f(self, *args, **kwargs):
+            self.network_lock.acquire()
+            result = f(self, *args, **kwargs)
+            self.network_lock.release()
+            return result
+        return new_f
 
     def run(self):
         self.serve()
@@ -128,14 +159,73 @@ class TaskMgr(threading.Thread):
         self.task_queue.append(task)
         self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
 
+    def stop_vnode(self, worker, task, vnodeid):
+        vnodeinfo = copy.copy(task.vnodeinfo)
+        vnodeinfo.vnodeid = vnodeid
+        try:
+            self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (task.vnodeinfo.id, vnodeid))
+            channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
+            stub = WorkerStub(channel)
+            response = stub.stop_vnode(vnodeinfo)
+            if response.status != Reply.ACCEPTED:
+                raise Exception(response.message)
+        except Exception as e:
+            self.logger.error('[task_processor] rpc error message: %s' % e)
+            return [False, e]
+        return [True, ""]
+
+    @net_lock
+    def acquire_task_net(self, task):
+        self.logger.info("[acquire_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network)))
+        if task.network == None:
+            task.network = self.free_nets.pop(0)
+        return task.network
+
+    @net_lock
+    def release_task_net(self,task):
+        self.logger.info("[release_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network)))
+        if task.network == None:
+            return
+        self.free_nets.append(task.network)
+
+    def setup_tasknet(self, task, workers=None):
+        taskid = task.taskinfo.taskid
+        username = task.taskinfo.username
+        brname = "docklet-batch-%s-%s"%(username, taskid)
+        gwname = "Batch-%s-%s"%(username, taskid)
+        if task.network == None:
+            return [False, "task.network is None!"]
+        gatewayip = int_to_ip(self.base_ip + task.network + 1)
+        gatewayipcidr = gatewayip + "/" + str(32-self.task_cidr)
+        netcontrol.new_bridge(brname)
+        netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0)
+
+        for wip in workers:
+            netcontrol.setup_gre(brname,wip)
+        return [True, gatewayip]
+
+    def remove_tasknet(self, task):
+        taskid = task.taskinfo.taskid
+        username = task.taskinfo.username
+        brname = "docklet-batch-%s-%s"%(username, taskid)
+        netcontrol.del_bridge(brname)
+
     def task_processor(self, task, vnodes_workers):
         task.status = RUNNING
         self.jobmgr.report(task.taskinfo.taskid,'running')
         # properties for transaction
+
+        self.acquire_task_net(task)
+        [success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers])
+        if not success:
+            return [False, gwip]
         token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
 
+        placed_workers = []
+        # start vc
         for vid, worker in vnodes_workers:
             vnodeinfo = copy.copy(task.vnodeinfo)
             vnodeinfo.vnodeid = vid
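With acquire_task_net() and setup_tasknet() above, each running task gets its own bridge,
gateway and per-task subnet. The standalone sketch below shows the values those two methods
derive before anything is handed to utils.nettools; only the arithmetic and the naming mirror
the patch, the username/taskid values are made up, and the netcontrol calls are quoted in
comments rather than executed.

# Illustration only: what one task ends up with, assuming BATCH_NET=10.16.0.0/16
# and BATCH_TASK_CIDR=4; username/taskid are hypothetical example values.
def int_to_ip(num):
    return '.'.join(str((num >> s) & 255) for s in (24, 16, 8, 0))

base_ip = (10 << 24) + (16 << 16)            # 10.16.0.0
task_cidr = 4
free_nets = list(range(0, 1 << 16, 1 << task_cidr))

network = free_nets.pop(0)                   # acquire_task_net(): first free offset
username, taskid = 'alice', '42'             # hypothetical task owner and id

brname = "docklet-batch-%s-%s" % (username, taskid)     # per-task bridge name
gwname = "Batch-%s-%s" % (username, taskid)             # its gateway device name
gatewayip = int_to_ip(base_ip + network + 1)            # '10.16.0.1'
gatewayipcidr = gatewayip + "/" + str(32 - task_cidr)   # '10.16.0.1/28'
print(brname, gwname, gatewayipcidr)

# setup_tasknet() then calls netcontrol.new_bridge(brname),
# netcontrol.setup_gw(brname, gwname, gatewayipcidr, 0, 0), and one
# netcontrol.setup_gre(brname, worker_ip) per worker hosting a vnode of the task;
# release_task_net() later appends network back to free_nets for reuse.
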
@@ -152,21 +242,30 @@ class TaskMgr(threading.Thread):
             #if not username in self.user_containers.keys():
                 #self.user_containers[username] = []
             #self.user_containers[username].append(container_name)
+            ipaddr = int_to_ip(self.base_ip + task.network + vid%task.max_size + 2)
+            brname = "docklet-batch-%s-%s"%(username, taskid)
+            networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.master_ip, brname=brname)
+            vnode.network = networkinfo
 
             try:
                 self.logger.info('[task_processor] starting vnode for task [%s] instance [%d]' % (task.vnodeinfo.id, vid))
                 channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
                 stub = WorkerStub(channel)
                 response = stub.start_vnode(vnodeinfo)
+                placed_workers.append(worker)
                 if response.status != Reply.ACCEPTED:
                     raise Exception(response.message)
             except Exception as e:
                 self.logger.error('[task_processor] rpc error message: %s' % e)
+                task.status = FAILED
                 vnode['status'] = FAILED
                 vnode['try_count'] -= 1
+                for pl_worker in placed_workers:
+                    pass  # placeholder: vnodes already started on these workers are not stopped yet
                 return
             #self.user_containers[username].remove(container_name)
 
+        # start tasks
         for vid, worker in vnodes_workers:
             taskinfo = copy.copy(task.taskinfo)
             taskinfo.vnodeid = vid
@@ -181,8 +280,7 @@ class TaskMgr(threading.Thread):
                 raise Exception(response.message)
             except Exception as e:
                 self.logger.error('[task_processor] rpc error message: %s' % e)
-                vnode['status'] = FAILED
-                vnode['try_count'] -= 1
+                task.status = FAILED
 
     # return task, workers
     def task_scheduler(self):
@@ -347,7 +445,7 @@ class TaskMgr(threading.Thread):
                 stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
             timeout = int(json_task['expTime']),
         ),
-            priority=task_priority)
+            priority=task_priority,max_size=(1<
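The per-vnode addresses in the hunks above come straight from the task's subnet offset:
the gateway sits at base+network+1 and vnode vid gets base+network+(vid % max_size)+2.
Below is a worked example of that arithmetic with hypothetical values; max_size is assumed
to be the per-task subnet capacity (2^BATCH_TASK_CIDR), and the truncated constructor
argument at the end of the last hunk is not reproduced here.

# Illustration only: address layout for one task whose offset is 32 (the third
# /28 block of BATCH_NET=10.16.0.0/16); max_size is assumed to be 16.
def int_to_ip(num):
    return '.'.join(str((num >> s) & 255) for s in (24, 16, 8, 0))

base_ip = (10 << 24) + (16 << 16)     # 10.16.0.0
network = 32                          # hypothetical offset from acquire_task_net()
max_size = 1 << 4                     # assumed per-task subnet capacity

gateway = int_to_ip(base_ip + network + 1)
vnode_ips = [int_to_ip(base_ip + network + (vid % max_size) + 2) for vid in range(3)]
print(gateway)     # 10.16.0.33
print(vnode_ips)   # ['10.16.0.34', '10.16.0.35', '10.16.0.36']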