From 36c1babcd9dc5931c4da4fe682105602a2a901bc Mon Sep 17 00:00:00 2001
From: Gallen
Date: Wed, 8 Aug 2018 13:20:19 +0800
Subject: [PATCH] remove worker timeout & remove disk request

---
Review notes (this section is ignored when the patch is applied):
* is_alive() is introduced below as a TaskMgr method, so the scheduler hunk
  calls it as self.is_alive(...); a bare is_alive(...) is an undefined name
  inside the method and raises NameError at runtime.
* Leading whitespace and blank context lines were reconstructed from a
  flattened copy of this patch; verify them against the target tree.

 src/master/taskmgr.py     | 19 +++++++++++--------
 src/master/testTaskMgr.py |  2 +-
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index f7b9ce4..ab42aab 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -42,13 +42,12 @@ class TaskMgr(threading.Thread):
     # load task information from etcd
     # initial a task queue and task schedueler
     # taskmgr: a taskmgr instance
-    def __init__(self, nodemgr, monitor_fetcher, logger, worker_timeout=60, scheduler_interval=2):
+    def __init__(self, nodemgr, monitor_fetcher, logger, scheduler_interval=2):
         threading.Thread.__init__(self)
         self.thread_stop = False
         self.jobmgr = None
         self.task_queue = []
 
-        self.heart_beat_timeout = worker_timeout # (s)
         self.scheduler_interval = scheduler_interval
         self.logger = logger
 
@@ -109,7 +108,6 @@ class TaskMgr(threading.Thread):
 
         instance['status'] = report.instanceStatus
         instance['error_msg'] = report.errmsg
-        instance['last_update_time'] = time.time()
 
         if report.instanceStatus == COMPLETED:
             self.check_task_completed(task)
@@ -174,7 +172,6 @@ class TaskMgr(threading.Thread):
 
         instance = task.instance_list[instance_id]
         instance['status'] = RUNNING
-        instance['last_update_time'] = time.time()
         instance['try_count'] += 1
         instance['token'] = task.info.token
         instance['worker'] = worker_ip
@@ -209,12 +206,12 @@ class TaskMgr(threading.Thread):
                         return task, index, worker
                 # find timeout instance
                 elif instance['status'] == RUNNING:
-                    if time.time() - instance['last_update_time'] > self.heart_beat_timeout:
+                    if not self.is_alive(instance['worker']):
                         instance['status'] = FAILED
                         instance['token'] = ''
                         self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
 
-                        self.logger.warning('[task_scheduler] worker timeout task [%s] instance [%d]' % (task.info.id, index))
+                        self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
                         if worker is not None:
                             return task, index, worker
 
@@ -241,8 +238,8 @@ class TaskMgr(threading.Thread):
                 continue
             if task.info.cluster.instance.memory > worker_info['memory']:
                 continue
-            if task.info.cluster.instance.disk > worker_info['disk']:
-                continue
+            # if task.info.cluster.instance.disk > worker_info['disk']:
+            #     continue
             if task.info.cluster.instance.gpu > worker_info['gpu']:
                 continue
             return worker_ip
@@ -258,6 +255,11 @@ class TaskMgr(threading.Thread):
         all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
         return all_nodes
 
+
+    def is_alive(self, worker):
+        nodes = self.nodemgr.get_nodeips()
+        return worker in nodes
+
 
     def get_worker_resource_info(self, worker_ip):
         fetcher = self.monitor_fetcher(worker_ip)
@@ -324,3 +326,4 @@ class TaskMgr(threading.Thread):
             if task.info.id == taskid:
                 return task
         return None
+    
\ No newline at end of file
diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py
index 23207f8..c421ed0 100644
--- a/src/master/testTaskMgr.py
+++ b/src/master/testTaskMgr.py
@@ -135,7 +135,7 @@ def test():
     jobmgr = SimulatedJobMgr()
     jobmgr.start()
 
-    taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), worker_timeout=10, scheduler_interval=2)
+    taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), scheduler_interval=2)
     taskmgr.set_jobmgr(jobmgr)
     taskmgr.start()
 