diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index 6e23f3b..92b5575 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -1,6 +1,8 @@
 import threading
 import time
 
+import master.monitor
+
 # must import logger after initlogging, ugly
 from utils.log import initlogging
 initlogging("docklet-taskmgr")
@@ -26,19 +28,28 @@ class TaskMgr(threading.Thread):
     # load task information from etcd
     # initial a task queue and task schedueler
     # taskmgr: a taskmgr instance
-    def __init__(self):
+    def __init__(self, nodemgr):
         threading.Thread.__init__(self)
         self.thread_stop = False
-        self.taskQueue = []
+
+        # tasks
+        self.task_queue = []
+
+        # nodes
+        self.nodemgr = nodemgr
+        self.all_nodes = None
+        self.last_nodes_info_update_time = 0
+        self.nodes_info_update_interval = 30 # (s)
 
 
     def run(self):
         self.serve()
         while not self.thread_stop:
-            task = self.task_scheduler()
-            if task is not None:
-                self.task_processor(task)
-            time.sleep(2)
+            task, instance_id, worker = self.task_scheduler()
+            if task is not None and worker is not None:
+                self.task_processor(task, instance_id, worker)
+            else:
+                time.sleep(2)
 
 
     def serve(self):
@@ -46,11 +57,13 @@ class TaskMgr(threading.Thread):
         add_MasterServicer_to_server(TaskReporter(self), self.server)
         self.server.add_insecure_port('[::]:50051')
         self.server.start()
+        logger.info('[taskmgr_rpc] start rpc server')
 
 
     def stop(self):
         self.thread_stop = True
         self.server.stop(0)
+        logger.info('[taskmgr_rpc] stop rpc server')
 
 
     # this method is called when worker send heart-beat rpc request
@@ -61,41 +74,93 @@ class TaskMgr(threading.Thread):
             logger.error('[on_task_report] task not found')
             return
 
-        task.status = report.status
-        if task.status == Task.RUNNING:
+        instance_id = report.parameters.command.envVars['INSTANCE_ID']
+        instance = task.instance_list[instance_id]
+
+        if report.status == Task.RUNNING:
             pass
-        elif task.status == Task.COMPLETED:
-            # tell jobmgr
-            pass
-        elif task.status == Task.FAILED || task.status == Task.TIMEOUT:
-            # retry
-            if task.maxRetryCount <= 0:
-                # tell jobmgr
-                pass
-            else:
-                # decrease max retry count & waiting for retry
-                task.maxRetryCount -= 1
-                task.status = Task.WAITING
+        elif report.status == Task.COMPLETED:
+            instance['status'] = 'completed'
+            self.check_task_completed(task)
+        elif report.status == Task.FAILED or report.status == Task.TIMEOUT:
+            instance['status'] = 'failed'
+            if instance['try_count'] > task.maxRetryCount:
+                self.check_task_completed(task)
         else:
             logger.error('[on_task_report] receive report from waiting task')
 
 
-    # this is a thread to process task(or a instance)
-    def task_processor(self,task):
-        # call the rpc to call a function in worker
-        # create container -> execute task
-        # (one instance or multiple instances)
-        # retry when failed
-        print('processing %s' % task)
+    def check_task_completed(self, task):
+        if len(task.instance_list) < task.instanceCount:
+            return
+        failed = False
+        for instance in task.instance_list:
+            if instance['status'] == 'running':
+                return
+            if instance['status'] == 'failed':
+                if instance['try_count'] > task.maxRetryCount:
+                    failed = True
+                else:
+                    return
+        if failed:
+            # tell jobmgr task failed
+            task.status = Task.FAILED
+        else:
+            # tell jobmgr task completed
+            task.status = Task.COMPLETED
+        self.task_queue.remove(task)
 
-    # this is a thread to schdule the tasks
+
+    def task_processor(self, task, instance_id, worker):
+        task.status = Task.RUNNING
+        task.parameters.command.envVars['INSTANCE_ID'] = instance_id
+        # TODO call the rpc to call a function in worker
+        print('processing %s' % task.id)
+
+
+    # return task, worker
     def task_scheduler(self):
-        try:
-            task = self.taskQueue.pop(0)
-        except:
-            task = None
-        return task
+        # simple FIFO
+        for task in self.task_queue:
+            worker = self.find_proper_worker(task)
+            if worker is not None:
+                # find instance to retry
+                for index, instance in enumerate(task.instance_list):
+                    if instance['status'] == 'failed' and instance['try_count'] <= task.maxRetryCount:
+                        instance['try_count'] += 1
+                        return task, index, worker
+
+                # start new instance
+                if len(task.instance_list) < task.instanceCount:
+                    instance = {}
+                    instance['status'] = 'running'
+                    instance['try_count'] = 0
+                    task.instance_list.append(instance)
+                    return task, len(task.instance_list) - 1, worker
+        return None, None, None
+
+
+    def find_proper_worker(self, task):
+        nodes = self.get_all_nodes()
+        if nodes is None or len(nodes) == 0:
+            logger.warning('[task_scheduler] running nodes not found')
+            return None
+
+        # TODO
+        return nodes[0]
+
+
+    def get_all_nodes(self):
+        # cache running nodes
+        if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
+            return self.all_nodes
+        # get running nodes
+        node_ips = self.nodemgr.get_nodeips()
+        self.all_nodes = []
+        for node_ip in node_ips:
+            fetcher = master.monitor.Fetcher(node_ip)
+            self.all_nodes.append(fetcher.info)
+        return self.all_nodes
 
 
     # user: username
@@ -103,13 +168,15 @@ class TaskMgr(threading.Thread):
     # save the task information into database
     # called when jobmgr assign task to taskmgr
     def add_task(self, task):
-        pass
+        # decode json string to object defined in grpc
+        task.instance_list = []
+        self.task_queue.append(task)
 
 
     # user: username
     # get the information of a task, including the status, task description and other information
     def get_task(self, taskid):
-        for task in self.taskQueue:
+        for task in self.task_queue:
             if task.id == taskid:
                 return task
         return None