Merge pull request #311 from GallenShao/batch

update taskmgr
GallenShao 2018-07-20 14:08:59 +08:00 committed by GitHub
commit 6b894d26e5
1 changed file with 57 additions and 28 deletions


@@ -25,7 +25,7 @@ class TaskReporter(MasterServicer):
def report(self, request, context):
self.taskmgr.on_task_report(request)
return Reply(message=Reply.ACCEPTED)
return Reply(status=Reply.ACCEPTED, message='')
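For context, a minimal sketch of how a worker-side caller might consume the new reply shape, assuming Reply is the project's generated protobuf message with the status and message fields used above, and that the generated stub exposes the same report rpc (both are assumptions, not confirmed by this diff):

# Hypothetical caller-side check; stub and Reply come from the project's generated gRPC classes.
def send_report(stub, report):
    reply = stub.report(report)
    if reply.status != Reply.ACCEPTED:
        raise RuntimeError('report rejected: %s' % reply.message)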
class TaskMgr(threading.Thread):
@@ -42,9 +42,10 @@ class TaskMgr(threading.Thread):
# nodes
self.nodemgr = nodemgr
self.all_nodes = None
self.last_nodes_info_update_time = 0
self.nodes_info_update_interval = 30 # (s)
self.cpu_usage = {}
# self.all_nodes = None
# self.last_nodes_info_update_time = 0
# self.nodes_info_update_interval = 30 # (s)
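The new cpu_usage dict replaces the commented-out node cache: it records, per worker IP, how many CPU shares this TaskMgr has already handed out, so the scheduler can weigh them against a worker's reported capacity. A minimal standalone sketch of that bookkeeping, with hypothetical helper names:

# Sketch only: per-worker CPU bookkeeping keyed by worker IP.
cpu_usage = {}

def reserve_cpu(worker_ip, cpus):
    cpu_usage[worker_ip] = cpu_usage.get(worker_ip, 0) + cpus          # on dispatch

def release_cpu(worker_ip, cpus):
    cpu_usage[worker_ip] = max(0, cpu_usage.get(worker_ip, 0) - cpus)  # when an instance leaves RUNNING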
def run(self):
@@ -84,12 +85,15 @@ class TaskMgr(threading.Thread):
logger.warning('[on_task_report] wrong token')
return
if instance['status'] == Status.RUNNING and report.instanceStatus != Status.RUNNING:
self.cpu_usage[instance['worker']] -= task.cluster.instance.cpu
instance['status'] = report.instanceStatus
if report.instanceStatus == Status.RUNNING:
instance['last_update_time'] = time.time()
elif report.instanceStatus == Status.COMPLETED:
instance['last_update_time'] = time.time()
if report.instanceStatus == Status.COMPLETED:
check_task_completed(task)
elif report.instanceStatus == Status.FAILED || report.instanceStatus == Status.TIMEOUT:
elif report.instanceStatus == Status.FAILED or report.instanceStatus == Status.TIMEOUT:
if instance['try_count'] > task.maxRetryCount:
check_task_completed(task)
else:
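In short, a report now updates last_update_time for every status, releases the worker's CPU share when an instance leaves RUNNING, and either finalizes the task (COMPLETED, or FAILED/TIMEOUT with retries exhausted) or leaves it eligible for a retry; the retry branch itself is cut off in this hunk. An illustrative, self-contained restatement, with plain strings standing in for the project's Status constants:

import time

# Sketch: how one instance report is folded into the instance record.
def handle_report(instance, new_status, max_retry):
    instance['status'] = new_status
    instance['last_update_time'] = time.time()
    if new_status == 'COMPLETED':
        return 'check-task'                  # the whole task may be done now
    if new_status in ('FAILED', 'TIMEOUT'):
        if instance['try_count'] > max_retry:
            return 'check-task'              # retries exhausted, finalize
        return 'retry'                       # assumed: still retryable
    return 'running'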
@@ -101,9 +105,9 @@ class TaskMgr(threading.Thread):
return
failed = False
for instance in task.instance_list:
if instance['status'] == Status.RUNNING || instance['status'] == Status.WAITING:
if instance['status'] == Status.RUNNING or instance['status'] == Status.WAITING:
return
if instance['status'] == Status.FAILED || instance['status'] == Status.TIMEOUT:
if instance['status'] == Status.FAILED or instance['status'] == Status.TIMEOUT:
if instance['try_count'] > task.maxRetryCount:
failed = True
else:
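Read as a whole, the check says: the task is unfinished while any instance is RUNNING or WAITING (or FAILED/TIMEOUT with retries left, which the truncated else branch is assumed to cover), and it only counts as failed if some instance ran out of retries. A standalone sketch under those assumptions:

# Sketch: overall task outcome from its instance records (plain strings for statuses).
def task_outcome(instances, max_retry):
    failed = False
    for inst in instances:
        if inst['status'] in ('RUNNING', 'WAITING'):
            return None                      # still in flight
        if inst['status'] in ('FAILED', 'TIMEOUT'):
            if inst['try_count'] > max_retry:
                failed = True                # this instance gave up for good
            else:
                return None                  # assumed: will be retried
    return 'FAILED' if failed else 'COMPLETED'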
@@ -133,15 +137,19 @@ class TaskMgr(threading.Thread):
instance['last_update_time'] = time.time()
instance['try_count'] += 1
instance['token'] = task.token
instance['worker'] = worker
self.cpu_usage[worker] += task.cluster.instance.cpu
try:
logger.info('[task_processor] processing %s' % task.id)
channel = grpc.insecure_channel('%s:50052' % worker)
stub = WorkerStub(channel)
response = stub.process_task(task)
logger.info('[task_processor] worker response: %s' % response.message)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
logger.error('[task_processor] rpc error message: %s' e)
logger.error('[task_processor] rpc error message: %s' % e)
instance['status'] = Status.FAILED
instance['try_count'] -= 1
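A condensed sketch of the dispatch pattern above: open an insecure gRPC channel to the worker on port 50052, call the generated stub, and report failure so the caller can mark the instance FAILED and undo the try_count. WorkerStub and Reply are the project's generated gRPC classes; everything else here is illustrative.

import grpc
import logging

logger = logging.getLogger('taskmgr')

# Sketch: WorkerStub / Reply come from the project's generated gRPC modules.
def dispatch_to_worker(task, worker_ip, port=50052):
    try:
        channel = grpc.insecure_channel('%s:%d' % (worker_ip, port))
        response = WorkerStub(channel).process_task(task)
        if response.status != Reply.ACCEPTED:
            raise Exception(response.message)
        return True
    except Exception as e:
        logger.error('[task_processor] rpc error message: %s' % e)
        return False    # caller rolls the instance back, as in the diff above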
@@ -154,7 +162,7 @@ class TaskMgr(threading.Thread):
if worker is not None:
# find instance to retry
for index, instance in enumerate(task.instance_list):
if (instance['status'] == Status.FAILED || instance['status'] == Status.TIMEOUT) and instance['try_count'] <= task.maxRetryCount:
if (instance['status'] == Status.FAILED or instance['status'] == Status.TIMEOUT) and instance['try_count'] <= task.maxRetryCount:
return task, index, worker
elif instance['status'] == Status.RUNNING:
if time.time() - instance['last_update_time'] > self.heart_beat_timeout:
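This loop picks the next instance worth rescheduling once a worker is available: FAILED/TIMEOUT instances that still have retries left, or RUNNING instances whose last report is older than heart_beat_timeout (the body of that branch is cut off here and assumed to treat the instance as timed out). A hedged standalone sketch:

import time

# Sketch: index of an instance to reschedule, or None if nothing qualifies.
def pick_retry_index(instances, max_retry, heart_beat_timeout):
    now = time.time()
    for index, inst in enumerate(instances):
        retryable = inst['status'] in ('FAILED', 'TIMEOUT') and inst['try_count'] <= max_retry
        stale = inst['status'] == 'RUNNING' and now - inst['last_update_time'] > heart_beat_timeout
        if retryable or stale:    # stale handling is an assumption about the truncated branch
            return index
    return None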
@@ -177,24 +185,46 @@ class TaskMgr(threading.Thread):
logger.warning('[task_scheduler] running nodes not found')
return None
for node in nodes:
# TODO
if True:
return node[0]
for worker_ip, worker_info in nodes:
if task.cluster.instance.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
continue
if task.cluster.instance.memory > worker_info['memory']:
continue
if task.cluster.instance.disk > worker_info['disk']:
continue
if task.cluster.instance.gpu > worker_info['gpu']:
continue
return worker_ip
return None
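The selection above is a first-fit scan: a worker qualifies only if the task's requested CPU, on top of what this TaskMgr has already placed there, together with its memory, disk and GPU needs, fits within the worker's reported capacity. A standalone sketch of the same check over plain dicts:

# Sketch: first-fit placement over (worker_ip, info) pairs; need/info/cpu_usage are plain dicts.
def first_fit(nodes, need, cpu_usage):
    for worker_ip, info in nodes:
        if need['cpu'] + cpu_usage.get(worker_ip, 0) > info['cpu']:
            continue
        if need['memory'] > info['memory'] or need['disk'] > info['disk'] or need['gpu'] > info['gpu']:
            continue
        return worker_ip
    return None

# e.g. first_fit([('192.168.0.2', {'cpu': 8, 'memory': 16384, 'disk': 51200, 'gpu': 0})],
#                {'cpu': 2, 'memory': 2048, 'disk': 1024, 'gpu': 0}, {}) returns '192.168.0.2'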
def get_all_nodes(self):
# cache running nodes
if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
return self.all_nodes
# if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
# return self.all_nodes
# get running nodes
node_ips = self.nodemgr.get_nodeips()
self.all_nodes = []
for node_ip in node_ips:
fetcher = master.monitor.Fetcher(node_ip)
self.all_nodes.append((node_ip, fetcher.info))
return self.all_nodes
all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
return all_nodes
def get_worker_resource_info(self, worker_ip):
fetcher = master.monitor.Fetcher(worker_ip)
worker_info = fetcher.info
info = {}
info['cpu'] = len(worker_info['cpuconfig'])
info['memory'] = worker_info['meminfo']['free'] / 1024 # (Mb)
info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
info['gpu'] = 0 # not support yet
return info
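get_worker_resource_info flattens the monitor fetcher's report into four numbers; the divisions assume meminfo['free'] is reported in KB and each diskinfo entry's free space in bytes, both normalized to MB. A sketch over a hand-made worker_info dict (real data comes from master.monitor.Fetcher(worker_ip).info):

# Fabricated sample in the shape the code above expects.
sample_worker_info = {
    'cpuconfig': [{}] * 4,                             # one entry per logical CPU -> 4 cores
    'meminfo': {'free': 8 * 1024 * 1024},              # assumed KB -> 8192 MB
    'diskinfo': [{'free': 100 * 1024 * 1024 * 1024}],  # assumed bytes -> 102400 MB
}

def summarize(worker_info):
    return {
        'cpu': len(worker_info['cpuconfig']),
        'memory': worker_info['meminfo']['free'] / 1024,                              # MB
        'disk': sum(disk['free'] for disk in worker_info['diskinfo']) / 1024 / 1024,  # MB
        'gpu': 0,                                                                     # not supported yet
    }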
def get_cpu_usage(self, worker_ip):
try:
return self.cpu_usage[worker_ip]
except KeyError:
self.cpu_usage[worker_ip] = 0
return 0
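The try/except in get_cpu_usage is just lazy initialization of a missing counter; the same behaviour can be expressed with a collections.defaultdict, for example:

from collections import defaultdict

# Equivalent bookkeeping: unknown workers start at 0 without the try/except.
cpu_usage = defaultdict(int)

def get_cpu_usage(worker_ip):
    return cpu_usage[worker_ip]    # first access creates the entry with value 0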
def set_jobmgr(self, jobmgr):
@@ -222,7 +252,7 @@ class TaskMgr(threading.Thread):
stderrRedirectPath = json_task['parameters']['stderrRedirectPath'],
stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']),
cluster = Cluster(
,image = Image(
image = Image(
name = json_task['cluster']['image']['name'],
type = json_task['cluster']['image']['type'],
owner = json_task['cluster']['image']['owner']),
@@ -231,9 +261,8 @@ class TaskMgr(threading.Thread):
memory = json_task['cluster']['instance']['memory'],
disk = json_task['cluster']['instance']['disk'],
gpu = json_task['cluster']['instance']['gpu'])))
task.cluster.mount = []
for mount in json_task['cluster']['mount']:
task.cluster.mount.append(Mount(localPath=mount['localPath'], remotePath=mount['remotePath']))
task.cluster.mount = [Mount(localPath=mount['localPath'], remotePath=mount['remotePath'])
for mount in json_task['cluster']['mount']]
# local properties
task.status = Status.WAITING
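For reference, the new mount handling is a plain list comprehension over json_task['cluster']['mount']; a minimal runnable example, with a namedtuple and a made-up path standing in for the project's generated Mount message and real task data:

from collections import namedtuple

# Stand-in for the generated Mount protobuf message, for illustration only.
Mount = namedtuple('Mount', ['localPath', 'remotePath'])

json_task = {'cluster': {'mount': [
    {'localPath': '/home/alice/data', 'remotePath': '/root/data'},
]}}

mounts = [Mount(localPath=m['localPath'], remotePath=m['remotePath'])
          for m in json_task['cluster']['mount']]
# -> [Mount(localPath='/home/alice/data', remotePath='/root/data')]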