update add_job & report of jobmgr

This commit is contained in:
Firmlyzhu 2019-01-16 17:30:06 +08:00
parent f39bb42dc7
commit 6f72efda63
1 changed files with 51 additions and 41 deletions

View File

@ -18,10 +18,10 @@ class BatchJob(object):
self.lock = threading.Lock()
self.tasks = {}
self.dependency_out = {}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'error':0, 'failed':0, 'finished':0}
#self.top_sort()
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
#init self.tasks & self.dependency_out & self.tasks_cnt
logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time))
raw_tasks = self.raw_job_info["tasks"]
self.tasks_cnt['pending'] = len(raw_tasks.keys())
for task_idx in raw_tasks.keys():
@ -57,6 +57,7 @@ class BatchJob(object):
# return the tasks without dependencies
@data_lock
def get_tasks_no_dependency(self,update_status=False):
logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id)
ret_tasks = []
for task_idx in self.tasks.keys():
if (self.tasks[task_idx]['status'] = 'pending' and
@ -84,6 +85,7 @@ class BatchJob(object):
# start run a task, update status
@data_lock
def update_task_running(self, task_idx):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'running'
@ -96,6 +98,7 @@ class BatchJob(object):
if task_idx not in self.tasks.keys():
logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
return []
logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'finished'
@ -115,55 +118,51 @@ class BatchJob(object):
ret_tasks.append([task_name, self.tasks[out_idx]['config']])
return ret_tasks
# update error status of task
# update retrying status of task
@data_lock
def update_task_error(self, task_idx, tried_times, try_out=False):
def update_task_retrying(self, task_idx, tried_times, try_out=False):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'error(tried %d times)' % int(tried_times)
if try_out:
self.tasks_cnt['failed'] += 1
self.tasks[task_idx]['status'] = 'failed(tried %d times)' % int(tried_times)
else:
self.tasks_cnt['error'] += 1
self.tasks_cnt['retrying'] += 1
self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times)
self._update_job_status()
class JobMgr(threading.Thread):
class JobMgr():
# load job information from etcd
# initial a job queue and job schedueler
def __init__(self, taskmgr):
threading.Thread.__init__(self)
self.job_queue = []
self.job_map = {}
self.taskmgr = taskmgr
self.fspath = env.getenv('FS_PREFIX')
def run(self):
while True:
self.job_scheduler()
time.sleep(2)
# user: username
# job_data: a json string
# job_info: a json string
# user submit a new job, add this job to queue and database
def add_job(self, user, job_info):
try:
job = BatchJob(user, job_info)
job.job_id = self.gen_jobid()
self.job_queue.append(job.job_id)
self.job_map[job.job_id] = job
self.process_job(job)
except ValueError as err:
logger.error(err)
return [False, err.args[0]]
except Exception as err:
logger.error(err)
return [False, err.args[0]]
finally:
return [True, "add batch job success"]
return [True, "add batch job success"]
# user: username
# list a user's all job
def list_jobs(self,user):
res = []
for job_id in self.job_queue:
for job_id in self.job_map.keys():
job = self.job_map[job_id]
logger.debug('job_id: %s, user: %s' % (job_id, job.user))
if job.user == user:
@ -190,7 +189,7 @@ class JobMgr(threading.Thread):
# check if a job exists
def is_job_exist(self, job_id):
return job_id in self.job_queue
return job_id in self.job_map.keys()
# generate a random job id
def gen_jobid(self):
@ -199,31 +198,42 @@ class JobMgr(threading.Thread):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
return job_id
# this is a thread to process a job
def job_processor(self, job):
task_name, task_info = job.get_task()
if not task_info:
return False
else:
task_priority = job.job_priority
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
return True
# add tasks into taskmgr's queue
def add_task_taskmgr(self,tasks):
for task_name, task_info in tasks:
if not task_info:
logger.error("task_info does not exist! task_name(%s)" % task_name)
return False
else:
task_priority = job.job_priority
logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) )
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
return True
# this is a thread to schedule the jobs
def job_scheduler(self):
# choose a job from queue, create a job processor for it
for job_id in self.job_queue:
job = self.job_map[job_id]
if self.job_processor(job):
job.status = 'running'
break
#else:
#job.status = 'done'
# to process a job, add tasks without dependencies of the job into taskmgr
def process_job(self, job):
tasks = job.get_tasks_no_dependency(True)
return add_task_taskmgr(tasks)
# a task has finished
def report(self, task):
pass
# report task status from taskmgr when running, failed and finished
def report(self, task_name, status):
split_task_name = task_name.split('_')
if len(split_task_name) != 3:
logger.error("Illegal task_name(%s) report from taskmgr" % task_name)
return
user, job_id, task_idx = split_task_name
job = self.job_map[job_id]
if status == "running":
job.update_task_running(task_idx)
elif status == "failed":
pass # TODO
elif status == "finished":
next_tasks = job.finish_task(task_idx)
if len(next_tasks) == 0:
return
ret = add_task_taskmgr(next_tasks)
# Get Batch job stdout or stderr from its file
def get_output(self, username, jobid, taskid, instid, issue):
filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt"
fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename)