update
This commit is contained in:
parent
ca8467e6fb
commit
c531be3b64
|
@ -344,7 +344,7 @@ class TaskMgr(threading.Thread):
|
||||||
vnode_info.vnode.network.CopyFrom(networkinfo)
|
vnode_info.vnode.network.CopyFrom(networkinfo)
|
||||||
|
|
||||||
placed_workers.append(sub_task.worker)
|
placed_workers.append(sub_task.worker)
|
||||||
[success,msg] = self.start_vnode(sub_task)
|
[success, msg] = self.start_vnode(sub_task)
|
||||||
if not success:
|
if not success:
|
||||||
sub_task.waiting_for_retry()
|
sub_task.waiting_for_retry()
|
||||||
sub_task.worker = None
|
sub_task.worker = None
|
||||||
|
@ -361,7 +361,8 @@ class TaskMgr(threading.Thread):
|
||||||
continue
|
continue
|
||||||
task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
|
task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
|
||||||
|
|
||||||
if self.start_task(sub_task):
|
[success, msg] = self.start_task(sub_task)
|
||||||
|
if success:
|
||||||
sub_task.status = RUNNING
|
sub_task.status = RUNNING
|
||||||
else:
|
else:
|
||||||
sub_task.waiting_for_retry()
|
sub_task.waiting_for_retry()
|
||||||
|
@ -373,6 +374,7 @@ class TaskMgr(threading.Thread):
|
||||||
def clear_sub_task(self, sub_task):
|
def clear_sub_task(self, sub_task):
|
||||||
if sub_task.task_started:
|
if sub_task.task_started:
|
||||||
self.stop_task(sub_task)
|
self.stop_task(sub_task)
|
||||||
|
#pass
|
||||||
if sub_task.vnode_started:
|
if sub_task.vnode_started:
|
||||||
self.stop_vnode(sub_task)
|
self.stop_vnode(sub_task)
|
||||||
#pass
|
#pass
|
||||||
|
@ -458,7 +460,7 @@ class TaskMgr(threading.Thread):
|
||||||
|
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
def find_proper_workers(self, sub_task_list):
|
def find_proper_workers(self, sub_task_list, all_res=False):
|
||||||
nodes = self.get_all_nodes()
|
nodes = self.get_all_nodes()
|
||||||
if nodes is None or len(nodes) == 0:
|
if nodes is None or len(nodes) == 0:
|
||||||
self.logger.warning('[task_scheduler] running nodes not found')
|
self.logger.warning('[task_scheduler] running nodes not found')
|
||||||
|
@ -478,7 +480,7 @@ class TaskMgr(threading.Thread):
|
||||||
for worker_ip, worker_info in nodes:
|
for worker_ip, worker_info in nodes:
|
||||||
#logger.info(worker_info)
|
#logger.info(worker_info)
|
||||||
#logger.info(self.get_cpu_usage(worker_ip))
|
#logger.info(self.get_cpu_usage(worker_ip))
|
||||||
if needs.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
|
if needs.cpu + (not all_res) * self.get_cpu_usage(worker_ip) > worker_info['cpu']:
|
||||||
continue
|
continue
|
||||||
elif needs.memory > worker_info['memory']:
|
elif needs.memory > worker_info['memory']:
|
||||||
continue
|
continue
|
||||||
|
@ -487,7 +489,7 @@ class TaskMgr(threading.Thread):
|
||||||
# try not to assign non-gpu task to a worker with gpu
|
# try not to assign non-gpu task to a worker with gpu
|
||||||
#if needs['gpu'] == 0 and worker_info['gpu'] > 0:
|
#if needs['gpu'] == 0 and worker_info['gpu'] > 0:
|
||||||
#continue
|
#continue
|
||||||
elif needs.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
|
elif needs.gpu + (not all_res) * self.get_gpu_usage(worker_ip) > worker_info['gpu']:
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
worker_info['cpu'] -= needs.cpu
|
worker_info['cpu'] -= needs.cpu
|
||||||
|
@ -548,6 +550,7 @@ class TaskMgr(threading.Thread):
|
||||||
def add_task(self, username, taskid, json_task, task_priority=1):
|
def add_task(self, username, taskid, json_task, task_priority=1):
|
||||||
# decode json string to object defined in grpc
|
# decode json string to object defined in grpc
|
||||||
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
|
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
|
||||||
|
|
||||||
image_dict = {
|
image_dict = {
|
||||||
"private": Image.PRIVATE,
|
"private": Image.PRIVATE,
|
||||||
"base": Image.BASE,
|
"base": Image.BASE,
|
||||||
|
@ -596,7 +599,24 @@ class TaskMgr(threading.Thread):
|
||||||
# if in traditional mode, commands will be executed in all vnodes
|
# if in traditional mode, commands will be executed in all vnodes
|
||||||
) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None
|
) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None
|
||||||
} for vnode_index in range(int(json_task['vnodeCount']))])
|
} for vnode_index in range(int(json_task['vnodeCount']))])
|
||||||
|
|
||||||
|
if task.at_same_time:
|
||||||
|
workers = self.find_proper_workers(task.subtask_list, all_res=True)
|
||||||
|
if len(workers) == 0:
|
||||||
|
task.status = FAILED
|
||||||
|
# tell jobmgr
|
||||||
|
self.jobmgr.report(username,taskid,"failed","Resources needs exceed limits")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
for sub_task in task.subtask_list:
|
||||||
|
workers = self.find_proper_workers([sub_task], all_res=True)
|
||||||
|
if len(workers) == 0:
|
||||||
|
task.status = FAILED
|
||||||
|
# tell jobmgr
|
||||||
|
self.jobmgr.report(username,taskid,"failed","Resources needs exceed limits")
|
||||||
|
return False
|
||||||
self.lazy_append_list.append(task)
|
self.lazy_append_list.append(task)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
# user: username
|
# user: username
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
|
|
||||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||||
<div class="form-group"><label class="col-sm-2 control-label">Job Name</label>
|
<div class="form-group"><label class="col-sm-2 control-label">Job Name</label>
|
||||||
<div class="col-sm-10"><input type="text" class="form-control" name="jobName" id="job_name"></div>
|
<div class="col-sm-10"><input type="text" class="form-control" name="jobName" id="job_name" required></div>
|
||||||
</div>
|
</div>
|
||||||
<div class="hr-line-dashed"></div>
|
<div class="hr-line-dashed"></div>
|
||||||
<br/>
|
<br/>
|
||||||
|
|
Loading…
Reference in New Issue