update taskmgr

This commit is contained in:
Gallen 2018-07-19 18:16:48 +08:00
parent 6ecf54da74
commit c6707d406d
2 changed files with 47 additions and 6 deletions

View File

@ -905,6 +905,9 @@ if __name__ == '__main__':
G_cloudmgr = cloudmgr.CloudMgr()
G_taskmgr = taskmgr.TaskMgr()
G_jobmgr = jobmgr.JobMgr(taskmgr)
G_jobmgr.start()
G_taskmgr.set_jobmgr(G_jobmgr)
G_taskmgr.start()
# start NodeMgr and NodeMgr will wait for all nodes to start ...
G_nodemgr = nodemgr.NodeMgr(G_networkmgr, etcdclient, addr = ipaddr, mode=mode)

View File

@ -2,6 +2,7 @@ import threading
import time
import string
import random
import json
import master.monitor
@ -13,7 +14,7 @@ from utils.log import logger
# grpc
from concurrent import futures
import grpc
from protos.rpc_pb2 import Task, TaskMsg, Status, Reply
from protos.rpc_pb2 import Task, TaskMsg, Status, Reply, Parameters, Cluster, Command, Image, Mount, Instance
from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub
@ -35,8 +36,7 @@ class TaskMgr(threading.Thread):
def __init__(self, nodemgr):
threading.Thread.__init__(self)
self.thread_stop = False
# tasks
self.jobmgr = None
self.task_queue = []
self.heart_beat_timeout = 60 # (s)
@ -73,7 +73,7 @@ class TaskMgr(threading.Thread):
# this method is called when worker send heart-beat rpc request
def on_task_report(self, report):
logger.info('[on_task_report] receive task report: id %d, status %d' % (report.taskid, report.instanceStatus))
logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.instanceid, report.instanceStatus))
task = get_task(report.taskid)
if task == None:
logger.error('[on_task_report] task not found')
@ -81,6 +81,7 @@ class TaskMgr(threading.Thread):
instance = task.instance_list[report.instanceid]
if instance['token'] != report.token:
logger.warning('[on_task_report] wrong token')
return
instance['status'] = report.instanceStatus
@ -107,12 +108,16 @@ class TaskMgr(threading.Thread):
failed = True
else:
return
if self.jobmgr is None:
logger.error('[check_task_completed] jobmgr is None!')
return
if failed:
# TODO tell jobmgr task failed
task.status = Status.FAILED
else:
# TODO tell jobmgr task completed
task.status = Status.COMPLETED
logger.info('task %s completed' % task.id)
self.task_queue.remove(task)
@ -192,14 +197,47 @@ class TaskMgr(threading.Thread):
return self.all_nodes
def set_jobmgr(self, jobmgr):
self.jobmgr = jobmgr
# user: username
# task: a json string
# save the task information into database
# called when jobmgr assign task to taskmgr
def add_task(self, task):
def add_task(self, username, taskid, json_task):
# decode json string to object defined in grpc
task.instance_list = []
json_task = json.loads(json_task)
task = Task(
id = taskid,
username = username,
instanceCount = json_task['instanceCount'],
maxRetryCount = json_task['maxRetryCount'],
timeout = json_task['timeout'],
parameters = Parameters(
command = Command(
commandLine = json_task['parameters']['command']['commandLine'],
packagePath = json_task['parameters']['command']['packagePath'],
envVars = json_task['parameters']['command']['envVars']),
stderrRedirectPath = json_task['parameters']['stderrRedirectPath'],
stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']),
cluster = Cluster(
,image = Image(
name = json_task['cluster']['image']['name'],
type = json_task['cluster']['image']['type'],
owner = json_task['cluster']['image']['owner']),
instance = Instance(
cpu = json_task['cluster']['instance']['cpu'],
memory = json_task['cluster']['instance']['memory'],
disk = json_task['cluster']['instance']['disk'],
gpu = json_task['cluster']['instance']['gpu'])))
task.cluster.mount = []
for mount in json_task['cluster']['mount']:
task.cluster.mount.append(Mount(localPath=mount['localPath'], remotePath=mount['remotePath']))
# local properties
task.status = Status.WAITING
task.instance_list = []
self.task_queue.append(task)