Bill beans when a task completes
This commit is contained in:
parent
75e60fc93c
commit
8c1996dc31
|
@ -126,7 +126,7 @@ class BatchJob(object):
|
|||
|
||||
# a task has finished, update dependency and return tasks without dependencies
|
||||
@data_lock
|
||||
def finish_task(self, task_idx):
|
||||
def finish_task(self, task_idx, running_time, billing):
|
||||
if task_idx not in self.tasks.keys():
|
||||
logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
|
||||
return []
|
||||
|
@ -136,6 +136,9 @@ class BatchJob(object):
|
|||
self.tasks[task_idx]['status'] = 'finished'
|
||||
self.tasks[task_idx]['db'].status = 'finished'
|
||||
self.tasks[task_idx]['db'].tried_times += 1
|
||||
self.tasks[task_idx]['db'].running_time = running_time
|
||||
self.tasks[task_idx]['db'].billing = billing
|
||||
self.job_db.billing += billing
|
||||
self.tasks_cnt['finished'] += 1
|
||||
|
||||
if task_idx not in self.dependency_out.keys():
|
||||
|
@ -177,7 +180,7 @@ class BatchJob(object):
|
|||
|
||||
# update failed status of task
|
||||
@data_lock
|
||||
def update_task_failed(self, task_idx, reason, tried_times):
|
||||
def update_task_failed(self, task_idx, reason, tried_times, running_time, billing):
|
||||
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
|
||||
old_status = self.tasks[task_idx]['status']
|
||||
self.tasks_cnt[old_status] -= 1
|
||||
|
@ -186,6 +189,9 @@ class BatchJob(object):
|
|||
self.tasks[task_idx]['db'].status = 'failed'
|
||||
self.tasks[task_idx]['db'].failed_reason = reason
|
||||
self.tasks[task_idx]['db'].tried_times += 1
|
||||
self.tasks[task_idx]['db'].running_time = running_time
|
||||
self.tasks[task_idx]['db'].billing = billing
|
||||
self.job_db.billing += billing
|
||||
self._update_job_status()
|
||||
self.log_status()
|
||||
|
||||
|
@ -326,7 +332,7 @@ class JobMgr():
|
|||
# status: 'running', 'finished', 'retrying', 'failed'
|
||||
# reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR"
|
||||
# tried_times: how many times the task has been tried.
|
||||
def report(self, user, task_name, status, reason="", tried_times=1):
|
||||
def report(self, user, task_name, status, reason="", tried_times=1, running_time=0, billing=0):
|
||||
split_task_name = task_name.split('_')
|
||||
if len(split_task_name) != 2:
|
||||
logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name)
|
||||
|
@ -339,7 +345,7 @@ class JobMgr():
|
|||
if status == "running":
|
||||
job.update_task_running(task_idx)
|
||||
elif status == "finished":
|
||||
next_tasks = job.finish_task(task_idx)
|
||||
next_tasks = job.finish_task(task_idx, running_time, billing)
|
||||
if len(next_tasks) == 0:
|
||||
del self.job_map[job_id]
|
||||
return
|
||||
|
@ -347,7 +353,7 @@ class JobMgr():
|
|||
elif status == "retrying":
|
||||
job.update_task_retrying(task_idx, reason, tried_times)
|
||||
elif status == "failed":
|
||||
job.update_task_failed(task_idx, reason, tried_times)
|
||||
job.update_task_failed(task_idx, reason, tried_times, running_time, billing)
|
||||
del self.job_map[job_id]
|
||||
|
||||
# Get Batch job stdout or stderr from its file
|
||||
|
|
|
@ -3,7 +3,7 @@ import time
|
|||
import string
|
||||
import os
|
||||
import random, copy, subprocess
|
||||
import json
|
||||
import json, math
|
||||
from functools import wraps
|
||||
|
||||
# must import logger after initlogging, ugly
|
||||
|
@ -38,7 +38,6 @@ class Task():
|
|||
self.task_base_ip = None
|
||||
self.ips = None
|
||||
self.max_size = max_size
|
||||
self.subtask_running_time_sum = 0
|
||||
|
||||
self.subtask_list = [SubTask(
|
||||
idx = index,
|
||||
|
@ -48,6 +47,24 @@ class Task():
|
|||
max_retry_count = task_info['max_retry_count']
|
||||
) for (index, task_info) in enumerate(task_infos)]
|
||||
|
||||
def get_billing(self, running_time=0):
    """Return the summed running time and bean cost of all subtasks.

    Each subtask is charged for the cpu, memory, disk and gpu of its
    vnode instance, multiplied by the subtask's accumulated
    ``running_time`` and a per-second price. The cost of each subtask is
    rounded up to a whole bean before being added to the total.

    Args:
        running_time: Ignored. The total is always recomputed from the
            subtasks; the parameter is kept (now optional) so that both
            ``get_billing()`` and ``get_billing(x)`` callers work.

    Returns:
        A ``(running_time, billing_beans)`` tuple: the sum of every
        subtask's ``running_time`` and the total number of beans.
    """
    # Prices are expressed in beans per resource unit per second.
    cpu_price = 1 / 3600.0  # /core*s
    mem_price = 1 / 3600.0  # /GB*s
    disk_price = 1 / 3600.0  # /GB*s
    gpu_price = 100 / 3600.0  # /core*s

    total_time = 0
    billing_beans = 0
    for subtask in self.subtask_list:
        instance = subtask.vnode_info.vnode.instance
        elapsed = subtask.running_time
        cpu_beans = instance.cpu * elapsed * cpu_price
        # memory and disk are divided by 1024 before pricing per GB
        # (presumably stored in MB -- confirm against the instance schema).
        mem_beans = instance.memory / 1024.0 * elapsed * mem_price
        disk_beans = instance.disk / 1024.0 * elapsed * disk_price
        gpu_beans = instance.gpu * elapsed * gpu_price
        # Round up per subtask, so any partially used bean is charged.
        billing_beans += math.ceil(cpu_beans + mem_beans + disk_beans + gpu_beans)
        total_time += elapsed
    return total_time, billing_beans
|
||||
|
||||
def __lt__(self, other):
|
||||
return self.priority < other.priority
|
||||
|
||||
|
@ -89,6 +106,7 @@ class SubTask():
|
|||
self.task_started = False
|
||||
self.start_at = 0
|
||||
self.end_at = 0
|
||||
self.running_time = 0
|
||||
self.status = WAITING
|
||||
self.status_reason = ''
|
||||
self.try_count = 0
|
||||
|
@ -243,6 +261,7 @@ class TaskMgr(threading.Thread):
|
|||
return [False, e]
|
||||
subtask.vnode_started = False
|
||||
subtask.end_at = time.time()
|
||||
subtask.running_time += subtask.end_at - subtask.start_at
|
||||
self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
|
||||
self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
|
||||
return [True, '']
|
||||
|
@ -405,11 +424,13 @@ class TaskMgr(threading.Thread):
|
|||
if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING):
|
||||
return False
|
||||
self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
|
||||
if task.status == FAILED:
|
||||
self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1)
|
||||
else:
|
||||
self.jobmgr.report(task.username,task.id,'finished')
|
||||
self.stop_remove_task(task)
|
||||
running_time, billing = task.get_billing()
|
||||
running_time = math.ceil(running_time)
|
||||
if task.status == FAILED:
|
||||
self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing)
|
||||
else:
|
||||
self.jobmgr.report(task.username,task.id,'finished',running_time=running_time,billing=billing)
|
||||
return True
|
||||
|
||||
# this method is called when worker send heart-beat rpc request
|
||||
|
|
|
@ -474,9 +474,6 @@ class Batchjob(db.Model):
|
|||
info['end_time'] = "------"
|
||||
else:
|
||||
info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
if self.billing == 0:
|
||||
info['billing'] = '--'
|
||||
else:
|
||||
info['billing'] = self.billing
|
||||
return json.dumps(info)
|
||||
|
||||
|
@ -487,8 +484,9 @@ class Batchtask(db.Model):
|
|||
jobid = db.Column(db.String(9), db.ForeignKey('batchjob.id'))
|
||||
status = db.Column(db.String(15))
|
||||
failed_reason = db.Column(db.Text)
|
||||
schedueled_time = db.Column(db.DateTime)
|
||||
start_time = db.Column(db.DateTime)
|
||||
end_time = db.Column(db.DateTime)
|
||||
running_time = db.Column(db.Integer)
|
||||
billing = db.Column(db.Integer)
|
||||
config = db.Column(db.Text)
|
||||
tried_times = db.Column(db.Integer)
|
||||
|
@ -498,8 +496,9 @@ class Batchtask(db.Model):
|
|||
self.idx = idx
|
||||
self.status = "pending"
|
||||
self.failed_reason = ""
|
||||
self.schedueled_time = None
|
||||
self.start_time = None
|
||||
self.end_time = None
|
||||
self.running_time = 0
|
||||
self.billing = 0
|
||||
self.config = json.dumps(config)
|
||||
self.tried_times = 0
|
||||
|
@ -511,17 +510,15 @@ class Batchtask(db.Model):
|
|||
info['jobid'] = self.jobid
|
||||
info['status'] = self.status
|
||||
info['failed_reason'] = self.failed_reason
|
||||
if self.schedueled_time is None:
|
||||
info['schedueled_time'] = "------"
|
||||
if self.start_time is None:
|
||||
info['start_time'] = "------"
|
||||
else:
|
||||
info['schedueled_time'] = self.schedueled_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
info['start_time'] = self.start_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
if self.end_time is None:
|
||||
info['end_time'] = "------"
|
||||
else:
|
||||
info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
if self.billing == 0:
|
||||
info['billing'] = "--"
|
||||
else:
|
||||
info['running_time'] = self.running_time
|
||||
info['billing'] = self.billing
|
||||
info['config'] = json.loads(self.config)
|
||||
info['tried_times'] = self.tried_times
|
||||
|
|
Loading…
Reference in New Issue