remove worker timeout & remove disk request
This commit is contained in:
parent
2d80d2505a
commit
36c1babcd9
|
@ -42,13 +42,12 @@ class TaskMgr(threading.Thread):
|
|||
# load task information from etcd
|
||||
# initial a task queue and task schedueler
|
||||
# taskmgr: a taskmgr instance
|
||||
def __init__(self, nodemgr, monitor_fetcher, logger, worker_timeout=60, scheduler_interval=2):
|
||||
def __init__(self, nodemgr, monitor_fetcher, logger, scheduler_interval=2):
|
||||
threading.Thread.__init__(self)
|
||||
self.thread_stop = False
|
||||
self.jobmgr = None
|
||||
self.task_queue = []
|
||||
|
||||
self.heart_beat_timeout = worker_timeout # (s)
|
||||
self.scheduler_interval = scheduler_interval
|
||||
self.logger = logger
|
||||
|
||||
|
@ -109,7 +108,6 @@ class TaskMgr(threading.Thread):
|
|||
|
||||
instance['status'] = report.instanceStatus
|
||||
instance['error_msg'] = report.errmsg
|
||||
instance['last_update_time'] = time.time()
|
||||
|
||||
if report.instanceStatus == COMPLETED:
|
||||
self.check_task_completed(task)
|
||||
|
@ -174,7 +172,6 @@ class TaskMgr(threading.Thread):
|
|||
|
||||
instance = task.instance_list[instance_id]
|
||||
instance['status'] = RUNNING
|
||||
instance['last_update_time'] = time.time()
|
||||
instance['try_count'] += 1
|
||||
instance['token'] = task.info.token
|
||||
instance['worker'] = worker_ip
|
||||
|
@ -209,12 +206,12 @@ class TaskMgr(threading.Thread):
|
|||
return task, index, worker
|
||||
# find timeout instance
|
||||
elif instance['status'] == RUNNING:
|
||||
if time.time() - instance['last_update_time'] > self.heart_beat_timeout:
|
||||
if not is_alive(instance['worker']):
|
||||
instance['status'] = FAILED
|
||||
instance['token'] = ''
|
||||
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
|
||||
|
||||
self.logger.warning('[task_scheduler] worker timeout task [%s] instance [%d]' % (task.info.id, index))
|
||||
self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
|
||||
if worker is not None:
|
||||
return task, index, worker
|
||||
|
||||
|
@ -241,8 +238,8 @@ class TaskMgr(threading.Thread):
|
|||
continue
|
||||
if task.info.cluster.instance.memory > worker_info['memory']:
|
||||
continue
|
||||
if task.info.cluster.instance.disk > worker_info['disk']:
|
||||
continue
|
||||
# if task.info.cluster.instance.disk > worker_info['disk']:
|
||||
# continue
|
||||
if task.info.cluster.instance.gpu > worker_info['gpu']:
|
||||
continue
|
||||
return worker_ip
|
||||
|
@ -258,6 +255,11 @@ class TaskMgr(threading.Thread):
|
|||
all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
|
||||
return all_nodes
|
||||
|
||||
|
||||
def is_alive(self, worker):
|
||||
nodes = self.nodemgr.get_nodeips()
|
||||
return worker in nodes
|
||||
|
||||
|
||||
def get_worker_resource_info(self, worker_ip):
|
||||
fetcher = self.monitor_fetcher(worker_ip)
|
||||
|
@ -324,3 +326,4 @@ class TaskMgr(threading.Thread):
|
|||
if task.info.id == taskid:
|
||||
return task
|
||||
return None
|
||||
|
|
@ -135,7 +135,7 @@ def test():
|
|||
jobmgr = SimulatedJobMgr()
|
||||
jobmgr.start()
|
||||
|
||||
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), worker_timeout=10, scheduler_interval=2)
|
||||
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), scheduler_interval=2)
|
||||
taskmgr.set_jobmgr(jobmgr)
|
||||
taskmgr.start()
|
||||
|
||||
|
|
Loading…
Reference in New Issue