Merge pull request #324 from GallenShao/batch

remove worker timeout & remove disk request
This commit is contained in:
GallenShao 2018-08-08 13:25:19 +08:00 committed by GitHub
commit 187ec5e956
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 9 deletions

View File

@ -42,13 +42,12 @@ class TaskMgr(threading.Thread):
# load task information from etcd
# initial a task queue and task schedueler
# taskmgr: a taskmgr instance
def __init__(self, nodemgr, monitor_fetcher, logger, worker_timeout=60, scheduler_interval=2):
def __init__(self, nodemgr, monitor_fetcher, logger, scheduler_interval=2):
threading.Thread.__init__(self)
self.thread_stop = False
self.jobmgr = None
self.task_queue = []
self.heart_beat_timeout = worker_timeout # (s)
self.scheduler_interval = scheduler_interval
self.logger = logger
@ -109,7 +108,6 @@ class TaskMgr(threading.Thread):
instance['status'] = report.instanceStatus
instance['error_msg'] = report.errmsg
instance['last_update_time'] = time.time()
if report.instanceStatus == COMPLETED:
self.check_task_completed(task)
@ -174,7 +172,6 @@ class TaskMgr(threading.Thread):
instance = task.instance_list[instance_id]
instance['status'] = RUNNING
instance['last_update_time'] = time.time()
instance['try_count'] += 1
instance['token'] = task.info.token
instance['worker'] = worker_ip
@ -209,12 +206,12 @@ class TaskMgr(threading.Thread):
return task, index, worker
# find timeout instance
elif instance['status'] == RUNNING:
if time.time() - instance['last_update_time'] > self.heart_beat_timeout:
if not is_alive(instance['worker']):
instance['status'] = FAILED
instance['token'] = ''
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
self.logger.warning('[task_scheduler] worker timeout task [%s] instance [%d]' % (task.info.id, index))
self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
if worker is not None:
return task, index, worker
@ -241,8 +238,8 @@ class TaskMgr(threading.Thread):
continue
if task.info.cluster.instance.memory > worker_info['memory']:
continue
if task.info.cluster.instance.disk > worker_info['disk']:
continue
# if task.info.cluster.instance.disk > worker_info['disk']:
# continue
if task.info.cluster.instance.gpu > worker_info['gpu']:
continue
return worker_ip
@ -258,6 +255,11 @@ class TaskMgr(threading.Thread):
all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
return all_nodes
def is_alive(self, worker):
nodes = self.nodemgr.get_nodeips()
return worker in nodes
def get_worker_resource_info(self, worker_ip):
fetcher = self.monitor_fetcher(worker_ip)
@ -324,3 +326,4 @@ class TaskMgr(threading.Thread):
if task.info.id == taskid:
return task
return None

View File

@ -135,7 +135,7 @@ def test():
jobmgr = SimulatedJobMgr()
jobmgr.start()
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), worker_timeout=10, scheduler_interval=2)
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), scheduler_interval=2)
taskmgr.set_jobmgr(jobmgr)
taskmgr.start()