From c6707d406d8c23ea87002fc6850a68839ca0363e Mon Sep 17 00:00:00 2001 From: Gallen Date: Thu, 19 Jul 2018 18:16:48 +0800 Subject: [PATCH] update taskmgr --- src/master/httprest.py | 3 +++ src/master/taskmgr.py | 50 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/src/master/httprest.py b/src/master/httprest.py index 9250ed6..7ef349e 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -905,6 +905,9 @@ if __name__ == '__main__': G_cloudmgr = cloudmgr.CloudMgr() G_taskmgr = taskmgr.TaskMgr() G_jobmgr = jobmgr.JobMgr(taskmgr) + G_jobmgr.start() + G_taskmgr.set_jobmgr(G_jobmgr) + G_taskmgr.start() # start NodeMgr and NodeMgr will wait for all nodes to start ... G_nodemgr = nodemgr.NodeMgr(G_networkmgr, etcdclient, addr = ipaddr, mode=mode) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 470aefb..1b2e931 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -2,6 +2,7 @@ import threading import time import string import random +import json import master.monitor @@ -13,7 +14,7 @@ from utils.log import logger # grpc from concurrent import futures import grpc -from protos.rpc_pb2 import Task, TaskMsg, Status, Reply +from protos.rpc_pb2 import Task, TaskMsg, Status, Reply, Parameters, Cluster, Command, Image, Mount, Instance from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub @@ -35,8 +36,7 @@ class TaskMgr(threading.Thread): def __init__(self, nodemgr): threading.Thread.__init__(self) self.thread_stop = False - - # tasks + self.jobmgr = None self.task_queue = [] self.heart_beat_timeout = 60 # (s) @@ -73,7 +73,7 @@ class TaskMgr(threading.Thread): # this method is called when worker send heart-beat rpc request def on_task_report(self, report): - logger.info('[on_task_report] receive task report: id %d, status %d' % (report.taskid, report.instanceStatus)) + logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.instanceid, report.instanceStatus)) task = get_task(report.taskid) if task == None: logger.error('[on_task_report] task not found') @@ -81,6 +81,7 @@ class TaskMgr(threading.Thread): instance = task.instance_list[report.instanceid] if instance['token'] != report.token: + logger.warning('[on_task_report] wrong token') return instance['status'] = report.instanceStatus @@ -107,12 +108,16 @@ class TaskMgr(threading.Thread): failed = True else: return + if self.jobmgr is None: + logger.error('[check_task_completed] jobmgr is None!') + return if failed: # TODO tell jobmgr task failed task.status = Status.FAILED else: # TODO tell jobmgr task completed task.status = Status.COMPLETED + logger.info('task %s completed' % task.id) self.task_queue.remove(task) @@ -192,14 +197,47 @@ class TaskMgr(threading.Thread): return self.all_nodes + def set_jobmgr(self, jobmgr): + self.jobmgr = jobmgr + + # user: username # task: a json string # save the task information into database # called when jobmgr assign task to taskmgr - def add_task(self, task): + def add_task(self, username, taskid, json_task): # decode json string to object defined in grpc - task.instance_list = [] + json_task = json.loads(json_task) + task = Task( + id = taskid, + username = username, + instanceCount = json_task['instanceCount'], + maxRetryCount = json_task['maxRetryCount'], + timeout = json_task['timeout'], + parameters = Parameters( + command = Command( + commandLine = json_task['parameters']['command']['commandLine'], + packagePath = json_task['parameters']['command']['packagePath'], + envVars = json_task['parameters']['command']['envVars']), + stderrRedirectPath = json_task['parameters']['stderrRedirectPath'], + stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']), + cluster = Cluster( + ,image = Image( + name = json_task['cluster']['image']['name'], + type = json_task['cluster']['image']['type'], + owner = json_task['cluster']['image']['owner']), + instance = Instance( + cpu = json_task['cluster']['instance']['cpu'], + memory = json_task['cluster']['instance']['memory'], + disk = json_task['cluster']['instance']['disk'], + gpu = json_task['cluster']['instance']['gpu']))) + task.cluster.mount = [] + for mount in json_task['cluster']['mount']: + task.cluster.mount.append(Mount(localPath=mount['localPath'], remotePath=mount['remotePath'])) + + # local properties task.status = Status.WAITING + task.instance_list = [] self.task_queue.append(task)