Fix some bugs & make jobmgr run on new scheduler policy

This commit is contained in:
Firmlyzhu 2019-01-17 17:08:12 +08:00
parent 6f72efda63
commit 9792f1c2e2
3 changed files with 41 additions and 17 deletions

View File

@ -1103,6 +1103,5 @@ if __name__ == '__main__':
G_jobmgr = jobmgr.JobMgr(G_taskmgr)
G_taskmgr.set_jobmgr(G_jobmgr)
G_taskmgr.start()
G_jobmgr.start()
app.run(host = masterip, port = masterport, threaded=True)

View File

@ -1,6 +1,6 @@
import time, threading, random, string, os, traceback
import master.monitor
import subprocess
import subprocess,json
from functools import wraps
from utils.log import initlogging, logger
@ -35,12 +35,14 @@ class BatchJob(object):
continue
for d in dependency:
if not d in raw_tasks.keys():
raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
raise ValueError('task %s is not defined in the dependency of task %s' % (d, task_idx))
self.tasks[task_idx]['dependency'].append(d)
if not d in self.dependency_out.keys():
self.dependency_out[d] = []
self.dependency_out[d].append(task_idx)
self.log_status()
def data_lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
@ -60,14 +62,15 @@ class BatchJob(object):
logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id)
ret_tasks = []
for task_idx in self.tasks.keys():
if (self.tasks[task_idx]['status'] = 'pending' and
if (self.tasks[task_idx]['status'] == 'pending' and
len(self.tasks[task_idx]['dependency']) == 0):
if update_status:
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[task_idx]['status'] = 'scheduling'
task_name = self.user + '_' + self.job_id + '_' + task_idx
ret_tasks.append([task_name, self.tasks[task_idx]['config']])
ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority])
self.log_status()
return ret_tasks
# update status of this job based
@ -91,6 +94,7 @@ class BatchJob(object):
self.tasks[task_idx]['status'] = 'running'
self.tasks_cnt['running'] += 1
self._update_job_status()
self.log_status()
# a task has finished, update dependency and return tasks without dependencies
@data_lock
@ -105,17 +109,23 @@ class BatchJob(object):
self.tasks_cnt['finished'] += 1
self._update_job_status()
if task_idx not in self.dependency_out.keys():
self.log_status()
return []
ret_tasks = []
for out_idx in self.dependency_out[task_idx]:
try:
self.tasks[out_idx]['dependency'].remove(task_idx)
except Exception as err:
logger.warning(traceback.format_exc())
continue
if (self.tasks[out_idx]['status'] == 'pending' and
len(self.tasks[out_idx]['dependency']) == 0):
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[out_idx]['status'] = 'scheduling'
task_name = self.user + '_' + self.job_id + '_' + out_idx
ret_tasks.append([task_name, self.tasks[out_idx]['config']])
ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
self.log_status()
return ret_tasks
# update retrying status of task
@ -131,6 +141,18 @@ class BatchJob(object):
self.tasks_cnt['retrying'] += 1
self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times)
self._update_job_status()
self.log_status()
# print status for debuging
def log_status(self):
task_copy = {}
for task_idx in self.tasks.keys():
task_copy[task_idx] = {}
task_copy[task_idx]['status'] = self.tasks[task_idx]['status']
task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency']
logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3)))
logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3)))
logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status))
class JobMgr():
# load job information from etcd
@ -154,7 +176,8 @@ class JobMgr():
logger.error(err)
return [False, err.args[0]]
except Exception as err:
logger.error(err)
logger.error(traceback.format_exc())
#logger.error(err)
return [False, err.args[0]]
return [True, "add batch job success"]
@ -199,23 +222,24 @@ class JobMgr():
return job_id
# add tasks into taskmgr's queue
def add_task_taskmgr(self,tasks):
for task_name, task_info in tasks:
def add_task_taskmgr(self, user, tasks):
for task_name, task_info, task_priority in tasks:
if not task_info:
logger.error("task_info does not exist! task_name(%s)" % task_name)
return False
else:
task_priority = job.job_priority
logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) )
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
self.taskmgr.add_task(user, task_name, task_info, task_priority)
return True
# to process a job, add tasks without dependencies of the job into taskmgr
def process_job(self, job):
tasks = job.get_tasks_no_dependency(True)
return add_task_taskmgr(tasks)
return self.add_task_taskmgr(job.user, tasks)
# report task status from taskmgr when running, failed and finished
# task_name: user + '_' + job_id + '_' + task_idx
# status: 'running', 'retrying', 'failed', 'finished'
def report(self, task_name, status):
split_task_name = task_name.split('_')
if len(split_task_name) != 3:
@ -231,7 +255,7 @@ class JobMgr():
next_tasks = job.finish_task(task_idx)
if len(next_tasks) == 0:
return
ret = add_task_taskmgr(next_tasks)
ret = self.add_task_taskmgr(user, next_tasks)
# Get Batch job stdout or stderr from its file
def get_output(self, username, jobid, taskid, instid, issue):

View File

@ -171,7 +171,7 @@ class TaskMgr(threading.Thread):
if self.jobmgr is None:
self.logger.error('[task_completed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.jobmgr.report(task.info.id,'finished')
self.logger.info('task %s completed' % task.info.id)
self.lazy_delete_list.append(task)
@ -182,7 +182,7 @@ class TaskMgr(threading.Thread):
if self.jobmgr is None:
self.logger.error('[task_failed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.jobmgr.report(task.info.id,'failed')
self.logger.info('task %s failed' % task.info.id)
self.lazy_delete_list.append(task)
@ -201,6 +201,7 @@ class TaskMgr(threading.Thread):
def task_processor(self, task, instance_id, worker_ip):
task.status = RUNNING
self.jobmgr.report(task.info.id,'running')
# properties for transaction
task.info.instanceid = instance_id