diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 5097c03..1b454a2 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -17,8 +17,19 @@ def db_commit(): raise class BatchJob(object): - def __init__(self, jobid, user, job_info): - self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority'])) + def __init__(self, jobid, user, job_info, old_job_db=None): + if old_job_db is None: + self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority'])) + else: + self.job_db = old_job_db + self.job_db.clear() + job_info = {} + job_info['jobName'] = self.job_db.name + job_info['jobPriority'] = self.job_db.priority + all_tasks = self.job_db.tasks.all() + job_info['tasks'] = {} + for t in all_tasks: + job_info['tasks'][t.idx] = json.loads(t.config) self.user = user #self.raw_job_info = job_info self.job_id = jobid @@ -35,8 +46,12 @@ class BatchJob(object): self.tasks_cnt['pending'] = len(raw_tasks.keys()) for task_idx in raw_tasks.keys(): task_info = raw_tasks[task_idx] - task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) - self.job_db.tasks.append(task_db) + if old_job_db is None: + task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) + self.job_db.tasks.append(task_db) + else: + task_db = Batchtask.query.get(jobid+"_"+task_idx) + task_db.clear() self.tasks[task_idx] = {} self.tasks[task_idx]['id'] = jobid+"_"+task_idx self.tasks[task_idx]['config'] = task_info @@ -54,7 +69,8 @@ class BatchJob(object): self.dependency_out[d] = [] self.dependency_out[d].append(task_idx) - db.session.add(self.job_db) + if old_job_db is None: + db.session.add(self.job_db) db_commit() self.log_status() @@ -259,6 +275,7 @@ class JobMgr(): # load job information from etcd # initial a job queue and job schedueler def __init__(self, taskmgr): + logger.info("Init jobmgr...") try: Batchjob.query.all() except: @@ -270,6 +287,22 @@ class JobMgr(): self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT')) self.auth_key = env.getenv('AUTH_KEY') + self.recover_jobs() + + def recover_jobs(self): + logger.info("Rerun the unfailed and unfinished jobs...") + try: + rejobs = Batchjob.query.filter(~Batchjob.status.in_(['done','failed'])) + rejobs = rejobs.order_by(Batchjob.create_time).all() + for rejob in rejobs: + logger.info("Rerun job: "+rejob.id) + logger.debug(str(rejob)) + job = BatchJob(rejob.id, rejob.username, None, rejob) + self.job_map[job.job_id] = job + self.process_job(job) + except Exception as err: + logger.error(traceback.format_exc()) + def charge_beans(self,username,billing): logger.debug("Charge user(%s) for %d beans"%(username, billing)) data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key} diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 6428355..1e501a0 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -113,6 +113,7 @@ class SubTask(): self.status_reason = '' self.try_count = 0 self.worker = None + self.lock = threading.Lock() def waiting_for_retry(self,reason=""): self.try_count += 1 @@ -120,7 +121,7 @@ class SubTask(): if self.status == FAILED: self.root_task.status = FAILED self.failed_reason = reason - + self.root_task.failed_reason = reason class TaskReporter(MasterServicer): @@ -197,6 +198,19 @@ class TaskMgr(threading.Thread): return new_f return lock + def subtask_lock(f): + @wraps(f) + def new_f(self, subtask, *args, **kwargs): + subtask.lock.acquire() + try: + result = f(self, subtask, *args, **kwargs) + except Exception as err: + subtask.lock.release() + raise err + subtask.lock.release() + return result + return new_f + def run(self): self.serve() while not self.thread_stop: @@ -272,7 +286,10 @@ class TaskMgr(threading.Thread): self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu return [True, ''] + @subtask_lock def stop_vnode(self, subtask): + if not subtask.vnode_started: + return [True, ""] try: self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port)) @@ -482,13 +499,18 @@ class TaskMgr(threading.Thread): if sub_task.status != RUNNING: self.logger.error('[on_task_report] receive task report when vnode is not running') - sub_task.status = report.subTaskStatus + #sub_task.status = report.subTaskStatus sub_task.status_reason = report.errmsg + sub_task.task_started = False if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: + self.clear_sub_task(sub_task) sub_task.waiting_for_retry(report.errmsg) - self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) + self.logger.info('task %s report failed, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) + if sub_task.status == WAITING: + self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) elif report.subTaskStatus == OUTPUTERROR: + self.clear_sub_task(sub_task) sub_task.status = FAILED task.status = FAILED task.failed_reason = report.errmsg @@ -506,7 +528,7 @@ class TaskMgr(threading.Thread): self.logger.info('task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if self.check_task_completed(task): continue - self.logger.info("test") + self.logger.info('schedule task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if task.at_same_time: # parallel tasks @@ -671,8 +693,13 @@ class TaskMgr(threading.Thread): disk = int(json_task['diskSetting']), gpu = int(json_task['gpuSetting'])), mount = [Mount( - localPath = json_task['mapping'][mapping_key]['mappingLocalDir'], - remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) + provider = json_task['mapping'][mapping_key]['mappingProvider'], + localPath = json_task['mapping'][mapping_key]['mappingMountpath'], + remotePath = json_task['mapping'][mapping_key]['mappingBucketName'], + accessKey = json_task['mapping'][mapping_key]['mappingAccessKey'], + secretKey = json_task['mapping'][mapping_key]['mappingSecretKey'], + other = json_task['mapping'][mapping_key]['mappingEndpoint'] + ) for mapping_key in json_task['mapping']] if 'mapping' in json_task else [] ), ), @@ -689,7 +716,7 @@ class TaskMgr(threading.Thread): timeout = int(json_task['expTime']) # commands are executed in all vnodes / only excuted in the first vnode # if in traditional mode, commands will be executed in all vnodes - ) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None + ) if (json_task['runon'] == 'all' or vnode_index == 0) else None } for vnode_index in range(int(json_task['vnodeCount']))]) if task.at_same_time: diff --git a/src/utils/model.py b/src/utils/model.py index 8eca040..959d7d5 100755 --- a/src/utils/model.py +++ b/src/utils/model.py @@ -461,6 +461,12 @@ class Batchjob(db.Model): self.end_time = None self.billing = 0 + def clear(self): + self.status = "pending" + self.failed_reason = "" + self.end_time = None + self.billing = 0 + def __repr__(self): info = {} info['job_id'] = self.id @@ -503,6 +509,15 @@ class Batchtask(db.Model): self.config = json.dumps(config) self.tried_times = 0 + def clear(self): + self.status = "pending" + self.failed_reason = "" + self.start_time = None + self.end_time = None + self.running_time = 0 + self.billing = 0 + self.tried_times = 0 + def __repr__(self): info = {} info['id'] = self.id diff --git a/src/worker/ossmounter.py b/src/worker/ossmounter.py index 607ca45..acd2812 100644 --- a/src/worker/ossmounter.py +++ b/src/worker/ossmounter.py @@ -27,7 +27,7 @@ class OssMounter(object): # umount oss pass -class aliyunOssMounter(OssMounter): +class AliyunOssMounter(OssMounter): @staticmethod def mount_oss(datapath, mount_info): @@ -42,7 +42,7 @@ class aliyunOssMounter(OssMounter): cmd = "chmod 640 /etc/passwd-ossfs" [success1, msg] = OssMounter.execute_cmd(cmd) - mountpath = datapath+"/"+mount_info.remotePath + mountpath = datapath+"/Aliyun/"+mount_info.remotePath logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) if not os.path.isdir(mountpath): os.makedirs(mountpath) diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index 83304ed..bff7a59 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -202,11 +202,12 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg) #mount oss + rootfs = "/var/lib/lxc/%s/rootfs" % lxcname self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+') - mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s %s/root/oss/%s none bind,rw,create=dir 0 0" + mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0" for mount in mount_list: - conffile.write("\n"+ mount_str % (self.fspath, username, mount.remotePath, rootfs, mount.remotePath)) + conffile.write("\n"+ mount_str % (self.fspath, username, mount.provider, mount.remotePath, rootfs, mount.remotePath)) conffile.close() logger.info("Start container %s..." % lxcname) diff --git a/web/templates/base_AdminLTE.html b/web/templates/base_AdminLTE.html index d015311..1395c3c 100644 --- a/web/templates/base_AdminLTE.html +++ b/web/templates/base_AdminLTE.html @@ -244,7 +244,7 @@ Docklet {{ version }} - Copyright© 2017 UniAS@ SEI, PKU + Copyright© 2019 UniAS@ SEI, PKU diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index 469f560..9a873ad 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -121,27 +121,41 @@ $("#collapse" + obj.id).collapse('hide'); } + function chmountPath(obj,task_num,mapping_num) { + cellid = 'mapping_mountpath_' + task_num + '_' + mapping_num; + $('#'+cellid).val("/root/oss/"+obj.value); + } + function removeMapping(obj) { $("#mapping_" + obj.id).remove(); } - function addMapping(obj) { + function addMapping(obj,task_num) { mapping_number += 1; var table = $("#storage_mapping_" + obj.id)[0]; var new_mapping = table.insertRow(); - new_mapping.id = "mapping_" + task_number + "_" + mapping_number; - var local_dir = new_mapping.insertCell(); - var remote_dir = new_mapping.insertCell(); - var source = new_mapping.insertCell(); + new_mapping.id = "mapping_" + task_num + "_" + mapping_number; + var provider = new_mapping.insertCell(); + var bucket_name = new_mapping.insertCell(); + var accessKey = new_mapping.insertCell(); + var secretKey = new_mapping.insertCell(); + var endpoint = new_mapping.insertCell(); + var mountpath = new_mapping.insertCell(); var remove = new_mapping.insertCell(); - local_dir.innerHTML = ''; - remote_dir.innerHTML = ''; - source.innerHTML = ''; - remove.innerHTML = '
'; } @@ -208,7 +222,7 @@ function addTask() { task_number += 1; var masterip=$("select#masterselector").children('option:selected').val(); - mapping_number = 0; + //mapping_number = 0; var task_html = ''; task_html += '
' @@ -324,18 +338,18 @@ +'
' +'' +'
' - +'' - +'' - +'' + +'' + +'
' +'' - +'' - +'' + +'' +'' +'' +'' - +'
Local DirRemote DirsourceOperation
ProviderBucket NameAccessKey IDAccessKey SecretEndpointMount PathRemove
' - +'
' + +'' + +'' + +'
' +'' $(task_html).appendTo("#accordion"); } diff --git a/web/templates/batch/batch_info.html b/web/templates/batch/batch_info.html index c0e8d57..05fdd87 100644 --- a/web/templates/batch/batch_info.html +++ b/web/templates/batch/batch_info.html @@ -216,6 +216,32 @@ + {% if 'mapping' in task['config'].keys() %} +
+ + + + + + + + + + + + {% for key in task['config']['mapping'].keys() %} + + + + + + + + {% endfor %} + +
ProviderBucket NameAccessKey IDEndpointMount Path
{{ task['config']['mapping'][key]['mappingProvider'] }}{{ task['config']['mapping'][key]['mappingBucketName'] }}{{ task['config']['mapping'][key]['mappingAccessKey'] }}{{ task['config']['mapping'][key]['mappingEndpoint'] }}{{ task['config']['mapping'][key]['mappingMountpath'] }}
+
+ {% endif %}