diff --git a/src/master/httprest.py b/src/master/httprest.py index 6d6c603..b17d0ff 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -792,6 +792,7 @@ def resetall_system(user, beans, form): @app.route("/batch/job/add/", methods=['POST']) @login_required +@beans_check def add_job(user,beans,form): global G_jobmgr job_data = form.to_dict() diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 09d39ff..5049bba 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -1,4 +1,4 @@ -import time, threading, random, string, os, traceback +import time, threading, random, string, os, traceback, requests import master.monitor import subprocess,json from functools import wraps @@ -30,7 +30,7 @@ class BatchJob(object): self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} #init self.tasks & self.dependency_out & self.tasks_cnt - logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.user, self.job_db.name, str(self.job_db.create_time))) + logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time))) raw_tasks = job_info["tasks"] self.tasks_cnt['pending'] = len(raw_tasks.keys()) for task_idx in raw_tasks.keys(): @@ -38,6 +38,7 @@ class BatchJob(object): task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) self.job_db.tasks.append(task_db) self.tasks[task_idx] = {} + self.tasks[task_idx]['id'] = jobid+"_"+task_idx self.tasks[task_idx]['config'] = task_info self.tasks[task_idx]['db'] = task_db self.tasks[task_idx]['status'] = 'pending' @@ -83,6 +84,7 @@ class BatchJob(object): if update_status: self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) self.tasks[task_idx]['db'].status = 'scheduling' self.tasks[task_idx]['status'] = 'scheduling' task_name = self.tasks[task_idx]['db'].id @@ -93,10 +95,12 @@ class BatchJob(object): @data_lock def stop_job(self): - self.job_db.status = 'stopped' for task_idx in self.tasks.keys(): self.tasks[task_idx]['status'] = 'stopped' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) self.tasks[task_idx]['db'].status = 'stopped' + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.status = 'stopped' db_commit() # update status of this job based @@ -119,8 +123,10 @@ class BatchJob(object): old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'running' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) self.tasks[task_idx]['db'].status = 'running' self.tasks_cnt['running'] += 1 + self.job_db = Batchjob.query.get(self.job_id) self._update_job_status() self.log_status() @@ -130,20 +136,22 @@ class BatchJob(object): if task_idx not in self.tasks.keys(): logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id)) return [] - logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id)) + logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing)) old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'finished' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) self.tasks[task_idx]['db'].status = 'finished' self.tasks[task_idx]['db'].tried_times += 1 self.tasks[task_idx]['db'].running_time = running_time self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) self.job_db.billing += billing self.tasks_cnt['finished'] += 1 if task_idx not in self.dependency_out.keys(): - self.log_status() self._update_job_status() + self.log_status() return [] ret_tasks = [] for out_idx in self.dependency_out[task_idx]: @@ -157,11 +165,12 @@ class BatchJob(object): self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[out_idx]['status'] = 'scheduling' - self.tasks[task_idx]['db'].status = 'scheduling' + self.tasks[out_idx]['db'] = Batchtask.query.get(self.tasks[out_idx]['id']) + self.tasks[out_idx]['db'].status = 'scheduling' task_name = self.job_id + '_' + out_idx ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) - self.log_status() self._update_job_status() + self.log_status() return ret_tasks # update retrying status of task @@ -171,10 +180,12 @@ class BatchJob(object): old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks_cnt['retrying'] += 1 + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) self.tasks[task_idx]['db'].status = 'retrying' self.tasks[task_idx]['db'].failed_reason = reason self.tasks[task_idx]['db'].tried_times += 1 self.tasks[task_idx]['status'] = 'retrying' + self.job_db = Batchjob.query.get(self.job_id) self._update_job_status() self.log_status() @@ -186,11 +197,13 @@ class BatchJob(object): self.tasks_cnt[old_status] -= 1 self.tasks_cnt['failed'] += 1 self.tasks[task_idx]['status'] = 'failed' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) self.tasks[task_idx]['db'].status = 'failed' self.tasks[task_idx]['db'].failed_reason = reason self.tasks[task_idx]['db'].tried_times += 1 self.tasks[task_idx]['db'].running_time = running_time self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) self.job_db.billing += billing self._update_job_status() self.log_status() @@ -219,6 +232,13 @@ class JobMgr(): self.taskmgr = taskmgr self.fspath = env.getenv('FS_PREFIX') self.lock = threading.Lock() + self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT')) + self.auth_key = env.getenv('AUTH_KEY') + + def charge_beans(self,username,billing): + data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key} + url = "/billing/beans/" + return requests.post(self.userpoint+url,data=data).json() def add_lock(f): @wraps(f) @@ -234,7 +254,7 @@ class JobMgr(): return new_f @add_lock - def create_job(self, user, job_info) + def create_job(self, user, job_info): jobid = self.gen_jobid() job = BatchJob(jobid, user, job_info) return job @@ -244,7 +264,7 @@ class JobMgr(): # user submit a new job, add this job to queue and database def add_job(self, user, job_info): try: - job = create_job(user, job_info) + job = self.create_job(user, job_info) self.job_map[job.job_id] = job self.process_job(job) except ValueError as err: @@ -260,6 +280,8 @@ class JobMgr(): # jobid: the id of job def stop_job(self, user, job_id): logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user)) + if job_id not in self.job_map.keys(): + return [False,"Job id %s does not exists! Maybe it has been finished."] try: job = self.job_map[job_id] if job.job_db.status == 'done' or job.job_db.status == 'failed': @@ -290,6 +312,7 @@ class JobMgr(): for t in tasks: tasks_vnodeCount[t.idx] = int(json.loads(t.config)['vnodeCount']) jobdata['tasks_vnodeCount'] = tasks_vnodeCount + res.append(jobdata) return res # user: username @@ -343,17 +366,21 @@ class JobMgr(): return job = self.job_map[job_id] if status == "running": + #logger.debug(str(job.job_db)) job.update_task_running(task_idx) + #logger.debug(str(job.job_db)) elif status == "finished": + #logger.debug(str(job.job_db)) + self.charge_beans(user, billing) next_tasks = job.finish_task(task_idx, running_time, billing) - if len(next_tasks) == 0: - del self.job_map[job_id] - return ret = self.add_task_taskmgr(user, next_tasks) + #logger.debug(str(job.job_db)) elif status == "retrying": job.update_task_retrying(task_idx, reason, tried_times) elif status == "failed": + self.charge_beans(user, billing) job.update_task_failed(task_idx, reason, tried_times, running_time, billing) + if job.job_db.status == 'done' or job.job_db.status == 'failed': del self.job_map[job_id] # Get Batch job stdout or stderr from its file diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index ec3b285..3ccbd7b 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -47,7 +47,7 @@ class Task(): max_retry_count = task_info['max_retry_count'] ) for (index, task_info) in enumerate(task_infos)] - def get_billing(self,running_time): + def get_billing(self): billing_beans = 0 running_time = 0 cpu_price = 1 / 3600.0 # /core*s @@ -60,6 +60,8 @@ class Task(): mem_beans = subtask.vnode_info.vnode.instance.memory / 1024.0 * tmp_time * mem_price disk_beans = subtask.vnode_info.vnode.instance.disk / 1024.0 * tmp_time * disk_price gpu_beans = subtask.vnode_info.vnode.instance.gpu * tmp_time * gpu_price + logger.info("subtask:%s running_time=%f beans for: cpu=%f mem_beans=%f disk_beans=%f gpu_beans=%f" + %(self.id, tmp_time, cpu_beans, mem_beans, disk_beans, gpu_beans )) beans = math.ceil(cpu_beans + mem_beans + disk_beans + gpu_beans) running_time += tmp_time billing_beans += beans @@ -426,6 +428,7 @@ class TaskMgr(threading.Thread): self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) self.stop_remove_task(task) running_time, billing = task.get_billing() + self.logger.info('task %s running_time:%s billing:%d'%(task.id, str(running_time), billing)) running_time = math.ceil(running_time) if task.status == FAILED: self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing)