Fix some bugs

Firmlyzhu 2019-03-28 19:55:35 +08:00
parent 8c1996dc31
commit 16a49b7ad4
3 changed files with 44 additions and 13 deletions

View File

@@ -792,6 +792,7 @@ def resetall_system(user, beans, form):
 @app.route("/batch/job/add/", methods=['POST'])
 @login_required
+@beans_check
 def add_job(user,beans,form):
     global G_jobmgr
     job_data = form.to_dict()

View File

@@ -1,4 +1,4 @@
-import time, threading, random, string, os, traceback
+import time, threading, random, string, os, traceback, requests
 import master.monitor
 import subprocess,json
 from functools import wraps
@@ -30,7 +30,7 @@ class BatchJob(object):
         self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
         #init self.tasks & self.dependency_out & self.tasks_cnt
-        logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.user, self.job_db.name, str(self.job_db.create_time)))
+        logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time)))
         raw_tasks = job_info["tasks"]
         self.tasks_cnt['pending'] = len(raw_tasks.keys())
         for task_idx in raw_tasks.keys():
@@ -38,6 +38,7 @@ class BatchJob(object):
             task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info)
             self.job_db.tasks.append(task_db)
             self.tasks[task_idx] = {}
+            self.tasks[task_idx]['id'] = jobid+"_"+task_idx
             self.tasks[task_idx]['config'] = task_info
             self.tasks[task_idx]['db'] = task_db
             self.tasks[task_idx]['status'] = 'pending'
@@ -83,6 +84,7 @@ class BatchJob(object):
             if update_status:
                 self.tasks_cnt['pending'] -= 1
                 self.tasks_cnt['scheduling'] += 1
+                self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
                 self.tasks[task_idx]['db'].status = 'scheduling'
                 self.tasks[task_idx]['status'] = 'scheduling'
             task_name = self.tasks[task_idx]['db'].id
@@ -93,10 +95,12 @@ class BatchJob(object):
     @data_lock
     def stop_job(self):
-        self.job_db.status = 'stopped'
         for task_idx in self.tasks.keys():
             self.tasks[task_idx]['status'] = 'stopped'
+            self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
             self.tasks[task_idx]['db'].status = 'stopped'
+        self.job_db = Batchjob.query.get(self.job_id)
+        self.job_db.status = 'stopped'
         db_commit()

     # update status of this job based
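
Most of the BatchJob changes above and below follow one pattern: before mutating a task or job row, re-fetch it by primary key (Batchtask.query.get / Batchjob.query.get) instead of reusing the object cached when the job was created, so status and billing updates land on a row attached to the current session. A small self-contained illustration of that re-fetch-by-key idiom, using plain SQLAlchemy rather than the project's models (all names here are stand-ins):

    from sqlalchemy import Column, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Task(Base):
        __tablename__ = 'tasks'
        id = Column(String, primary_key=True)
        status = Column(String)

    engine = create_engine('sqlite://')          # throwaway in-memory DB
    Base.metadata.create_all(engine)

    with Session(engine) as s:
        s.add(Task(id='job_1_task_0', status='pending'))
        s.commit()

    with Session(engine) as s:
        # Re-fetch by primary key (the analogue of Batchtask.query.get(...))
        # so the update is applied to a row bound to this session.
        task = s.get(Task, 'job_1_task_0')
        task.status = 'stopped'
        s.commit()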
@@ -119,8 +123,10 @@ class BatchJob(object):
         old_status = self.tasks[task_idx]['status']
         self.tasks_cnt[old_status] -= 1
         self.tasks[task_idx]['status'] = 'running'
+        self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
         self.tasks[task_idx]['db'].status = 'running'
         self.tasks_cnt['running'] += 1
+        self.job_db = Batchjob.query.get(self.job_id)
         self._update_job_status()
         self.log_status()
@@ -130,20 +136,22 @@ class BatchJob(object):
         if task_idx not in self.tasks.keys():
             logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
             return []
-        logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id))
+        logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing))
         old_status = self.tasks[task_idx]['status']
         self.tasks_cnt[old_status] -= 1
         self.tasks[task_idx]['status'] = 'finished'
+        self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
         self.tasks[task_idx]['db'].status = 'finished'
         self.tasks[task_idx]['db'].tried_times += 1
         self.tasks[task_idx]['db'].running_time = running_time
         self.tasks[task_idx]['db'].billing = billing
+        self.job_db = Batchjob.query.get(self.job_id)
         self.job_db.billing += billing
         self.tasks_cnt['finished'] += 1
         if task_idx not in self.dependency_out.keys():
-            self.log_status()
             self._update_job_status()
+            self.log_status()
             return []
         ret_tasks = []
         for out_idx in self.dependency_out[task_idx]:
@@ -157,11 +165,12 @@ class BatchJob(object):
                 self.tasks_cnt['pending'] -= 1
                 self.tasks_cnt['scheduling'] += 1
                 self.tasks[out_idx]['status'] = 'scheduling'
-                self.tasks[task_idx]['db'].status = 'scheduling'
+                self.tasks[out_idx]['db'] = Batchtask.query.get(self.tasks[out_idx]['id'])
+                self.tasks[out_idx]['db'].status = 'scheduling'
                 task_name = self.job_id + '_' + out_idx
                 ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
-        self.log_status()
         self._update_job_status()
+        self.log_status()
         return ret_tasks

     # update retrying status of task
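
For context, finish_task returns the downstream tasks that become ready once the finished task's outgoing edges are satisfied (the hunk above also fixes a bug where the wrong index, task_idx instead of out_idx, was marked as scheduling). A minimal, self-contained sketch of that propagation logic, with simplified bookkeeping and assumed names (dependency_in is invented for this example; the real class tracks dependencies differently):

    # dependency_out maps a task to the tasks that wait on it; dependency_in
    # counts how many unfinished prerequisites each task still has.
    def propagate_finish(task_idx, dependency_out, dependency_in, status):
        status[task_idx] = 'finished'
        ready = []
        for out_idx in dependency_out.get(task_idx, []):
            dependency_in[out_idx] -= 1
            if dependency_in[out_idx] == 0 and status[out_idx] == 'pending':
                status[out_idx] = 'scheduling'
                ready.append(out_idx)
        return ready

    # Task "1" unlocks "2" and "3", but "3" also needs "2" to finish first.
    status = {'1': 'running', '2': 'pending', '3': 'pending'}
    out_edges = {'1': ['2', '3'], '2': ['3']}
    in_counts = {'1': 0, '2': 1, '3': 2}
    print(propagate_finish('1', out_edges, in_counts, status))   # -> ['2']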
@@ -171,10 +180,12 @@ class BatchJob(object):
         old_status = self.tasks[task_idx]['status']
         self.tasks_cnt[old_status] -= 1
         self.tasks_cnt['retrying'] += 1
+        self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
         self.tasks[task_idx]['db'].status = 'retrying'
         self.tasks[task_idx]['db'].failed_reason = reason
         self.tasks[task_idx]['db'].tried_times += 1
         self.tasks[task_idx]['status'] = 'retrying'
+        self.job_db = Batchjob.query.get(self.job_id)
         self._update_job_status()
         self.log_status()
@@ -186,11 +197,13 @@ class BatchJob(object):
         self.tasks_cnt[old_status] -= 1
         self.tasks_cnt['failed'] += 1
         self.tasks[task_idx]['status'] = 'failed'
+        self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
         self.tasks[task_idx]['db'].status = 'failed'
         self.tasks[task_idx]['db'].failed_reason = reason
         self.tasks[task_idx]['db'].tried_times += 1
         self.tasks[task_idx]['db'].running_time = running_time
         self.tasks[task_idx]['db'].billing = billing
+        self.job_db = Batchjob.query.get(self.job_id)
         self.job_db.billing += billing
         self._update_job_status()
         self.log_status()
@@ -219,6 +232,13 @@ class JobMgr():
         self.taskmgr = taskmgr
         self.fspath = env.getenv('FS_PREFIX')
         self.lock = threading.Lock()
+        self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
+        self.auth_key = env.getenv('AUTH_KEY')
+
+    def charge_beans(self,username,billing):
+        data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key}
+        url = "/billing/beans/"
+        return requests.post(self.userpoint+url,data=data).json()

     def add_lock(f):
         @wraps(f)
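
charge_beans forwards a task's bean cost to the user service as a form-encoded POST and returns the decoded JSON reply. The endpoint itself lives outside this diff; as a rough sketch only (the handler, secret, and response shape are assumptions, with the payload keys copied from the call above), the receiving side could look like:

    from flask import Flask, jsonify, request

    app = Flask(__name__)
    AUTH_KEY = "some-shared-secret"     # assumed; the real value comes from AUTH_KEY in the env
    beans = {"alice": 1000}             # assumed in-memory account table for the sketch

    @app.route("/billing/beans/", methods=['POST'])
    def charge_beans():
        form = request.form
        if form.get("auth_key") != AUTH_KEY:
            return jsonify(success='false', message='auth_key is invalid'), 403
        owner = form.get("owner_name")
        beans[owner] = beans.get(owner, 0) - int(form.get("billing", 0))
        return jsonify(success='true', beans=beans[owner])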
@@ -234,7 +254,7 @@ class JobMgr():
         return new_f

     @add_lock
-    def create_job(self, user, job_info)
+    def create_job(self, user, job_info):
         jobid = self.gen_jobid()
         job = BatchJob(jobid, user, job_info)
         return job
@@ -244,7 +264,7 @@ class JobMgr():
     # user submit a new job, add this job to queue and database
     def add_job(self, user, job_info):
         try:
-            job = create_job(user, job_info)
+            job = self.create_job(user, job_info)
             self.job_map[job.job_id] = job
             self.process_job(job)
         except ValueError as err:
@@ -260,6 +280,8 @@ class JobMgr():
     # jobid: the id of job
     def stop_job(self, user, job_id):
         logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user))
+        if job_id not in self.job_map.keys():
+            return [False,"Job id %s does not exists! Maybe it has been finished."]
         try:
             job = self.job_map[job_id]
             if job.job_db.status == 'done' or job.job_db.status == 'failed':
@@ -290,6 +312,7 @@ class JobMgr():
             for t in tasks:
                 tasks_vnodeCount[t.idx] = int(json.loads(t.config)['vnodeCount'])
             jobdata['tasks_vnodeCount'] = tasks_vnodeCount
+            res.append(jobdata)
         return res

     # user: username
@@ -343,17 +366,21 @@ class JobMgr():
             return
         job = self.job_map[job_id]
         if status == "running":
+            #logger.debug(str(job.job_db))
             job.update_task_running(task_idx)
+            #logger.debug(str(job.job_db))
         elif status == "finished":
+            #logger.debug(str(job.job_db))
+            self.charge_beans(user, billing)
             next_tasks = job.finish_task(task_idx, running_time, billing)
+            if len(next_tasks) == 0:
+                del self.job_map[job_id]
+                return
             ret = self.add_task_taskmgr(user, next_tasks)
+            #logger.debug(str(job.job_db))
         elif status == "retrying":
             job.update_task_retrying(task_idx, reason, tried_times)
         elif status == "failed":
+            self.charge_beans(user, billing)
             job.update_task_failed(task_idx, reason, tried_times, running_time, billing)
         if job.job_db.status == 'done' or job.job_db.status == 'failed':
             del self.job_map[job_id]

     # Get Batch job stdout or stderr from its file

View File

@@ -47,7 +47,7 @@ class Task():
             max_retry_count = task_info['max_retry_count']
         ) for (index, task_info) in enumerate(task_infos)]

-    def get_billing(self,running_time):
+    def get_billing(self):
         billing_beans = 0
         running_time = 0
         cpu_price = 1 / 3600.0 # /core*s
@@ -60,6 +60,8 @@ class Task():
             mem_beans = subtask.vnode_info.vnode.instance.memory / 1024.0 * tmp_time * mem_price
             disk_beans = subtask.vnode_info.vnode.instance.disk / 1024.0 * tmp_time * disk_price
             gpu_beans = subtask.vnode_info.vnode.instance.gpu * tmp_time * gpu_price
+            logger.info("subtask:%s running_time=%f beans for: cpu=%f mem_beans=%f disk_beans=%f gpu_beans=%f"
+                        %(self.id, tmp_time, cpu_beans, mem_beans, disk_beans, gpu_beans ))
             beans = math.ceil(cpu_beans + mem_beans + disk_beans + gpu_beans)
             running_time += tmp_time
             billing_beans += beans
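
The per-subtask charge is resource × running-time × price, summed and rounded up to whole beans; only cpu_price (1/3600 per core-second) is visible in this hunk, so the other rates below are placeholders. A hedged worked example of the arithmetic (all instance sizes and non-CPU prices are assumptions for illustration):

    import math

    cpu_price = 1 / 3600.0        # per core-second, as in the diff
    mem_price = 1 / 3600.0        # assumed: per GiB-second
    disk_price = 1 / 3600.0       # assumed: per GiB-second

    # Hypothetical subtask: 2 cores, 2048 MiB memory, 1024 MiB disk, 600 s runtime.
    tmp_time = 600
    cpu_beans = 2 * tmp_time * cpu_price                  # ~0.33
    mem_beans = 2048 / 1024.0 * tmp_time * mem_price      # ~0.33
    disk_beans = 1024 / 1024.0 * tmp_time * disk_price    # ~0.17
    print(math.ceil(cpu_beans + mem_beans + disk_beans))  # -> 1 bean for this subtask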
@@ -426,6 +428,7 @@ class TaskMgr(threading.Thread):
         self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
         self.stop_remove_task(task)
         running_time, billing = task.get_billing()
+        self.logger.info('task %s running_time:%s billing:%d'%(task.id, str(running_time), billing))
         running_time = math.ceil(running_time)
         if task.status == FAILED:
             self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing)