diff --git a/src/master/httprest.py b/src/master/httprest.py index f328c31..d71c974 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -1103,6 +1103,5 @@ if __name__ == '__main__': G_jobmgr = jobmgr.JobMgr(G_taskmgr) G_taskmgr.set_jobmgr(G_jobmgr) G_taskmgr.start() - G_jobmgr.start() app.run(host = masterip, port = masterport, threaded=True) diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 7f80034..484fcce 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -1,6 +1,6 @@ import time, threading, random, string, os, traceback import master.monitor -import subprocess +import subprocess,json from functools import wraps from utils.log import initlogging, logger @@ -35,12 +35,14 @@ class BatchJob(object): continue for d in dependency: if not d in raw_tasks.keys(): - raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx)) + raise ValueError('task %s is not defined in the dependency of task %s' % (d, task_idx)) self.tasks[task_idx]['dependency'].append(d) if not d in self.dependency_out.keys(): self.dependency_out[d] = [] self.dependency_out[d].append(task_idx) + self.log_status() + def data_lock(f): @wraps(f) def new_f(self, *args, **kwargs): @@ -60,14 +62,15 @@ class BatchJob(object): logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id) ret_tasks = [] for task_idx in self.tasks.keys(): - if (self.tasks[task_idx]['status'] = 'pending' and + if (self.tasks[task_idx]['status'] == 'pending' and len(self.tasks[task_idx]['dependency']) == 0): if update_status: self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[task_idx]['status'] = 'scheduling' task_name = self.user + '_' + self.job_id + '_' + task_idx - ret_tasks.append([task_name, self.tasks[task_idx]['config']]) + ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority]) + self.log_status() return ret_tasks # update status of this job based @@ -91,6 +94,7 @@ class BatchJob(object): self.tasks[task_idx]['status'] = 'running' self.tasks_cnt['running'] += 1 self._update_job_status() + self.log_status() # a task has finished, update dependency and return tasks without dependencies @data_lock @@ -105,17 +109,23 @@ class BatchJob(object): self.tasks_cnt['finished'] += 1 self._update_job_status() if task_idx not in self.dependency_out.keys(): + self.log_status() return [] ret_tasks = [] for out_idx in self.dependency_out[task_idx]: - self.tasks[out_idx]['dependency'].remove(task_idx) + try: + self.tasks[out_idx]['dependency'].remove(task_idx) + except Exception as err: + logger.warning(traceback.format_exc()) + continue if (self.tasks[out_idx]['status'] == 'pending' and len(self.tasks[out_idx]['dependency']) == 0): self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[out_idx]['status'] = 'scheduling' task_name = self.user + '_' + self.job_id + '_' + out_idx - ret_tasks.append([task_name, self.tasks[out_idx]['config']]) + ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) + self.log_status() return ret_tasks # update retrying status of task @@ -131,6 +141,18 @@ class BatchJob(object): self.tasks_cnt['retrying'] += 1 self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times) self._update_job_status() + self.log_status() + + # print status for debuging + def log_status(self): + task_copy = {} + for task_idx in self.tasks.keys(): + task_copy[task_idx] = {} + task_copy[task_idx]['status'] = self.tasks[task_idx]['status'] + task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency'] + logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3))) + logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3))) + logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status)) class JobMgr(): # load job information from etcd @@ -154,7 +176,8 @@ class JobMgr(): logger.error(err) return [False, err.args[0]] except Exception as err: - logger.error(err) + logger.error(traceback.format_exc()) + #logger.error(err) return [False, err.args[0]] return [True, "add batch job success"] @@ -199,23 +222,24 @@ class JobMgr(): return job_id # add tasks into taskmgr's queue - def add_task_taskmgr(self,tasks): - for task_name, task_info in tasks: + def add_task_taskmgr(self, user, tasks): + for task_name, task_info, task_priority in tasks: if not task_info: logger.error("task_info does not exist! task_name(%s)" % task_name) return False else: - task_priority = job.job_priority logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) ) - self.taskmgr.add_task(job.user, task_name, task_info, task_priority) - return True + self.taskmgr.add_task(user, task_name, task_info, task_priority) + return True # to process a job, add tasks without dependencies of the job into taskmgr def process_job(self, job): tasks = job.get_tasks_no_dependency(True) - return add_task_taskmgr(tasks) + return self.add_task_taskmgr(job.user, tasks) # report task status from taskmgr when running, failed and finished + # task_name: user + '_' + job_id + '_' + task_idx + # status: 'running', 'retrying', 'failed', 'finished' def report(self, task_name, status): split_task_name = task_name.split('_') if len(split_task_name) != 3: @@ -231,7 +255,7 @@ class JobMgr(): next_tasks = job.finish_task(task_idx) if len(next_tasks) == 0: return - ret = add_task_taskmgr(next_tasks) + ret = self.add_task_taskmgr(user, next_tasks) # Get Batch job stdout or stderr from its file def get_output(self, username, jobid, taskid, instid, issue): diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 07936d8..e08f536 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -171,7 +171,7 @@ class TaskMgr(threading.Thread): if self.jobmgr is None: self.logger.error('[task_completed] jobmgr is None!') else: - self.jobmgr.report(task) + self.jobmgr.report(task.info.id,'finished') self.logger.info('task %s completed' % task.info.id) self.lazy_delete_list.append(task) @@ -182,7 +182,7 @@ class TaskMgr(threading.Thread): if self.jobmgr is None: self.logger.error('[task_failed] jobmgr is None!') else: - self.jobmgr.report(task) + self.jobmgr.report(task.info.id,'failed') self.logger.info('task %s failed' % task.info.id) self.lazy_delete_list.append(task) @@ -201,6 +201,7 @@ class TaskMgr(threading.Thread): def task_processor(self, task, instance_id, worker_ip): task.status = RUNNING + self.jobmgr.report(task.info.id,'running') # properties for transaction task.info.instanceid = instance_id