Fix many bugs & beautifify web pages

This commit is contained in:
zhuyj17 2019-03-06 01:39:05 +08:00
parent ba53661e38
commit bbb96db7c9
14 changed files with 72 additions and 52 deletions

View File

@ -32,8 +32,8 @@ WEB_PORT=8888
USER_PORT=9100
#cluster net, default is 172.16.0.1/16
CLUSTER_NET="172.16.0.1/16"
# ip addresses range of containers for batch job, default is 10.0.3.0/24
BATCH_NET="10.0.3.0/24"
# ip addresses range of containers for batch job, default is 10.16.0.0/16
BATCH_NET="10.16.0.0/16"
#configurable-http-proxy public port, default is 8000
PROXY_PORT=8000
#configurable-http-proxy api port, default is 8001

View File

@ -36,8 +36,8 @@ WEB_PORT=8888
USER_PORT=9100
#cluster net, default is 172.16.0.1/16
CLUSTER_NET="172.16.0.1/16"
# ip addresses range of containers for batch job, default is 10.0.3.0/24
BATCH_NET="10.0.3.0/24"
# ip addresses range of containers for batch job, default is 10.16.0.0/16
BATCH_NET="10.16.0.0/16"
. $DOCKLET_CONF/docklet.conf

View File

@ -20,8 +20,8 @@ FS_PREFIX=/opt/docklet
# cluster net ip range, default is 172.16.0.1/16
CLUSTER_NET="172.16.0.1/16"
# ip addresses range of containers for batch job, default is 10.0.3.0/24
BATCH_NET="10.0.3.0/24"
# ip addresses range of containers for batch job, default is 10.16.0.0/16
BATCH_NET="10.16.0.0/16"
#configurable-http-proxy public port, default is 8000
PROXY_PORT=8000
#configurable-http-proxy api port, default is 8001

View File

@ -45,7 +45,7 @@ lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU%
lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0
#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/batch-%TASKID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/batch-%TASKID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0
lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0

View File

@ -885,11 +885,11 @@ def get_output(user,beans,form):
global G_jobmgr
jobid = form.get("jobid","")
taskid = form.get("taskid","")
instid = form.get("instid","")
vnodeid = form.get("vnodeid","")
issue = form.get("issue","")
result = {
'success': 'true',
'data': G_jobmgr.get_output(user,jobid,taskid,instid,issue)
'data': G_jobmgr.get_output(user,jobid,taskid,vnodeid,issue)
}
return json.dumps(result)
@ -1099,7 +1099,7 @@ if __name__ == '__main__':
# server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler)
logger.info("starting master server")
G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher)
G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher, ipaddr)
G_jobmgr = jobmgr.JobMgr(G_taskmgr)
G_taskmgr.set_jobmgr(G_jobmgr)
G_taskmgr.start()

View File

@ -69,7 +69,7 @@ class BatchJob(object):
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[task_idx]['status'] = 'scheduling'
task_name = self.user + '_' + self.job_id + '_' + task_idx
task_name = self.job_id + '_' + task_idx
ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority])
self.log_status()
return ret_tasks
@ -124,7 +124,7 @@ class BatchJob(object):
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[out_idx]['status'] = 'scheduling'
task_name = self.user + '_' + self.job_id + '_' + out_idx
task_name = self.job_id + '_' + out_idx
ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
self.log_status()
return ret_tasks
@ -250,16 +250,16 @@ class JobMgr():
return self.add_task_taskmgr(job.user, tasks)
# report task status from taskmgr when running, failed and finished
# task_name: user + '_' + job_id + '_' + task_idx
# task_name: job_id + '_' + task_idx
# status: 'running', 'finished', 'retrying', 'failed'
# reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR"
# tried_times: how many times the task has been tried.
def report(self, task_name, status, reason="", tried_times=1):
def report(self, user, task_name, status, reason="", tried_times=1):
split_task_name = task_name.split('_')
if len(split_task_name) != 3:
if len(split_task_name) != 2:
logger.error("Illegal task_name(%s) report from taskmgr" % task_name)
return
user, job_id, task_idx = split_task_name
job_id, task_idx = split_task_name
job = self.job_map[job_id]
if status == "running":
job.update_task_running(task_idx)
@ -274,8 +274,8 @@ class JobMgr():
job.update_task_failed(task_idx, reason, tried_times)
# Get Batch job stdout or stderr from its file
def get_output(self, username, jobid, taskid, instid, issue):
filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt"
def get_output(self, username, jobid, taskid, vnodeid, issue):
filename = jobid + "_" + taskid + "_" + vnodeid + "_" + issue + ".txt"
fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename)
logger.info("Get output from:%s" % fpath)
try:

View File

@ -59,14 +59,14 @@ class Task():
def gen_hosts(self):
username = self.username
taskid = self.id
# logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip)))
logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip)))
fspath = env.getenv('FS_PREFIX')
if not os.path.isdir("%s/global/users/%s" % (fspath,username)):
path = env.getenv('DOCKLET_LIB')
subprocess.call([path+"/master/userinit.sh", username])
# logger.info("user %s directory not found, create it" % username)
logger.info("user %s directory not found, create it" % username)
hosts_file = open("%s/global/users/%s/%s.hosts" % (fspath,username,"batch-"+taskid),"w")
hosts_file = open("%s/global/users/%s/hosts/%s.hosts" % (fspath,username,"batch-"+taskid),"w")
hosts_file.write("127.0.0.1 localhost\n")
i = 0
for ip in self.ips:
@ -124,7 +124,7 @@ class TaskMgr(threading.Thread):
#self.user_containers = {}
self.scheduler_interval = scheduler_interval
self.logger = external_logger
self.logger = logger
self.master_port = env.getenv('BATCH_MASTER_PORT')
self.worker_port = env.getenv('BATCH_WORKER_PORT')
@ -288,16 +288,17 @@ class TaskMgr(threading.Thread):
taskid = task.id
username = task.username
brname = "docklet-batch-%s-%s"%(username, taskid)
gwname = "Batch-%s-%s"%(username, taskid)
gwname = taskid
if task.task_base_ip == None:
return [False, "task.task_base_ip is None!"]
gatewayip = int_to_ip(self.base_ip + task.task_base_ip + 1)
gatewayipcidr = "/" + str(32-self.task_cidr)
gatewayipcidr = gatewayip + "/" + str(32-self.task_cidr)
netcontrol.new_bridge(brname)
netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0)
for wip in workers:
netcontrol.setup_gre(brname,wip)
if wip != self.master_ip:
netcontrol.setup_gre(brname,wip)
return [True, gatewayip]
def remove_tasknet(self, task):
@ -336,13 +337,14 @@ class TaskMgr(threading.Thread):
#if not username in self.user_containers.keys():
#self.user_containers[username] = []
#self.user_containers[username].append(container_name)
ipaddr = task.ips[vnode_info.vnodeid % task.max_size]
ipaddr = task.ips[vnode_info.vnodeid % task.max_size] + "/" + str(32-self.task_cidr)
brname = "docklet-batch-%s-%s" % (username, sub_task.root_task.id)
networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.master_ip, brname=brname)
vnode_info.vnode.network.CopyFrom(networkinfo)
placed_workers.append(sub_task.worker)
if not self.start_vnode(sub_task):
[success,msg] = self.start_vnode(sub_task)
if not success:
sub_task.waiting_for_retry()
sub_task.worker = None
start_all_vnode_success = False
@ -371,6 +373,7 @@ class TaskMgr(threading.Thread):
self.stop_task(sub_task)
if sub_task.vnode_started:
self.stop_vnode(sub_task)
#pass
def check_task_completed(self, task):
if task.status == RUNNING or task.status == WAITING:
@ -381,6 +384,12 @@ class TaskMgr(threading.Thread):
if task.at_same_time and task.status == FAILED:
self.clear_sub_tasks(task.subtask_list)
# TODO report to jobmgr
if self.jobmgr is None:
self.logger.error('[task_completed] jobmgr is None!')
else:
username = task.username
taskid = task.id
self.jobmgr.report(username,taskid,'finished')
self.lazy_delete_list.append(task)
return True
@ -401,7 +410,7 @@ class TaskMgr(threading.Thread):
# self.user_containers[username].remove(container_name)
if sub_task.status != RUNNING:
self.logger.error('[on_task_report] receive task report when instance is not running')
self.logger.error('[on_task_report] receive task report when vnode is not running')
sub_task.status = report.subTaskStatus
sub_task.status_reason = report.errmsg
@ -461,8 +470,11 @@ class TaskMgr(threading.Thread):
proper_workers.append(sub_task.worker)
continue
needs = sub_task.vnode_info.vnode.instance
#logger.info(needs)
proper_worker = None
for worker_ip, worker_info in nodes:
#logger.info(worker_info)
#logger.info(self.get_cpu_usage(worker_ip))
if needs.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
continue
elif needs.memory > worker_info['memory']:
@ -542,7 +554,7 @@ class TaskMgr(threading.Thread):
task_id = taskid,
username = username,
# all vnode must be started at the same time
at_same_time = json_task['at_same_time'],
at_same_time = 'atSameTime' in json_task.keys(),
priority = task_priority,
max_size = (1 << self.task_cidr) - 2,
task_infos = [{
@ -579,8 +591,8 @@ class TaskMgr(threading.Thread):
timeout = int(json_task['expTime'])
# commands are executed in all vnodes / only excuted in the first vnode
# if in traditional mode, commands will be executed in all vnodes
) if (not json_task['at_same_time'] or json_task['multicommand'] or instance_index == 0) else None
} for instance_index in range(json_task['instCount'])])
) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None
} for vnode_index in range(int(json_task['vnodeCount']))])
self.lazy_append_list.append(task)

View File

@ -18,7 +18,7 @@ def run():
inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0)
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
network = rpc_pb2.Network(ipaddr="10.0.4.2/24",gateway="10.0.4.1",masterip="192.168.0.1",brname="batch-root-test")
vnode = rpc_pb2.VNode(image=img, instance=inst, mount=[],network=network)
vnode = rpc_pb2.VNode(image=img, instance=inst, mount=[],network=network,hostname="batch-5")
vnodeinfo = rpc_pb2.VNodeInfo(taskid="test",username="root",vnodeid=1,vnode=vnode)
#task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token=''.join(random.sample(string.ascii_letters + string.digits, 8)))
@ -59,8 +59,8 @@ def start_task():
if __name__ == '__main__':
#for i in range(10):
run()
#run()
#start_task()
#stop_vnode()
stop_vnode()
#time.sleep(4)
#stop_task()

View File

@ -75,7 +75,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
for gpu in gpus:
self.gpu_status[gpu['id']] = ""
#self.start_report()
self.start_report()
logger.info('TaskWorker init success')
def add_gpu_device(self, lxcname, gpu_need):
@ -194,7 +194,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
conffile.close()
logger.info("Start container %s..." % lxcname)
#container = lxc.Container(lxcname)
container = lxc.Container(lxcname)
ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0:
logger.error('start container %s failed' % lxcname)
@ -355,15 +355,16 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
logger.error("Fail to write script file with taskid(%s) vnodeid(%s)" % (str(taskid),str(vnodeid)))
else:
try:
job_id = taskid.split('_')[1]
job_id = taskid.split('_')[0]
except Exception as e:
logger.error(traceback.format_exc())
job_id = "_none"
jobdir = "batch_" + job_id
logdir = "%s/global/users/%s/data/" % (self.fspath,username) + jobdir
if not os.path.exists(logdir):
logger.info("Directory:%s not exists, create it." % logdir)
try:
os.mkdir(logdir)
except Exception as e:
logger.info("Error when creating logdir :%s "+str(e))
stdoutname = str(taskid)+"_"+str(vnodeid)+"_stdout.txt"
stderrname = str(taskid)+"_"+str(vnodeid)+"_stderr.txt"
try:

View File

@ -184,6 +184,13 @@
+'</div>'
+'<label class="col-sm-2 control-label">Command</label>'
+'<div class="col-sm-3"><input type="text" class="form-control" name="command_' + task_number + '" id="command_' + task_number + '" />'
+'</div></div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Run on: </label>'
+'<div class="col-sm-3"><input type="radio" name="runon_' + task_number + '" value="all" checked="checked"/>All vnodes &nbsp'
+' <input type="radio" name="runon_' + task_number + '" value="master" />One vnode(master)</div>'
+'<label class="col-sm-2 control-label">Start at the Same Time</label>'
+'<div class="col-sm-3"><input type="checkbox" name="atSameTime_' + task_number + '" checked="checked"/>'
+'</div></div>'
+'<div class="form-group"><label class="col-sm-2 control-label">Image Choose</label>'
+'<div class="col-sm-10">'

View File

@ -44,7 +44,7 @@
<thead>
<tr>
<th>Task ID</th>
<th>Instance ID</th>
<th>Vnode ID</th>
<th>Stdout</th>
<th>Stderr</th>
</tr>

View File

@ -3,7 +3,7 @@
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<title>Docklet | Batch {{ issue }}: {{ jobid }}/{{ taskid }}/{{ instid }}</title>
<title>Docklet | Batch {{ issue }}: {{ jobid }}/{{ taskid }}/{{ vnodeid }}</title>
<!-- Tell the browser to be responsive to screen width -->
<meta content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no" name="viewport">
<link rel="shortcut icon" href="/static/img/favicon.ico">
@ -29,7 +29,7 @@
<body>
<h3>Jobid: {{ jobid }}</h3>
<h3>Taskid: {{ taskid }}</h3>
<h3>Instanceid: {{ instid }}</h3>
<h3>VNodeid: {{ vnodeid }}</h3>
<h4><small>The output of {{ issue }} will be updated in every 2 seconds.</small></h4>
<hr>
<pre id="output">{{ output }}</pre>
@ -51,7 +51,7 @@
function updateOutput()
{
var host = window.location.host;
url = "//" + host + "/batch/job/output/" + "{{ masterip }}" + "/" + "{{ jobid }}" + "/" + "{{ taskid }}" + "/" + "{{ instid }}" + "/" + "{{ issue }}" + "/";
url = "//" + host + "/batch/job/output/" + "{{ masterip }}" + "/" + "{{ jobid }}" + "/" + "{{ taskid }}" + "/" + "{{ vnodeid }}" + "/" + "{{ issue }}" + "/";
$.post(url,{},function(data){
$("#output").text(String(data.data));
},"json");

View File

@ -150,22 +150,22 @@ def add_batch_job(masterip):
def state_batch_job():
return stateBatchJobView().as_view()
@app.route("/batch_job/output/<jobid>/<taskid>/<instid>/<issue>/", methods=['GET'])
@app.route("/batch_job/output/<jobid>/<taskid>/<vnodeid>/<issue>/", methods=['GET'])
@login_required
def output_batch_job(jobid, taskid, instid, issue):
def output_batch_job(jobid, taskid, vnodeid, issue):
outputBatchJobView.jobid = jobid
outputBatchJobView.taskid = taskid
outputBatchJobView.instid = instid
outputBatchJobView.vnodeid = vnodeid
outputBatchJobView.issue = issue
return outputBatchJobView().as_view()
@app.route("/batch/job/output/<masterip>/<jobid>/<taskid>/<instid>/<issue>/", methods=['POST'])
@app.route("/batch/job/output/<masterip>/<jobid>/<taskid>/<vnodeid>/<issue>/", methods=['POST'])
@login_required
def output_batch_job_request(masterip, jobid, taskid, instid, issue):
def output_batch_job_request(masterip, jobid, taskid, vnodeid, issue):
data = {
'jobid':jobid,
'taskid':taskid,
'instid':instid,
'vnodeid':vnodeid,
'issue':issue
}
result = dockletRequest.post("/batch/job/output/",data,masterip)

View File

@ -56,7 +56,7 @@ class outputBatchJobView(normalView):
template_path = "batch/batch_output.html"
jobid = ""
taskid = ""
instid = ""
vnodeid = ""
issue = ""
@classmethod
@ -65,7 +65,7 @@ class outputBatchJobView(normalView):
data = {
'jobid':self.jobid,
'taskid':self.taskid,
'instid':self.instid,
'vnodeid':self.vnodeid,
'issue':self.issue
}
result = dockletRequest.post("/batch/job/output/",data,masterips[0].split("@")[0])
@ -73,6 +73,6 @@ class outputBatchJobView(normalView):
#logger.debug("job_list: %s" % job_list)
if result.get('success',"") == "true":
return self.render(self.template_path, masterip=masterips[0].split("@")[0], jobid=self.jobid,
taskid=self.taskid, instid=self.instid, issue=self.issue, output=output)
taskid=self.taskid, vnodeid=self.vnodeid, issue=self.issue, output=output)
else:
return self.error()