Merge pull request #382 from FirmlyReality/batch

Batch
This commit is contained in:
Yujian Zhu 2019-04-12 18:04:47 +08:00 committed by GitHub
commit f054578686
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 12 deletions

View File

@ -171,6 +171,7 @@ class BatchJob(object):
self.tasks[task_idx]['db'].running_time = running_time self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].end_time = datetime.now() self.tasks[task_idx]['db'].end_time = datetime.now()
self.tasks[task_idx]['db'].billing = billing self.tasks[task_idx]['db'].billing = billing
self.tasks[task_idx]['db'].failed_reason = ""
self.job_db = Batchjob.query.get(self.job_id) self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing self.job_db.billing += billing
self.tasks_cnt['finished'] += 1 self.tasks_cnt['finished'] += 1

View File

@ -417,6 +417,7 @@ class TaskMgr(threading.Thread):
[success, msg] = self.start_vnode(sub_task) [success, msg] = self.start_vnode(sub_task)
if not success: if not success:
sub_task.waiting_for_retry("Fail to start vnode.") sub_task.waiting_for_retry("Fail to start vnode.")
if sub_task.status == WAITING:
self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.") self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.")
sub_task.worker = None sub_task.worker = None
start_all_vnode_success = False start_all_vnode_success = False
@ -436,7 +437,8 @@ class TaskMgr(threading.Thread):
if success: if success:
sub_task.status = RUNNING sub_task.status = RUNNING
else: else:
sub_task.waiting_for_retry("Failt to start task.") sub_task.waiting_for_retry("Fail to start task.")
if sub_task.status == WAITING:
self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.") self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.")
def clear_sub_tasks(self, sub_task_list): def clear_sub_tasks(self, sub_task_list):

View File

@ -42,19 +42,31 @@ class AliyunOssMounter(OssMounter):
cmd = "chmod 640 /etc/passwd-ossfs" cmd = "chmod 640 /etc/passwd-ossfs"
[success1, msg] = OssMounter.execute_cmd(cmd) [success1, msg] = OssMounter.execute_cmd(cmd)
if not success1:
logger.error("Aliyun OSS mount chmod err:%s" % msg)
return [False, msg]
mountpath = datapath+"/Aliyun/"+mount_info.remotePath mountpath = datapath+"/Aliyun/"+mount_info.remotePath
logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath))
if not os.path.isdir(mountpath): if not os.path.isdir(mountpath):
os.makedirs(mountpath) os.makedirs(mountpath)
cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other) cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other)
[success, msg] = OssMounter.execute_cmd(cmd) [success, msg] = OssMounter.execute_cmd(cmd)
if not success:
logger.error("Aliyun OSS mount err:%s" % msg)
return [False, msg]
return [True,""] return [True,""]
@staticmethod @staticmethod
def umount_oss(datapath, mount_info): def umount_oss(datapath, mount_info):
mountpath = datapath + "/" + mount_info.remotePath mountpath = datapath+"/Aliyun/"+mount_info.remotePath
logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath)) logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath))
cmd = "fusermount -u %s" % (mountpath) cmd = "fusermount -u %s" % (mountpath)
[success, msg] = OssMounter.execute_cmd(cmd) [success, msg] = OssMounter.execute_cmd(cmd)
if not success:
logger.error("Aliyun OSS umount err:%s"%msg)
return [False,msg]
[success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath) [success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath)
if not success:
logger.error("Aliyun OSS umount err:%s"%msg)
return [False,msg]
return [True,""] return [True,""]

View File

@ -82,6 +82,16 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
def stop_and_rm_containers(self,lxcname): def stop_and_rm_containers(self,lxcname):
logger.info("Stop the container with name:"+lxcname) logger.info("Stop the container with name:"+lxcname)
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
lxcpath = "/var/lib/lxc/%s" % lxcname
try:
mount_info = []
for provider in os.listdir(lxcpath+"/oss"):
for bkname in os.listdir(lxcpath+"/oss/"+provider):
mount_info.append(rpc_pb2.Mount(provider=provider,remotePath=bkname))
self.umount_oss(lxcpath+"/oss", mount_info)
except Exception as err:
logger.info(err)
pass
return self.imgmgr.deleteFS(lxcname) return self.imgmgr.deleteFS(lxcname)
def rm_all_batch_containers(self): def rm_all_batch_containers(self):
@ -202,12 +212,13 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg)
#mount oss #mount oss
rootfs = "/var/lib/lxc/%s/rootfs" % lxcname lxcpath = "/var/lib/lxc/%s" % lxcname
self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) rootfs = lxcpath + "/rootfs"
conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+') self.mount_oss(lxcpath + "/oss", mount_list)
mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0" conffile = open(lxcpath + "/config", 'a+')
mount_str = "lxc.mount.entry = "+ lxcpath +"/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0"
for mount in mount_list: for mount in mount_list:
conffile.write("\n"+ mount_str % (self.fspath, username, mount.provider, mount.remotePath, rootfs, mount.remotePath)) conffile.write("\n"+ mount_str % (mount.provider, mount.remotePath, rootfs, mount.remotePath))
conffile.close() conffile.close()
logger.info("Start container %s..." % lxcname) logger.info("Start container %s..." % lxcname)
@ -215,6 +226,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0: if ret.returncode != 0:
logger.error('start container %s failed' % lxcname) logger.error('start container %s failed' % lxcname)
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
self.imgmgr.deleteFS(lxcname) self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container(%s)"%lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container(%s)"%lxcname)
@ -228,6 +240,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
if not success: if not success:
logger.error("Fail to add gpu device. " + msg) logger.error("Fail to add gpu device. " + msg)
container.stop() container.stop()
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
self.imgmgr.deleteFS(lxcname) self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg)
@ -235,6 +248,10 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
cmd = "lxc-attach -n %s -- service ssh start" % lxcname cmd = "lxc-attach -n %s -- service ssh start" % lxcname
ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0: if ret.returncode != 0:
logger.error('Fail to start ssh service of container %s' % lxcname)
container.stop()
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to start ssh service. lxc(%s)"%lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to start ssh service. lxc(%s)"%lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
@ -289,6 +306,9 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
else: else:
logger.error("stop container %s failed" % lxcname) logger.error("stop container %s failed" % lxcname)
#umount oss
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
logger.info("deleting container:%s" % lxcname) logger.info("deleting container:%s" % lxcname)
if self.imgmgr.deleteFS(lxcname): if self.imgmgr.deleteFS(lxcname):
logger.info("delete container %s success" % lxcname) logger.info("delete container %s success" % lxcname)
@ -302,9 +322,6 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
#release gpu #release gpu
self.release_gpu_device(lxcname) self.release_gpu_device(lxcname)
#umount oss
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")