update taskmgr

This commit is contained in:
Gallen 2018-08-08 20:15:22 +08:00
parent 36c1babcd9
commit 6059c10a1a
2 changed files with 23 additions and 6 deletions

View File

@ -5,9 +5,9 @@ import random
import json
# must import logger after initlogging, ugly
# from utils.log import initlogging
# initlogging("docklet-taskmgr")
# from utils.log import logger
from utils.log import initlogging
initlogging("docklet-taskmgr")
from utils.log import logger
# grpc
from concurrent import futures
@ -42,14 +42,17 @@ class TaskMgr(threading.Thread):
# load task information from etcd
# initial a task queue and task schedueler
# taskmgr: a taskmgr instance
def __init__(self, nodemgr, monitor_fetcher, logger, scheduler_interval=2):
def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None):
threading.Thread.__init__(self)
self.thread_stop = False
self.jobmgr = None
self.task_queue = []
self.scheduler_interval = scheduler_interval
if external_logger is None:
self.logger = logger
else:
self.logger = external_logger
self.master_port = env.getenv('BATCH_MASTER_PORT')
self.worker_port = env.getenv('BATCH_WORKER_PORT')

View File

@ -135,13 +135,27 @@ def test():
jobmgr = SimulatedJobMgr()
jobmgr.start()
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), scheduler_interval=2)
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, SimulatedLogger())
taskmgr.set_jobmgr(jobmgr)
taskmgr.start()
add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048)
def test2():
global jobmgr
global taskmgr
jobmgr = SimulatedJobMgr()
jobmgr.start()
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, SimulatedLogger())
taskmgr.set_jobmgr(jobmgr)
taskmgr.start()
add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048)
def add(taskid, instance_count, retry_count, timeout, cpu, memory, disk):
global jobmgr
global taskmgr