diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 9c3587b..aea04da 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -59,6 +59,7 @@ class TaskMgr(threading.Thread): self.nodemgr = nodemgr self.monitor_fetcher = monitor_fetcher self.cpu_usage = {} + self.gpu_usage = {} # self.all_nodes = None # self.last_nodes_info_update_time = 0 # self.nodes_info_update_interval = 30 # (s) @@ -119,6 +120,7 @@ class TaskMgr(threading.Thread): if instance['status'] == RUNNING and report.instanceStatus != RUNNING: self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu + self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu instance['status'] = report.instanceStatus instance['error_msg'] = report.errmsg @@ -197,6 +199,7 @@ class TaskMgr(threading.Thread): instance['worker'] = worker_ip self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu + self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu username = task.info.username container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token if not username in self.user_containers.keys(): @@ -251,6 +254,7 @@ class TaskMgr(threading.Thread): instance['status'] = FAILED instance['token'] = '' self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu + self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index)) if worker is not None: @@ -282,7 +286,7 @@ class TaskMgr(threading.Thread): # try not to assign non-gpu task to a worker with gpu if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0: continue - if task.info.cluster.instance.gpu > worker_info['gpu']: + if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']: continue return worker_ip return None @@ -322,6 +326,14 @@ class TaskMgr(threading.Thread): return 0 + def get_gpu_usage(self, worker_ip): + try: 
+ return self.gpu_usage[worker_ip] + except KeyError: + self.gpu_usage[worker_ip] = 0 + return 0 + + def set_jobmgr(self, jobmgr): self.jobmgr = jobmgr