From bbb96db7c9cbd53edc944ad440ac1f3951da800e Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Wed, 6 Mar 2019 01:39:05 +0800 Subject: [PATCH] Fix many bugs & beautifify web pages --- bin/docklet-master | 4 +-- bin/docklet-supermaster | 4 +-- bin/docklet-worker | 4 +-- conf/container.batch.conf | 2 +- src/master/httprest.py | 6 ++--- src/master/jobmgr.py | 16 +++++------ src/master/taskmgr.py | 38 ++++++++++++++++++--------- src/master/testTaskWorker.py | 6 ++--- src/worker/taskworker.py | 11 ++++---- web/templates/batch/batch_create.html | 7 +++++ web/templates/batch/batch_list.html | 2 +- web/templates/batch/batch_output.html | 6 ++--- web/web.py | 12 ++++----- web/webViews/batch.py | 6 ++--- 14 files changed, 72 insertions(+), 52 deletions(-) diff --git a/bin/docklet-master b/bin/docklet-master index efe1a92..f516595 100755 --- a/bin/docklet-master +++ b/bin/docklet-master @@ -32,8 +32,8 @@ WEB_PORT=8888 USER_PORT=9100 #cluster net, default is 172.16.0.1/16 CLUSTER_NET="172.16.0.1/16" -# ip addresses range of containers for batch job, default is 10.0.3.0/24 -BATCH_NET="10.0.3.0/24" +# ip addresses range of containers for batch job, default is 10.16.0.0/16 +BATCH_NET="10.16.0.0/16" #configurable-http-proxy public port, default is 8000 PROXY_PORT=8000 #configurable-http-proxy api port, default is 8001 diff --git a/bin/docklet-supermaster b/bin/docklet-supermaster index b9ae006..3934846 100755 --- a/bin/docklet-supermaster +++ b/bin/docklet-supermaster @@ -36,8 +36,8 @@ WEB_PORT=8888 USER_PORT=9100 #cluster net, default is 172.16.0.1/16 CLUSTER_NET="172.16.0.1/16" -# ip addresses range of containers for batch job, default is 10.0.3.0/24 -BATCH_NET="10.0.3.0/24" +# ip addresses range of containers for batch job, default is 10.16.0.0/16 +BATCH_NET="10.16.0.0/16" . $DOCKLET_CONF/docklet.conf diff --git a/bin/docklet-worker b/bin/docklet-worker index 990191e..f7efb91 100755 --- a/bin/docklet-worker +++ b/bin/docklet-worker @@ -20,8 +20,8 @@ FS_PREFIX=/opt/docklet # cluster net ip range, default is 172.16.0.1/16 CLUSTER_NET="172.16.0.1/16" -# ip addresses range of containers for batch job, default is 10.0.3.0/24 -BATCH_NET="10.0.3.0/24" +# ip addresses range of containers for batch job, default is 10.16.0.0/16 +BATCH_NET="10.16.0.0/16" #configurable-http-proxy public port, default is 8000 PROXY_PORT=8000 #configurable-http-proxy api port, default is 8001 diff --git a/conf/container.batch.conf b/conf/container.batch.conf index 7d8dd18..1746f18 100644 --- a/conf/container.batch.conf +++ b/conf/container.batch.conf @@ -45,7 +45,7 @@ lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU% lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0 -#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/batch-%TASKID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0 +lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/batch-%TASKID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0 lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0 lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0 diff --git a/src/master/httprest.py b/src/master/httprest.py index d71c974..d12582b 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -885,11 +885,11 @@ def get_output(user,beans,form): global G_jobmgr jobid = form.get("jobid","") taskid = form.get("taskid","") - instid = form.get("instid","") + vnodeid = form.get("vnodeid","") issue = form.get("issue","") result = { 'success': 'true', - 'data': G_jobmgr.get_output(user,jobid,taskid,instid,issue) + 'data': G_jobmgr.get_output(user,jobid,taskid,vnodeid,issue) } return json.dumps(result) @@ -1099,7 +1099,7 @@ if __name__ == '__main__': # server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler) logger.info("starting master server") - G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher) + G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher, ipaddr) G_jobmgr = jobmgr.JobMgr(G_taskmgr) G_taskmgr.set_jobmgr(G_jobmgr) G_taskmgr.start() diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index d8fb644..c8ca5b9 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -69,7 +69,7 @@ class BatchJob(object): self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[task_idx]['status'] = 'scheduling' - task_name = self.user + '_' + self.job_id + '_' + task_idx + task_name = self.job_id + '_' + task_idx ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority]) self.log_status() return ret_tasks @@ -124,7 +124,7 @@ class BatchJob(object): self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[out_idx]['status'] = 'scheduling' - task_name = self.user + '_' + self.job_id + '_' + out_idx + task_name = self.job_id + '_' + out_idx ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) self.log_status() return ret_tasks @@ -250,16 +250,16 @@ class JobMgr(): return self.add_task_taskmgr(job.user, tasks) # report task status from taskmgr when running, failed and finished - # task_name: user + '_' + job_id + '_' + task_idx + # task_name: job_id + '_' + task_idx # status: 'running', 'finished', 'retrying', 'failed' # reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR" # tried_times: how many times the task has been tried. - def report(self, task_name, status, reason="", tried_times=1): + def report(self, user, task_name, status, reason="", tried_times=1): split_task_name = task_name.split('_') - if len(split_task_name) != 3: + if len(split_task_name) != 2: logger.error("Illegal task_name(%s) report from taskmgr" % task_name) return - user, job_id, task_idx = split_task_name + job_id, task_idx = split_task_name job = self.job_map[job_id] if status == "running": job.update_task_running(task_idx) @@ -274,8 +274,8 @@ class JobMgr(): job.update_task_failed(task_idx, reason, tried_times) # Get Batch job stdout or stderr from its file - def get_output(self, username, jobid, taskid, instid, issue): - filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt" + def get_output(self, username, jobid, taskid, vnodeid, issue): + filename = jobid + "_" + taskid + "_" + vnodeid + "_" + issue + ".txt" fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename) logger.info("Get output from:%s" % fpath) try: diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 0ff9ae0..0c5ec1a 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -59,14 +59,14 @@ class Task(): def gen_hosts(self): username = self.username taskid = self.id - # logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip))) + logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip))) fspath = env.getenv('FS_PREFIX') if not os.path.isdir("%s/global/users/%s" % (fspath,username)): path = env.getenv('DOCKLET_LIB') subprocess.call([path+"/master/userinit.sh", username]) - # logger.info("user %s directory not found, create it" % username) + logger.info("user %s directory not found, create it" % username) - hosts_file = open("%s/global/users/%s/%s.hosts" % (fspath,username,"batch-"+taskid),"w") + hosts_file = open("%s/global/users/%s/hosts/%s.hosts" % (fspath,username,"batch-"+taskid),"w") hosts_file.write("127.0.0.1 localhost\n") i = 0 for ip in self.ips: @@ -124,7 +124,7 @@ class TaskMgr(threading.Thread): #self.user_containers = {} self.scheduler_interval = scheduler_interval - self.logger = external_logger + self.logger = logger self.master_port = env.getenv('BATCH_MASTER_PORT') self.worker_port = env.getenv('BATCH_WORKER_PORT') @@ -288,16 +288,17 @@ class TaskMgr(threading.Thread): taskid = task.id username = task.username brname = "docklet-batch-%s-%s"%(username, taskid) - gwname = "Batch-%s-%s"%(username, taskid) + gwname = taskid if task.task_base_ip == None: return [False, "task.task_base_ip is None!"] gatewayip = int_to_ip(self.base_ip + task.task_base_ip + 1) - gatewayipcidr = "/" + str(32-self.task_cidr) + gatewayipcidr = gatewayip + "/" + str(32-self.task_cidr) netcontrol.new_bridge(brname) netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0) for wip in workers: - netcontrol.setup_gre(brname,wip) + if wip != self.master_ip: + netcontrol.setup_gre(brname,wip) return [True, gatewayip] def remove_tasknet(self, task): @@ -336,13 +337,14 @@ class TaskMgr(threading.Thread): #if not username in self.user_containers.keys(): #self.user_containers[username] = [] #self.user_containers[username].append(container_name) - ipaddr = task.ips[vnode_info.vnodeid % task.max_size] + ipaddr = task.ips[vnode_info.vnodeid % task.max_size] + "/" + str(32-self.task_cidr) brname = "docklet-batch-%s-%s" % (username, sub_task.root_task.id) networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.master_ip, brname=brname) vnode_info.vnode.network.CopyFrom(networkinfo) placed_workers.append(sub_task.worker) - if not self.start_vnode(sub_task): + [success,msg] = self.start_vnode(sub_task) + if not success: sub_task.waiting_for_retry() sub_task.worker = None start_all_vnode_success = False @@ -371,6 +373,7 @@ class TaskMgr(threading.Thread): self.stop_task(sub_task) if sub_task.vnode_started: self.stop_vnode(sub_task) + #pass def check_task_completed(self, task): if task.status == RUNNING or task.status == WAITING: @@ -381,6 +384,12 @@ class TaskMgr(threading.Thread): if task.at_same_time and task.status == FAILED: self.clear_sub_tasks(task.subtask_list) # TODO report to jobmgr + if self.jobmgr is None: + self.logger.error('[task_completed] jobmgr is None!') + else: + username = task.username + taskid = task.id + self.jobmgr.report(username,taskid,'finished') self.lazy_delete_list.append(task) return True @@ -401,7 +410,7 @@ class TaskMgr(threading.Thread): # self.user_containers[username].remove(container_name) if sub_task.status != RUNNING: - self.logger.error('[on_task_report] receive task report when instance is not running') + self.logger.error('[on_task_report] receive task report when vnode is not running') sub_task.status = report.subTaskStatus sub_task.status_reason = report.errmsg @@ -461,8 +470,11 @@ class TaskMgr(threading.Thread): proper_workers.append(sub_task.worker) continue needs = sub_task.vnode_info.vnode.instance + #logger.info(needs) proper_worker = None for worker_ip, worker_info in nodes: + #logger.info(worker_info) + #logger.info(self.get_cpu_usage(worker_ip)) if needs.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']: continue elif needs.memory > worker_info['memory']: @@ -542,7 +554,7 @@ class TaskMgr(threading.Thread): task_id = taskid, username = username, # all vnode must be started at the same time - at_same_time = json_task['at_same_time'], + at_same_time = 'atSameTime' in json_task.keys(), priority = task_priority, max_size = (1 << self.task_cidr) - 2, task_infos = [{ @@ -579,8 +591,8 @@ class TaskMgr(threading.Thread): timeout = int(json_task['expTime']) # commands are executed in all vnodes / only excuted in the first vnode # if in traditional mode, commands will be executed in all vnodes - ) if (not json_task['at_same_time'] or json_task['multicommand'] or instance_index == 0) else None - } for instance_index in range(json_task['instCount'])]) + ) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None + } for vnode_index in range(int(json_task['vnodeCount']))]) self.lazy_append_list.append(task) diff --git a/src/master/testTaskWorker.py b/src/master/testTaskWorker.py index d444884..96fb9cc 100644 --- a/src/master/testTaskWorker.py +++ b/src/master/testTaskWorker.py @@ -18,7 +18,7 @@ def run(): inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0) mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="") network = rpc_pb2.Network(ipaddr="10.0.4.2/24",gateway="10.0.4.1",masterip="192.168.0.1",brname="batch-root-test") - vnode = rpc_pb2.VNode(image=img, instance=inst, mount=[],network=network) + vnode = rpc_pb2.VNode(image=img, instance=inst, mount=[],network=network,hostname="batch-5") vnodeinfo = rpc_pb2.VNodeInfo(taskid="test",username="root",vnodeid=1,vnode=vnode) #task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token=''.join(random.sample(string.ascii_letters + string.digits, 8))) @@ -59,8 +59,8 @@ def start_task(): if __name__ == '__main__': #for i in range(10): - run() + #run() #start_task() - #stop_vnode() + stop_vnode() #time.sleep(4) #stop_task() diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index 3bbdbb4..248ab46 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -75,7 +75,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): for gpu in gpus: self.gpu_status[gpu['id']] = "" - #self.start_report() + self.start_report() logger.info('TaskWorker init success') def add_gpu_device(self, lxcname, gpu_need): @@ -194,7 +194,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): conffile.close() logger.info("Start container %s..." % lxcname) - #container = lxc.Container(lxcname) + container = lxc.Container(lxcname) ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) if ret.returncode != 0: logger.error('start container %s failed' % lxcname) @@ -355,15 +355,16 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): logger.error("Fail to write script file with taskid(%s) vnodeid(%s)" % (str(taskid),str(vnodeid))) else: try: - job_id = taskid.split('_')[1] + job_id = taskid.split('_')[0] except Exception as e: logger.error(traceback.format_exc()) job_id = "_none" jobdir = "batch_" + job_id logdir = "%s/global/users/%s/data/" % (self.fspath,username) + jobdir - if not os.path.exists(logdir): - logger.info("Directory:%s not exists, create it." % logdir) + try: os.mkdir(logdir) + except Exception as e: + logger.info("Error when creating logdir :%s "+str(e)) stdoutname = str(taskid)+"_"+str(vnodeid)+"_stdout.txt" stderrname = str(taskid)+"_"+str(vnodeid)+"_stderr.txt" try: diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index af07034..359b7ba 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -184,6 +184,13 @@ +'' +'' +'
' + +'
' + +'
' + +'' + +'
All vnodes  ' + +' One vnode(master)
' + +'' + +'
' +'
' +'
' +'
' diff --git a/web/templates/batch/batch_list.html b/web/templates/batch/batch_list.html index 3eb1b3a..1d92a9c 100644 --- a/web/templates/batch/batch_list.html +++ b/web/templates/batch/batch_list.html @@ -44,7 +44,7 @@ Task ID - Instance ID + Vnode ID Stdout Stderr diff --git a/web/templates/batch/batch_output.html b/web/templates/batch/batch_output.html index f22a61e..a17a8e4 100644 --- a/web/templates/batch/batch_output.html +++ b/web/templates/batch/batch_output.html @@ -3,7 +3,7 @@ - Docklet | Batch {{ issue }}: {{ jobid }}/{{ taskid }}/{{ instid }} + Docklet | Batch {{ issue }}: {{ jobid }}/{{ taskid }}/{{ vnodeid }} @@ -29,7 +29,7 @@

Jobid: {{ jobid }}

Taskid: {{ taskid }}

-

Instanceid: {{ instid }}

+

VNodeid: {{ vnodeid }}

The output of {{ issue }} will be updated in every 2 seconds.


{{ output }}
@@ -51,7 +51,7 @@ function updateOutput() { var host = window.location.host; - url = "//" + host + "/batch/job/output/" + "{{ masterip }}" + "/" + "{{ jobid }}" + "/" + "{{ taskid }}" + "/" + "{{ instid }}" + "/" + "{{ issue }}" + "/"; + url = "//" + host + "/batch/job/output/" + "{{ masterip }}" + "/" + "{{ jobid }}" + "/" + "{{ taskid }}" + "/" + "{{ vnodeid }}" + "/" + "{{ issue }}" + "/"; $.post(url,{},function(data){ $("#output").text(String(data.data)); },"json"); diff --git a/web/web.py b/web/web.py index a4ce826..d74c287 100755 --- a/web/web.py +++ b/web/web.py @@ -150,22 +150,22 @@ def add_batch_job(masterip): def state_batch_job(): return stateBatchJobView().as_view() -@app.route("/batch_job/output/////", methods=['GET']) +@app.route("/batch_job/output/////", methods=['GET']) @login_required -def output_batch_job(jobid, taskid, instid, issue): +def output_batch_job(jobid, taskid, vnodeid, issue): outputBatchJobView.jobid = jobid outputBatchJobView.taskid = taskid - outputBatchJobView.instid = instid + outputBatchJobView.vnodeid = vnodeid outputBatchJobView.issue = issue return outputBatchJobView().as_view() -@app.route("/batch/job/output//////", methods=['POST']) +@app.route("/batch/job/output//////", methods=['POST']) @login_required -def output_batch_job_request(masterip, jobid, taskid, instid, issue): +def output_batch_job_request(masterip, jobid, taskid, vnodeid, issue): data = { 'jobid':jobid, 'taskid':taskid, - 'instid':instid, + 'vnodeid':vnodeid, 'issue':issue } result = dockletRequest.post("/batch/job/output/",data,masterip) diff --git a/web/webViews/batch.py b/web/webViews/batch.py index 20afd44..9699560 100644 --- a/web/webViews/batch.py +++ b/web/webViews/batch.py @@ -56,7 +56,7 @@ class outputBatchJobView(normalView): template_path = "batch/batch_output.html" jobid = "" taskid = "" - instid = "" + vnodeid = "" issue = "" @classmethod @@ -65,7 +65,7 @@ class outputBatchJobView(normalView): data = { 'jobid':self.jobid, 'taskid':self.taskid, - 'instid':self.instid, + 'vnodeid':self.vnodeid, 'issue':self.issue } result = dockletRequest.post("/batch/job/output/",data,masterips[0].split("@")[0]) @@ -73,6 +73,6 @@ class outputBatchJobView(normalView): #logger.debug("job_list: %s" % job_list) if result.get('success',"") == "true": return self.render(self.template_path, masterip=masterips[0].split("@")[0], jobid=self.jobid, - taskid=self.taskid, instid=self.instid, issue=self.issue, output=output) + taskid=self.taskid, vnodeid=self.vnodeid, issue=self.issue, output=output) else: return self.error()