From b71138b3a7c00e923ca3e255178d4d8d50564df3 Mon Sep 17 00:00:00 2001 From: Firmlyzhu Date: Mon, 1 Apr 2019 18:22:40 +0800 Subject: [PATCH 1/6] Fix some bugs --- src/master/taskmgr.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 6428355..3e7d06d 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -113,6 +113,7 @@ class SubTask(): self.status_reason = '' self.try_count = 0 self.worker = None + self.lock = threading.Lock() def waiting_for_retry(self,reason=""): self.try_count += 1 @@ -120,7 +121,7 @@ class SubTask(): if self.status == FAILED: self.root_task.status = FAILED self.failed_reason = reason - + self.root_task.failed_reason = reason class TaskReporter(MasterServicer): @@ -197,6 +198,19 @@ class TaskMgr(threading.Thread): return new_f return lock + def subtask_lock(f): + @wraps(f) + def new_f(self, subtask, *args, **kwargs): + subtask.lock.acquire() + try: + result = f(self, subtask, *args, **kwargs) + except Exception as err: + subtask.lock.release() + raise err + subtask.lock.release() + return result + return new_f + def run(self): self.serve() while not self.thread_stop: @@ -272,7 +286,10 @@ class TaskMgr(threading.Thread): self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu return [True, ''] + @subtask_lock def stop_vnode(self, subtask): + if not subtask.vnode_started: + return [True, ""] try: self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port)) @@ -484,11 +501,15 @@ class TaskMgr(threading.Thread): sub_task.status = report.subTaskStatus sub_task.status_reason = report.errmsg + sub_task.task_started = False if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: + self.clear_sub_task(sub_task) sub_task.waiting_for_retry(report.errmsg) - self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) + if sub_task.status == WAITING: + self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) elif report.subTaskStatus == OUTPUTERROR: + self.clear_sub_task(sub_task) sub_task.status = FAILED task.status = FAILED task.failed_reason = report.errmsg @@ -506,7 +527,7 @@ class TaskMgr(threading.Thread): self.logger.info('task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if self.check_task_completed(task): continue - self.logger.info("test") + self.logger.info('schedule task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if task.at_same_time: # parallel tasks @@ -689,7 +710,7 @@ class TaskMgr(threading.Thread): timeout = int(json_task['expTime']) # commands are executed in all vnodes / only excuted in the first vnode # if in traditional mode, commands will be executed in all vnodes - ) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None + ) if (json_task['runon'] == 'all' or vnode_index == 0) else None } for vnode_index in range(int(json_task['vnodeCount']))]) if task.at_same_time: From 77bdec498c6b7529ebcd2e76005f329d9c92c3bf Mon Sep 17 00:00:00 2001 From: Firmlyzhu Date: Tue, 2 Apr 2019 17:15:38 +0800 Subject: [PATCH 2/6] update oss mount on create job page --- web/templates/batch/batch_create.html | 43 ++++++++++++++++----------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index 469f560..eb7238f 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -130,17 +130,26 @@ var table = $("#storage_mapping_" + obj.id)[0]; var new_mapping = table.insertRow(); new_mapping.id = "mapping_" + task_number + "_" + mapping_number; - var local_dir = new_mapping.insertCell(); - var remote_dir = new_mapping.insertCell(); - var source = new_mapping.insertCell(); + var provider = new_mapping.insertCell(); + var bucket_name = new_mapping.insertCell(); + var accessKey = new_mapping.insertCell(); + var secretKey = new_mapping.insertCell(); + var endpoint = new_mapping.insertCell(); + var mountpath = new_mapping.insertCell(); var remove = new_mapping.insertCell(); - local_dir.innerHTML = ''; - remote_dir.innerHTML = ''; - source.innerHTML = ''; + secretKey.innerHTML = ''; + endpoint.innerHTML = 'http://'; + mountpath.innerHTML = ''; + provider.innerHTML = ''; + +''; remove.innerHTML = '
'; } @@ -324,18 +333,18 @@ +'' +'' +'
' - +'' - +'' - +'' + +'' + +'
' +'' - +'' - +'' + +'' +'' +'' +'' - +'
Local DirRemote DirsourceOperation
ProviderBucket NameAccessKey IDAccessKey SecretEndpointMount PathRemove
' - +'
' + +'' + +'' + +'
' +'' $(task_html).appendTo("#accordion"); } From 60fa6cc156c02794c74eb75d49abf7c3000f4e16 Mon Sep 17 00:00:00 2001 From: Firmlyzhu Date: Wed, 3 Apr 2019 17:18:10 +0800 Subject: [PATCH 3/6] add mount to task --- src/master/taskmgr.py | 9 +++++-- src/worker/ossmounter.py | 4 +-- src/worker/taskworker.py | 5 ++-- web/templates/batch/batch_create.html | 39 +++++++++++++++------------ 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 3e7d06d..f56f394 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -692,8 +692,13 @@ class TaskMgr(threading.Thread): disk = int(json_task['diskSetting']), gpu = int(json_task['gpuSetting'])), mount = [Mount( - localPath = json_task['mapping'][mapping_key]['mappingLocalDir'], - remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) + provider = json_task['mapping'][mapping_key]['mappingProvider'], + localPath = json_task['mapping'][mapping_key]['mappingMountpath'], + remotePath = json_task['mapping'][mapping_key]['mappingBucketName'], + accessKey = json_task['mapping'][mapping_key]['mappingAccessKey'], + secretKey = json_task['mapping'][mapping_key]['mappingSecretKey'], + other = json_task['mapping'][mapping_key]['mappingEndpoint'] + ) for mapping_key in json_task['mapping']] if 'mapping' in json_task else [] ), ), diff --git a/src/worker/ossmounter.py b/src/worker/ossmounter.py index 607ca45..acd2812 100644 --- a/src/worker/ossmounter.py +++ b/src/worker/ossmounter.py @@ -27,7 +27,7 @@ class OssMounter(object): # umount oss pass -class aliyunOssMounter(OssMounter): +class AliyunOssMounter(OssMounter): @staticmethod def mount_oss(datapath, mount_info): @@ -42,7 +42,7 @@ class aliyunOssMounter(OssMounter): cmd = "chmod 640 /etc/passwd-ossfs" [success1, msg] = OssMounter.execute_cmd(cmd) - mountpath = datapath+"/"+mount_info.remotePath + mountpath = datapath+"/Aliyun/"+mount_info.remotePath logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) if not os.path.isdir(mountpath): os.makedirs(mountpath) diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index 83304ed..bff7a59 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -202,11 +202,12 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg) #mount oss + rootfs = "/var/lib/lxc/%s/rootfs" % lxcname self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+') - mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s %s/root/oss/%s none bind,rw,create=dir 0 0" + mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0" for mount in mount_list: - conffile.write("\n"+ mount_str % (self.fspath, username, mount.remotePath, rootfs, mount.remotePath)) + conffile.write("\n"+ mount_str % (self.fspath, username, mount.provider, mount.remotePath, rootfs, mount.remotePath)) conffile.close() logger.info("Start container %s..." % lxcname) diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index eb7238f..9a873ad 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -121,15 +121,20 @@ $("#collapse" + obj.id).collapse('hide'); } + function chmountPath(obj,task_num,mapping_num) { + cellid = 'mapping_mountpath_' + task_num + '_' + mapping_num; + $('#'+cellid).val("/root/oss/"+obj.value); + } + function removeMapping(obj) { $("#mapping_" + obj.id).remove(); } - function addMapping(obj) { + function addMapping(obj,task_num) { mapping_number += 1; var table = $("#storage_mapping_" + obj.id)[0]; var new_mapping = table.insertRow(); - new_mapping.id = "mapping_" + task_number + "_" + mapping_number; + new_mapping.id = "mapping_" + task_num + "_" + mapping_number; var provider = new_mapping.insertCell(); var bucket_name = new_mapping.insertCell(); var accessKey = new_mapping.insertCell(); @@ -137,20 +142,20 @@ var endpoint = new_mapping.insertCell(); var mountpath = new_mapping.insertCell(); var remove = new_mapping.insertCell(); - bucket_name.innerHTML = ''; - accessKey.innerHTML = ''; - secretKey.innerHTML = ''; - endpoint.innerHTML = 'http://'; - mountpath.innerHTML = ''; - provider.innerHTML = ''; + accessKey.innerHTML = ''; + secretKey.innerHTML = ''; + endpoint.innerHTML = 'http://'; + mountpath.innerHTML = ''; + provider.innerHTML = ''; - remove.innerHTML = '
'; } @@ -217,7 +222,7 @@ function addTask() { task_number += 1; var masterip=$("select#masterselector").children('option:selected').val(); - mapping_number = 0; + //mapping_number = 0; var task_html = ''; task_html += '
' @@ -334,7 +339,7 @@ +'
' +'
' +'' +'
' +'' From 357f280bc820d84ad6db36eb2403e08052f06130 Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Wed, 3 Apr 2019 23:46:56 +0800 Subject: [PATCH 4/6] update batch_info.html to append mount info --- web/templates/batch/batch_info.html | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/web/templates/batch/batch_info.html b/web/templates/batch/batch_info.html index c0e8d57..9dbaea8 100644 --- a/web/templates/batch/batch_info.html +++ b/web/templates/batch/batch_info.html @@ -216,6 +216,30 @@
+
+ + + + + + + + + + + + {% for key in task['config']['mapping'].keys() %} + + + + + + + + {% endfor %} + +
ProviderBucket NameAccessKey IDEndpointMount Path
{{ task['config']['mapping'][key]['mappingProvider'] }}{{ task['config']['mapping'][key]['mappingBucketName'] }}{{ task['config']['mapping'][key]['mappingAccessKey'] }}{{ task['config']['mapping'][key]['mappingEndpoint'] }}{{ task['config']['mapping'][key]['mappingMountpath'] }}
+
From 5478a8f431592289b3eea2e1075b9ec9857b04fd Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Thu, 4 Apr 2019 01:06:59 +0800 Subject: [PATCH 5/6] Recover jobs when restart master --- src/master/jobmgr.py | 43 +++++++++++++++++++++++++---- src/master/taskmgr.py | 3 +- src/utils/model.py | 15 ++++++++++ web/templates/batch/batch_info.html | 2 ++ 4 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 5097c03..1b454a2 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -17,8 +17,19 @@ def db_commit(): raise class BatchJob(object): - def __init__(self, jobid, user, job_info): - self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority'])) + def __init__(self, jobid, user, job_info, old_job_db=None): + if old_job_db is None: + self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority'])) + else: + self.job_db = old_job_db + self.job_db.clear() + job_info = {} + job_info['jobName'] = self.job_db.name + job_info['jobPriority'] = self.job_db.priority + all_tasks = self.job_db.tasks.all() + job_info['tasks'] = {} + for t in all_tasks: + job_info['tasks'][t.idx] = json.loads(t.config) self.user = user #self.raw_job_info = job_info self.job_id = jobid @@ -35,8 +46,12 @@ class BatchJob(object): self.tasks_cnt['pending'] = len(raw_tasks.keys()) for task_idx in raw_tasks.keys(): task_info = raw_tasks[task_idx] - task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) - self.job_db.tasks.append(task_db) + if old_job_db is None: + task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) + self.job_db.tasks.append(task_db) + else: + task_db = Batchtask.query.get(jobid+"_"+task_idx) + task_db.clear() self.tasks[task_idx] = {} self.tasks[task_idx]['id'] = jobid+"_"+task_idx self.tasks[task_idx]['config'] = task_info @@ -54,7 +69,8 @@ class BatchJob(object): self.dependency_out[d] = [] self.dependency_out[d].append(task_idx) - db.session.add(self.job_db) + if old_job_db is None: + db.session.add(self.job_db) db_commit() self.log_status() @@ -259,6 +275,7 @@ class JobMgr(): # load job information from etcd # initial a job queue and job schedueler def __init__(self, taskmgr): + logger.info("Init jobmgr...") try: Batchjob.query.all() except: @@ -270,6 +287,22 @@ class JobMgr(): self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT')) self.auth_key = env.getenv('AUTH_KEY') + self.recover_jobs() + + def recover_jobs(self): + logger.info("Rerun the unfailed and unfinished jobs...") + try: + rejobs = Batchjob.query.filter(~Batchjob.status.in_(['done','failed'])) + rejobs = rejobs.order_by(Batchjob.create_time).all() + for rejob in rejobs: + logger.info("Rerun job: "+rejob.id) + logger.debug(str(rejob)) + job = BatchJob(rejob.id, rejob.username, None, rejob) + self.job_map[job.job_id] = job + self.process_job(job) + except Exception as err: + logger.error(traceback.format_exc()) + def charge_beans(self,username,billing): logger.debug("Charge user(%s) for %d beans"%(username, billing)) data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key} diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index f56f394..1e501a0 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -499,13 +499,14 @@ class TaskMgr(threading.Thread): if sub_task.status != RUNNING: self.logger.error('[on_task_report] receive task report when vnode is not running') - sub_task.status = report.subTaskStatus + #sub_task.status = report.subTaskStatus sub_task.status_reason = report.errmsg sub_task.task_started = False if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: self.clear_sub_task(sub_task) sub_task.waiting_for_retry(report.errmsg) + self.logger.info('task %s report failed, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) if sub_task.status == WAITING: self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) elif report.subTaskStatus == OUTPUTERROR: diff --git a/src/utils/model.py b/src/utils/model.py index 8eca040..959d7d5 100755 --- a/src/utils/model.py +++ b/src/utils/model.py @@ -461,6 +461,12 @@ class Batchjob(db.Model): self.end_time = None self.billing = 0 + def clear(self): + self.status = "pending" + self.failed_reason = "" + self.end_time = None + self.billing = 0 + def __repr__(self): info = {} info['job_id'] = self.id @@ -503,6 +509,15 @@ class Batchtask(db.Model): self.config = json.dumps(config) self.tried_times = 0 + def clear(self): + self.status = "pending" + self.failed_reason = "" + self.start_time = None + self.end_time = None + self.running_time = 0 + self.billing = 0 + self.tried_times = 0 + def __repr__(self): info = {} info['id'] = self.id diff --git a/web/templates/batch/batch_info.html b/web/templates/batch/batch_info.html index 9dbaea8..05fdd87 100644 --- a/web/templates/batch/batch_info.html +++ b/web/templates/batch/batch_info.html @@ -216,6 +216,7 @@ + {% if 'mapping' in task['config'].keys() %}
@@ -240,6 +241,7 @@
+ {% endif %} From 10e000df8bcca5c5853458a0fd46e39212d6a582 Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Thu, 4 Apr 2019 01:11:26 +0800 Subject: [PATCH 6/6] update year --- web/templates/base_AdminLTE.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/templates/base_AdminLTE.html b/web/templates/base_AdminLTE.html index d015311..1395c3c 100644 --- a/web/templates/base_AdminLTE.html +++ b/web/templates/base_AdminLTE.html @@ -244,7 +244,7 @@ Docklet {{ version }} - Copyright© 2017 UniAS@ SEI, PKU + Copyright© 2019 UniAS@ SEI, PKU