diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 1b454a2..fc02ec5 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -171,6 +171,7 @@ class BatchJob(object): self.tasks[task_idx]['db'].running_time = running_time self.tasks[task_idx]['db'].end_time = datetime.now() self.tasks[task_idx]['db'].billing = billing + self.tasks[task_idx]['db'].failed_reason = "" self.job_db = Batchjob.query.get(self.job_id) self.job_db.billing += billing self.tasks_cnt['finished'] += 1 diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 99b1fc8..0bed1a6 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -417,7 +417,8 @@ class TaskMgr(threading.Thread): [success, msg] = self.start_vnode(sub_task) if not success: sub_task.waiting_for_retry("Fail to start vnode.") - self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.") + if sub_task.status == WAITING: + self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.") sub_task.worker = None start_all_vnode_success = False @@ -436,8 +437,9 @@ class TaskMgr(threading.Thread): if success: sub_task.status = RUNNING else: - sub_task.waiting_for_retry("Failt to start task.") - self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.") + sub_task.waiting_for_retry("Fail to start task.") + if sub_task.status == WAITING: + self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.") def clear_sub_tasks(self, sub_task_list): for sub_task in sub_task_list: diff --git a/src/worker/ossmounter.py b/src/worker/ossmounter.py index acd2812..cbdd9e3 100644 --- a/src/worker/ossmounter.py +++ b/src/worker/ossmounter.py @@ -42,19 +42,31 @@ class AliyunOssMounter(OssMounter): cmd = "chmod 640 /etc/passwd-ossfs" [success1, msg] = OssMounter.execute_cmd(cmd) + if not success1: + logger.error("Aliyun OSS mount chmod err:%s" % msg) + return [False, msg] mountpath = datapath+"/Aliyun/"+mount_info.remotePath logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) if not os.path.isdir(mountpath): os.makedirs(mountpath) cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other) [success, msg] = OssMounter.execute_cmd(cmd) + if not success: + logger.error("Aliyun OSS mount err:%s" % msg) + return [False, msg] return [True,""] @staticmethod def umount_oss(datapath, mount_info): - mountpath = datapath + "/" + mount_info.remotePath + mountpath = datapath+"/Aliyun/"+mount_info.remotePath logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath)) cmd = "fusermount -u %s" % (mountpath) [success, msg] = OssMounter.execute_cmd(cmd) + if not success: + logger.error("Aliyun OSS umount err:%s"%msg) + return [False,msg] [success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath) + if not success: + logger.error("Aliyun OSS umount err:%s"%msg) + return [False,msg] return [True,""] diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index bff7a59..f0fa3f3 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -82,6 +82,16 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): def stop_and_rm_containers(self,lxcname): logger.info("Stop the container with name:"+lxcname) subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + lxcpath = "/var/lib/lxc/%s" % lxcname + try: + mount_info = [] + for provider in os.listdir(lxcpath+"/oss"): + for bkname in os.listdir(lxcpath+"/oss/"+provider): + mount_info.append(rpc_pb2.Mount(provider=provider,remotePath=bkname)) + self.umount_oss(lxcpath+"/oss", mount_info) + except Exception as err: + logger.info(err) + pass return self.imgmgr.deleteFS(lxcname) def rm_all_batch_containers(self): @@ -202,12 +212,13 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg) #mount oss - rootfs = "/var/lib/lxc/%s/rootfs" % lxcname - self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) - conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+') - mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0" + lxcpath = "/var/lib/lxc/%s" % lxcname + rootfs = lxcpath + "/rootfs" + self.mount_oss(lxcpath + "/oss", mount_list) + conffile = open(lxcpath + "/config", 'a+') + mount_str = "lxc.mount.entry = "+ lxcpath +"/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0" for mount in mount_list: - conffile.write("\n"+ mount_str % (self.fspath, username, mount.provider, mount.remotePath, rootfs, mount.remotePath)) + conffile.write("\n"+ mount_str % (mount.provider, mount.remotePath, rootfs, mount.remotePath)) conffile.close() logger.info("Start container %s..." % lxcname) @@ -215,6 +226,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) if ret.returncode != 0: logger.error('start container %s failed' % lxcname) + self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list) self.imgmgr.deleteFS(lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container(%s)"%lxcname) @@ -228,6 +240,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): if not success: logger.error("Fail to add gpu device. " + msg) container.stop() + self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list) self.imgmgr.deleteFS(lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg) @@ -235,6 +248,10 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): cmd = "lxc-attach -n %s -- service ssh start" % lxcname ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) if ret.returncode != 0: + logger.error('Fail to start ssh service of container %s' % lxcname) + container.stop() + self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list) + self.imgmgr.deleteFS(lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to start ssh service. lxc(%s)"%lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") @@ -289,6 +306,9 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): else: logger.error("stop container %s failed" % lxcname) + #umount oss + self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list) + logger.info("deleting container:%s" % lxcname) if self.imgmgr.deleteFS(lxcname): logger.info("delete container %s success" % lxcname) @@ -302,9 +322,6 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): #release gpu self.release_gpu_device(lxcname) - #umount oss - self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) - return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")