From 6059c10a1a15e264f6e54ae319310fb0a8379113 Mon Sep 17 00:00:00 2001 From: Gallen Date: Wed, 8 Aug 2018 20:15:22 +0800 Subject: [PATCH] update taskmgr --- src/master/taskmgr.py | 13 ++++++++----- src/master/testTaskMgr.py | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index ab42aab..1f4587c 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -5,9 +5,9 @@ import random import json # must import logger after initlogging, ugly -# from utils.log import initlogging -# initlogging("docklet-taskmgr") -# from utils.log import logger +from utils.log import initlogging +initlogging("docklet-taskmgr") +from utils.log import logger # grpc from concurrent import futures @@ -42,14 +42,17 @@ class TaskMgr(threading.Thread): # load task information from etcd # initial a task queue and task schedueler # taskmgr: a taskmgr instance - def __init__(self, nodemgr, monitor_fetcher, logger, scheduler_interval=2): + def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None): threading.Thread.__init__(self) self.thread_stop = False self.jobmgr = None self.task_queue = [] self.scheduler_interval = scheduler_interval - self.logger = logger + if external_logger is None: + self.logger = logger + else: + self.logger = external_logger self.master_port = env.getenv('BATCH_MASTER_PORT') self.worker_port = env.getenv('BATCH_WORKER_PORT') diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py index c421ed0..af8c873 100644 --- a/src/master/testTaskMgr.py +++ b/src/master/testTaskMgr.py @@ -135,13 +135,27 @@ def test(): jobmgr = SimulatedJobMgr() jobmgr.start() - taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), scheduler_interval=2) + taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, SimulatedLogger()) taskmgr.set_jobmgr(jobmgr) taskmgr.start() add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048) +def test2(): + global jobmgr + global taskmgr + jobmgr = SimulatedJobMgr() + jobmgr.start() + + taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, SimulatedLogger()) + taskmgr.set_jobmgr(jobmgr) + taskmgr.start() + + add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048) + + + def add(taskid, instance_count, retry_count, timeout, cpu, memory, disk): global jobmgr global taskmgr