support gpu

This commit is contained in:
Gallen 2018-11-01 15:43:35 +08:00
parent 39831e9fb9
commit cb7f476ec6
1 changed files with 13 additions and 1 deletions

View File

@ -59,6 +59,7 @@ class TaskMgr(threading.Thread):
self.nodemgr = nodemgr
self.monitor_fetcher = monitor_fetcher
self.cpu_usage = {}
self.gpu_usage = {}
# self.all_nodes = None
# self.last_nodes_info_update_time = 0
# self.nodes_info_update_interval = 30 # (s)
@ -119,6 +120,7 @@ class TaskMgr(threading.Thread):
if instance['status'] == RUNNING and report.instanceStatus != RUNNING:
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
instance['status'] = report.instanceStatus
instance['error_msg'] = report.errmsg
@ -197,6 +199,7 @@ class TaskMgr(threading.Thread):
instance['worker'] = worker_ip
self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu
self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu
username = task.info.username
container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
if not username in self.user_containers.keys():
@ -251,6 +254,7 @@ class TaskMgr(threading.Thread):
instance['status'] = FAILED
instance['token'] = ''
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
if worker is not None:
@ -282,7 +286,7 @@ class TaskMgr(threading.Thread):
# try not to assign non-gpu task to a worker with gpu
if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
continue
if task.info.cluster.instance.gpu > worker_info['gpu']:
if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
continue
return worker_ip
return None
@ -322,6 +326,14 @@ class TaskMgr(threading.Thread):
return 0
def get_gpu_usage(self, worker_ip):
try:
return self.gpu_usage[worker_ip]
except:
self.gpu_usage[worker_ip] = 0
return 0
def set_jobmgr(self, jobmgr):
self.jobmgr = jobmgr