New Batch Framework
This commit is contained in:
parent
583bd42e90
commit
e386131cff
|
@ -16,13 +16,14 @@ from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, Wo
|
|||
|
||||
from utils import env
|
||||
|
||||
|
||||
class Task():
|
||||
def __init__(self, info, priority):
|
||||
self.info = info
|
||||
self.status = WAITING
|
||||
self.instance_list = []
|
||||
self.subtask_list = []
|
||||
self.token = ''
|
||||
self.atSameTime = True
|
||||
self.multicommand = True
|
||||
# priority the bigger the better
|
||||
# self.priority the smaller the better
|
||||
self.priority = int(time.time()) / 60 / 60 - priority
|
||||
|
@ -30,7 +31,6 @@ class Task():
|
|||
def __lt__(self, other):
|
||||
return self.priority < other.priority
|
||||
|
||||
|
||||
class TaskReporter(MasterServicer):
|
||||
|
||||
def __init__(self, taskmgr):
|
||||
|
@ -41,7 +41,6 @@ class TaskReporter(MasterServicer):
|
|||
self.taskmgr.on_task_report(task_report)
|
||||
return Reply(status=Reply.ACCEPTED, message='')
|
||||
|
||||
|
||||
class TaskMgr(threading.Thread):
|
||||
|
||||
# load task information from etcd
|
||||
|
@ -55,7 +54,7 @@ class TaskMgr(threading.Thread):
|
|||
self.lazy_append_list = []
|
||||
self.lazy_delete_list = []
|
||||
self.task_queue_lock = threading.Lock()
|
||||
self.user_containers = {}
|
||||
#self.user_containers = {}
|
||||
|
||||
self.scheduler_interval = scheduler_interval
|
||||
self.logger = logger
|
||||
|
@ -87,13 +86,12 @@ class TaskMgr(threading.Thread):
|
|||
self.serve()
|
||||
while not self.thread_stop:
|
||||
self.sort_out_task_queue()
|
||||
task, instance_id, worker = self.task_scheduler()
|
||||
if task is not None and worker is not None:
|
||||
self.task_processor(task, instance_id, worker)
|
||||
task, workers = self.task_scheduler()
|
||||
if task is not None and workers is not None:
|
||||
self.task_processor(task, workers)
|
||||
else:
|
||||
time.sleep(self.scheduler_interval)
|
||||
|
||||
|
||||
def serve(self):
|
||||
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
|
||||
add_MasterServicer_to_server(TaskReporter(self), self.server)
|
||||
|
@ -101,104 +99,11 @@ class TaskMgr(threading.Thread):
|
|||
self.server.start()
|
||||
self.logger.info('[taskmgr_rpc] start rpc server')
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.thread_stop = True
|
||||
self.server.stop(0)
|
||||
self.logger.info('[taskmgr_rpc] stop rpc server')
|
||||
|
||||
|
||||
# this method is called when worker send heart-beat rpc request
|
||||
def on_task_report(self, report):
|
||||
self.logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.instanceid, report.instanceStatus))
|
||||
task = self.get_task(report.taskid)
|
||||
if task == None:
|
||||
self.logger.error('[on_task_report] task not found')
|
||||
return
|
||||
|
||||
instance = task.instance_list[report.instanceid]
|
||||
if instance['token'] != report.token:
|
||||
self.logger.warning('[on_task_report] wrong token')
|
||||
return
|
||||
username = task.info.username
|
||||
container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token
|
||||
self.user_containers[username].remove(container_name)
|
||||
|
||||
if instance['status'] != RUNNING:
|
||||
self.logger.error('[on_task_report] receive task report when instance is not running')
|
||||
|
||||
if instance['status'] == RUNNING and report.instanceStatus != RUNNING:
|
||||
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
|
||||
self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
|
||||
|
||||
instance['status'] = report.instanceStatus
|
||||
instance['error_msg'] = report.errmsg
|
||||
|
||||
if report.instanceStatus == COMPLETED:
|
||||
self.check_task_completed(task)
|
||||
elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
|
||||
if instance['try_count'] > task.info.maxRetryCount:
|
||||
self.check_task_completed(task)
|
||||
else:
|
||||
reason = 'FAILED' if report.instanceStatus == FAILED else 'TIMEOUT'
|
||||
self.task_retrying(task, reason, instance['try_count'])
|
||||
elif report.instanceStatus == OUTPUTERROR:
|
||||
self.task_failed(task,"OUTPUTERROR")
|
||||
|
||||
|
||||
def check_task_completed(self, task):
|
||||
if len(task.instance_list) < task.info.instanceCount:
|
||||
return
|
||||
failed = False
|
||||
reason = "FAILED"
|
||||
for instance in task.instance_list:
|
||||
if instance['status'] == RUNNING or instance['status'] == WAITING:
|
||||
return
|
||||
if instance['status'] == FAILED or instance['status'] == TIMEOUT:
|
||||
if instance['try_count'] > task.info.maxRetryCount:
|
||||
failed = True
|
||||
if instance['status'] == TIMEOUT:
|
||||
reason = "TIMEOUT"
|
||||
else:
|
||||
return
|
||||
if instance['status'] == OUTPUTERROR:
|
||||
failed = True
|
||||
break
|
||||
|
||||
if failed:
|
||||
self.task_failed(task,reason)
|
||||
else:
|
||||
self.task_completed(task)
|
||||
|
||||
|
||||
def task_completed(self, task):
|
||||
task.status = COMPLETED
|
||||
|
||||
if self.jobmgr is None:
|
||||
self.logger.error('[task_completed] jobmgr is None!')
|
||||
else:
|
||||
self.jobmgr.report(task.info.id,'finished')
|
||||
self.logger.info('task %s completed' % task.info.id)
|
||||
self.lazy_delete_list.append(task)
|
||||
|
||||
|
||||
def task_failed(self, task, reason):
|
||||
task.status = FAILED
|
||||
|
||||
if self.jobmgr is None:
|
||||
self.logger.error('[task_failed] jobmgr is None!')
|
||||
else:
|
||||
self.jobmgr.report(task.info.id,'failed', reason, task.info.maxRetryCount+1)
|
||||
self.logger.info('task %s failed' % task.info.id)
|
||||
self.lazy_delete_list.append(task)
|
||||
|
||||
def task_retrying(self, task, reason, tried_times):
|
||||
if self.jobmgr is None:
|
||||
self.logger.error('[task_retrying] jobmgr is None!')
|
||||
else:
|
||||
self.jobmgr.report(task.info.id,'retrying',reason,tried_times)
|
||||
|
||||
|
||||
@queue_lock
|
||||
def sort_out_task_queue(self):
|
||||
while self.lazy_delete_list:
|
||||
|
@ -210,63 +115,15 @@ class TaskMgr(threading.Thread):
|
|||
self.task_queue.append(task)
|
||||
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
|
||||
|
||||
|
||||
def task_processor(self, task, instance_id, worker_ip):
|
||||
task.status = RUNNING
|
||||
self.jobmgr.report(task.info.id,'running')
|
||||
|
||||
# properties for transaction
|
||||
task.info.instanceid = instance_id
|
||||
task.info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
|
||||
|
||||
instance = task.instance_list[instance_id]
|
||||
instance['status'] = RUNNING
|
||||
instance['try_count'] += 1
|
||||
instance['token'] = task.info.token
|
||||
instance['worker'] = worker_ip
|
||||
|
||||
self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu
|
||||
self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu
|
||||
username = task.info.username
|
||||
container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
|
||||
if not username in self.user_containers.keys():
|
||||
self.user_containers[username] = []
|
||||
self.user_containers[username].append(container_name)
|
||||
|
||||
try:
|
||||
self.logger.info('[task_processor] processing task [%s] instance [%d]' % (task.info.id, task.info.instanceid))
|
||||
channel = grpc.insecure_channel('%s:%s' % (worker_ip, self.worker_port))
|
||||
stub = WorkerStub(channel)
|
||||
response = stub.process_task(task.info)
|
||||
if response.status != Reply.ACCEPTED:
|
||||
raise Exception(response.message)
|
||||
except Exception as e:
|
||||
self.logger.error('[task_processor] rpc error message: %s' % e)
|
||||
instance['status'] = FAILED
|
||||
instance['try_count'] -= 1
|
||||
self.user_containers[username].remove(container_name)
|
||||
|
||||
|
||||
# return task, worker
|
||||
# return task, workers
|
||||
def task_scheduler(self):
|
||||
# simple FIFO with priority
|
||||
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
|
||||
|
||||
# nodes = self.get_all_nodes()
|
||||
# if nodes is None or len(nodes) == 0:
|
||||
# self.logger.info('[task_scheduler] no nodes found')
|
||||
# else:
|
||||
# for worker_ip, worker_info in nodes:
|
||||
# self.logger.info('[task_scheduler] nodes %s' % worker_ip)
|
||||
# for key in worker_info:
|
||||
# if key == 'cpu':
|
||||
# self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key]))
|
||||
# else:
|
||||
# self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
|
||||
|
||||
for task in self.task_queue:
|
||||
if task in self.lazy_delete_list:
|
||||
continue
|
||||
|
||||
worker = self.find_proper_worker(task)
|
||||
|
||||
for index, instance in enumerate(task.instance_list):
|
||||
|
@ -297,27 +154,35 @@ class TaskMgr(threading.Thread):
|
|||
|
||||
self.check_task_completed(task)
|
||||
|
||||
return None, None, None
|
||||
return None, None
|
||||
|
||||
def find_proper_worker(self, task):
|
||||
def find_proper_workers(self, vnodes_configs):
|
||||
nodes = self.get_all_nodes()
|
||||
if nodes is None or len(nodes) == 0:
|
||||
self.logger.warning('[task_scheduler] running nodes not found')
|
||||
return None
|
||||
|
||||
proper_workers = []
|
||||
for needs in vnodes_configs:
|
||||
for worker_ip, worker_info in nodes:
|
||||
if task.info.cluster.instance.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
|
||||
if needs['cpu'] + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
|
||||
continue
|
||||
if task.info.cluster.instance.memory > worker_info['memory']:
|
||||
elif needs['memory'] > worker_info['memory']:
|
||||
continue
|
||||
# try not to assign non-gpu task to a worker with gpu
|
||||
if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
|
||||
#if needs['gpu'] == 0 and worker_info['gpu'] > 0:
|
||||
#continue
|
||||
elif needs['gpu'] + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
|
||||
continue
|
||||
if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
|
||||
continue
|
||||
return worker_ip
|
||||
else:
|
||||
worker_info['cpu'] -= needs['cpu']
|
||||
worker_info['memory'] -= needs['memory']
|
||||
worker_info['gpu'] -= needs['gpu']
|
||||
proper_workers.append(worker_ip)
|
||||
break
|
||||
else:
|
||||
return None
|
||||
|
||||
return proper_workers
|
||||
|
||||
def get_all_nodes(self):
|
||||
# cache running nodes
|
||||
|
@ -328,12 +193,10 @@ class TaskMgr(threading.Thread):
|
|||
all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
|
||||
return all_nodes
|
||||
|
||||
|
||||
def is_alive(self, worker):
|
||||
nodes = self.nodemgr.get_batch_nodeips()
|
||||
return worker in nodes
|
||||
|
||||
|
||||
def get_worker_resource_info(self, worker_ip):
|
||||
fetcher = self.monitor_fetcher(worker_ip)
|
||||
worker_info = fetcher.info
|
||||
|
@ -344,27 +207,6 @@ class TaskMgr(threading.Thread):
|
|||
info['gpu'] = len(worker_info['gpuinfo'])
|
||||
return info
|
||||
|
||||
|
||||
def get_cpu_usage(self, worker_ip):
|
||||
try:
|
||||
return self.cpu_usage[worker_ip]
|
||||
except:
|
||||
self.cpu_usage[worker_ip] = 0
|
||||
return 0
|
||||
|
||||
|
||||
def get_gpu_usage(self, worker_ip):
|
||||
try:
|
||||
return self.gpu_usage[worker_ip]
|
||||
except:
|
||||
self.gpu_usage[worker_ip] = 0
|
||||
return 0
|
||||
|
||||
|
||||
def set_jobmgr(self, jobmgr):
|
||||
self.jobmgr = jobmgr
|
||||
|
||||
|
||||
# save the task information into database
|
||||
# called when jobmgr assign task to taskmgr
|
||||
def add_task(self, username, taskid, json_task, task_priority=1):
|
||||
|
|
|
@ -0,0 +1,424 @@
|
|||
import threading
|
||||
import time
|
||||
import string
|
||||
import random
|
||||
import json
|
||||
from functools import wraps
|
||||
|
||||
# must import logger after initlogging, ugly
|
||||
from utils.log import logger
|
||||
|
||||
# grpc
|
||||
from concurrent import futures
|
||||
import grpc
|
||||
from protos.rpc_pb2 import *
|
||||
from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub
|
||||
|
||||
from utils import env
|
||||
|
||||
|
||||
class Task():
    """A batch task held in the scheduler queue.

    Wraps a TaskInfo protobuf (grpc message) together with the mutable
    scheduling state that TaskMgr maintains for it.
    """

    def __init__(self, info, priority):
        # TaskInfo protobuf describing the task (id, username, cluster, ...)
        self.info = info
        # overall task status; WAITING/RUNNING/... come from protos.rpc_pb2
        self.status = WAITING
        # one dict per started instance (status, token, worker, try_count, ...)
        self.instance_list = []
        self.token = ''
        # priority the bigger the better
        # self.priority the smaller the better
        # NOTE(review): true division keeps this a float (hours since epoch
        # minus the user-supplied priority) — presumably intentional; confirm.
        self.priority = int(time.time()) / 60 / 60 - priority

    def __lt__(self, other):
        # used by sorted() in sort_out_task_queue(): smaller value first
        return self.priority < other.priority
|
||||
|
||||
|
||||
class TaskReporter(MasterServicer):
    """grpc servicer that forwards worker task reports to the TaskMgr."""

    def __init__(self, taskmgr):
        # the TaskMgr instance that owns the task queue
        self.taskmgr = taskmgr

    def report(self, request, context):
        """rpc handler: relay every TaskMsg in the batch to the TaskMgr."""
        for task_report in request.taskmsgs:
            self.taskmgr.on_task_report(task_report)
        return Reply(status=Reply.ACCEPTED, message='')
|
||||
|
||||
|
||||
class TaskMgr(threading.Thread):
|
||||
|
||||
# load task information from etcd
# initial a task queue and task scheduler
# taskmgr: a taskmgr instance
def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None):
    """Create the task manager thread.

    nodemgr: node manager used to list alive batch worker nodes.
    monitor_fetcher: callable(worker_ip) whose result exposes an
        ``info`` dict with the worker's resource data.
    scheduler_interval: seconds to sleep when nothing is schedulable.
    external_logger: unused here; the module-level logger is used.
    """
    threading.Thread.__init__(self)
    self.thread_stop = False
    # set later via set_jobmgr(); receives task state transition reports
    self.jobmgr = None
    self.task_queue = []
    # additions/removals are staged here and merged by sort_out_task_queue()
    self.lazy_append_list = []
    self.lazy_delete_list = []
    self.task_queue_lock = threading.Lock()
    # username -> list of batch container names owned by that user
    self.user_containers = {}

    self.scheduler_interval = scheduler_interval
    self.logger = logger

    # ports come from the environment as strings
    self.master_port = env.getenv('BATCH_MASTER_PORT')
    self.worker_port = env.getenv('BATCH_WORKER_PORT')

    # nodes
    self.nodemgr = nodemgr
    self.monitor_fetcher = monitor_fetcher
    # worker_ip -> resources currently reserved by running instances
    self.cpu_usage = {}
    self.gpu_usage = {}
    # self.all_nodes = None
    # self.last_nodes_info_update_time = 0
    # self.nodes_info_update_interval = 30 # (s)
|
||||
|
||||
|
||||
def queue_lock(f):
    """Decorator: run *f* while holding ``self.task_queue_lock``.

    Bug fixed: the original released the lock only on the success path,
    so any exception raised by *f* left the lock held forever and
    deadlocked every other queue operation.  try/finally guarantees the
    release on both paths while preserving the return value.
    """
    @wraps(f)
    def new_f(self, *args, **kwargs):
        self.task_queue_lock.acquire()
        try:
            return f(self, *args, **kwargs)
        finally:
            self.task_queue_lock.release()
    return new_f
|
||||
|
||||
|
||||
def run(self):
    """Main scheduler loop (threading.Thread entry point).

    Starts the rpc server, then repeatedly merges the lazy add/delete
    lists into the queue and tries to dispatch one task instance per
    pass; sleeps scheduler_interval seconds when nothing is schedulable.
    """
    self.serve()
    while not self.thread_stop:
        self.sort_out_task_queue()
        task, instance_id, worker = self.task_scheduler()
        if task is not None and worker is not None:
            self.task_processor(task, instance_id, worker)
        else:
            # nothing to dispatch right now; back off before retrying
            time.sleep(self.scheduler_interval)
|
||||
|
||||
|
||||
def serve(self):
    """Start the grpc server that receives worker heart-beat reports."""
    self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_MasterServicer_to_server(TaskReporter(self), self.server)
    # master_port is the BATCH_MASTER_PORT env value (a string)
    self.server.add_insecure_port('[::]:' + self.master_port)
    self.server.start()
    self.logger.info('[taskmgr_rpc] start rpc server')
|
||||
|
||||
|
||||
def stop(self):
    """Signal the scheduler loop to exit and shut down the rpc server."""
    self.thread_stop = True
    # grace period 0: stop immediately without waiting for in-flight rpcs
    self.server.stop(0)
    self.logger.info('[taskmgr_rpc] stop rpc server')
|
||||
|
||||
|
||||
# this method is called when worker send heart-beat rpc request
def on_task_report(self, report):
    """Handle one TaskMsg reported by a worker.

    Validates the per-dispatch token, releases the container-name and
    cpu/gpu bookkeeping, records the new instance status, and drives
    the task state machine (complete / retry / fail).

    Fix: the task lookup used ``== None``; ``is None`` is the correct
    identity comparison (``==`` can be overridden and is the wrong
    semantic for a None check).
    """
    self.logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.instanceid, report.instanceStatus))
    task = self.get_task(report.taskid)
    if task is None:
        self.logger.error('[on_task_report] task not found')
        return

    instance = task.instance_list[report.instanceid]
    if instance['token'] != report.token:
        # stale report from a retried or superseded dispatch
        self.logger.warning('[on_task_report] wrong token')
        return
    username = task.info.username
    container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token
    self.user_containers[username].remove(container_name)

    if instance['status'] != RUNNING:
        self.logger.error('[on_task_report] receive task report when instance is not running')

    # instance leaves RUNNING: give back the resources it had reserved
    if instance['status'] == RUNNING and report.instanceStatus != RUNNING:
        self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
        self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu

    instance['status'] = report.instanceStatus
    instance['error_msg'] = report.errmsg

    if report.instanceStatus == COMPLETED:
        self.check_task_completed(task)
    elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
        if instance['try_count'] > task.info.maxRetryCount:
            # retry budget exhausted: let the completion check fail the task
            self.check_task_completed(task)
        else:
            reason = 'FAILED' if report.instanceStatus == FAILED else 'TIMEOUT'
            self.task_retrying(task, reason, instance['try_count'])
    elif report.instanceStatus == OUTPUTERROR:
        self.task_failed(task,"OUTPUTERROR")
|
||||
|
||||
|
||||
def check_task_completed(self, task):
    """Finish *task* if every instance has reached a terminal state.

    Does nothing while fewer than instanceCount instances have been
    started, while any instance is still WAITING/RUNNING, or while a
    FAILED/TIMEOUT instance still has retries left.  Otherwise the
    task is failed (reason FAILED/TIMEOUT/OUTPUTERROR as appropriate)
    or completed.
    """
    if len(task.instance_list) < task.info.instanceCount:
        return
    failed = False
    reason = "FAILED"
    for instance in task.instance_list:
        if instance['status'] == RUNNING or instance['status'] == WAITING:
            return
        if instance['status'] == FAILED or instance['status'] == TIMEOUT:
            if instance['try_count'] > task.info.maxRetryCount:
                # out of retries: this instance contributes a failure
                failed = True
                if instance['status'] == TIMEOUT:
                    reason = "TIMEOUT"
            else:
                # still retryable: the scheduler will pick it up again
                return
        if instance['status'] == OUTPUTERROR:
            # output errors are never retried; fail immediately
            failed = True
            break

    if failed:
        self.task_failed(task,reason)
    else:
        self.task_completed(task)
|
||||
|
||||
|
||||
def task_completed(self, task):
    """Mark *task* finished, notify the job manager, queue its removal."""
    task.status = COMPLETED

    if self.jobmgr is not None:
        self.jobmgr.report(task.info.id,'finished')
    else:
        self.logger.error('[task_completed] jobmgr is None!')
    self.logger.info('task %s completed' % task.info.id)
    # actual removal happens lazily in sort_out_task_queue()
    self.lazy_delete_list.append(task)
|
||||
|
||||
|
||||
def task_failed(self, task, reason):
    """Mark *task* failed, notify the job manager, queue its removal."""
    task.status = FAILED

    if self.jobmgr is not None:
        self.jobmgr.report(task.info.id,'failed', reason, task.info.maxRetryCount+1)
    else:
        self.logger.error('[task_failed] jobmgr is None!')
    self.logger.info('task %s failed' % task.info.id)
    # actual removal happens lazily in sort_out_task_queue()
    self.lazy_delete_list.append(task)
|
||||
|
||||
def task_retrying(self, task, reason, tried_times):
    """Report to the job manager that *task* is being retried."""
    if self.jobmgr is not None:
        self.jobmgr.report(task.info.id,'retrying',reason,tried_times)
    else:
        self.logger.error('[task_retrying] jobmgr is None!')
|
||||
|
||||
|
||||
@queue_lock
def sort_out_task_queue(self):
    """Apply the lazily staged deletions and additions to the queue.

    Runs under the queue lock; the queue is re-sorted by priority only
    when new tasks were actually appended.
    """
    while self.lazy_delete_list:
        self.task_queue.remove(self.lazy_delete_list.pop(0))
    if not self.lazy_append_list:
        return
    while self.lazy_append_list:
        self.task_queue.append(self.lazy_append_list.pop(0))
    self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
|
||||
|
||||
|
||||
def task_processor(self, task, instance_id, worker_ip):
    """Dispatch one instance of *task* to the worker at *worker_ip*.

    Optimistically records RUNNING state, resource usage and the
    container name first, then sends the task over rpc; on rpc failure
    rolls the instance back so the scheduler can retry it.
    """
    task.status = RUNNING
    self.jobmgr.report(task.info.id,'running')

    # properties for transaction
    task.info.instanceid = instance_id
    # fresh per-dispatch token lets on_task_report reject stale reports
    task.info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))

    instance = task.instance_list[instance_id]
    instance['status'] = RUNNING
    instance['try_count'] += 1
    instance['token'] = task.info.token
    instance['worker'] = worker_ip

    # reserve the instance's resources on the chosen worker
    self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu
    self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu
    username = task.info.username
    container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
    if not username in self.user_containers.keys():
        self.user_containers[username] = []
    self.user_containers[username].append(container_name)

    try:
        self.logger.info('[task_processor] processing task [%s] instance [%d]' % (task.info.id, task.info.instanceid))
        channel = grpc.insecure_channel('%s:%s' % (worker_ip, self.worker_port))
        stub = WorkerStub(channel)
        response = stub.process_task(task.info)
        if response.status != Reply.ACCEPTED:
            raise Exception(response.message)
    except Exception as e:
        self.logger.error('[task_processor] rpc error message: %s' % e)
        # roll back the optimistic RUNNING state and container record.
        # NOTE(review): cpu_usage/gpu_usage are NOT decremented on this
        # path — looks like reserved resources leak on rpc failure;
        # confirm against the callers.
        instance['status'] = FAILED
        instance['try_count'] -= 1
        self.user_containers[username].remove(container_name)
|
||||
|
||||
|
||||
# return task, worker
def task_scheduler(self):
    """Pick the next (task, instance_id, worker_ip) to dispatch.

    Walks the priority-sorted queue FIFO-style.  For each task it
    first retries FAILED/TIMEOUT instances with retry budget left,
    then re-dispatches RUNNING instances whose worker died, and
    finally starts a new instance when fewer than instanceCount exist.
    Returns (None, None, None) when nothing is schedulable.
    """
    # simple FIFO with priority
    self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))

    # nodes = self.get_all_nodes()
    # if nodes is None or len(nodes) == 0:
    #     self.logger.info('[task_scheduler] no nodes found')
    # else:
    #     for worker_ip, worker_info in nodes:
    #         self.logger.info('[task_scheduler] nodes %s' % worker_ip)
    #         for key in worker_info:
    #             if key == 'cpu':
    #                 self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key]))
    #             else:
    #                 self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))

    for task in self.task_queue:
        # skip tasks already marked for removal
        if task in self.lazy_delete_list:
            continue
        worker = self.find_proper_worker(task)

        for index, instance in enumerate(task.instance_list):
            # find instance to retry
            if (instance['status'] == FAILED or instance['status'] == TIMEOUT) and instance['try_count'] <= task.info.maxRetryCount:
                if worker is not None:
                    self.logger.info('[task_scheduler] retry')
                    return task, index, worker
            # find timeout instance
            elif instance['status'] == RUNNING:
                if not self.is_alive(instance['worker']):
                    # worker disappeared: fail the instance, release its
                    # reserved resources, and redispatch if a worker fits
                    instance['status'] = FAILED
                    instance['token'] = ''
                    self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
                    self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu

                    self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
                    if worker is not None:
                        return task, index, worker

        if worker is not None:
            # start new instance
            if len(task.instance_list) < task.info.instanceCount:
                instance = {}
                instance['try_count'] = 0
                task.instance_list.append(instance)
                return task, len(task.instance_list) - 1, worker

        self.check_task_completed(task)

    return None, None, None
|
||||
|
||||
def find_proper_worker(self, task):
|
||||
nodes = self.get_all_nodes()
|
||||
if nodes is None or len(nodes) == 0:
|
||||
self.logger.warning('[task_scheduler] running nodes not found')
|
||||
return None
|
||||
|
||||
for worker_ip, worker_info in nodes:
|
||||
if task.info.cluster.instance.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
|
||||
continue
|
||||
if task.info.cluster.instance.memory > worker_info['memory']:
|
||||
continue
|
||||
# try not to assign non-gpu task to a worker with gpu
|
||||
if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
|
||||
continue
|
||||
if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
|
||||
continue
|
||||
return worker_ip
|
||||
return None
|
||||
|
||||
|
||||
def get_all_nodes(self):
    """Return [(ip, resource_info_dict)] for every running batch node."""
    # cache running nodes
    # if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
    #     return self.all_nodes
    # get running nodes
    node_ips = self.nodemgr.get_batch_nodeips()
    all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
    return all_nodes
|
||||
|
||||
|
||||
def is_alive(self, worker):
|
||||
nodes = self.nodemgr.get_batch_nodeips()
|
||||
return worker in nodes
|
||||
|
||||
|
||||
def get_worker_resource_info(self, worker_ip):
    """Fetch monitor data for *worker_ip* and condense it to a dict
    with 'cpu' and 'gpu' counts plus 'memory' and 'disk' in Mb."""
    fetcher = self.monitor_fetcher(worker_ip)
    worker_info = fetcher.info
    info = {}
    info['cpu'] = len(worker_info['cpuconfig'])
    # free + buffers + cached: memory that could still be claimed
    info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb)
    info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
    info['gpu'] = len(worker_info['gpuinfo'])
    return info
|
||||
|
||||
|
||||
def get_cpu_usage(self, worker_ip):
|
||||
try:
|
||||
return self.cpu_usage[worker_ip]
|
||||
except:
|
||||
self.cpu_usage[worker_ip] = 0
|
||||
return 0
|
||||
|
||||
|
||||
def get_gpu_usage(self, worker_ip):
|
||||
try:
|
||||
return self.gpu_usage[worker_ip]
|
||||
except:
|
||||
self.gpu_usage[worker_ip] = 0
|
||||
return 0
|
||||
|
||||
|
||||
def set_jobmgr(self, jobmgr):
    """Attach the job manager that receives task state reports."""
    self.jobmgr = jobmgr
|
||||
|
||||
|
||||
# save the task information into database
# called when jobmgr assign task to taskmgr
def add_task(self, username, taskid, json_task, task_priority=1):
    """Build a Task from the job manager's description and stage it.

    json_task: dict-like with instCount/retryCount/expTime/command/
        srcAddr/image/cpuSetting/memorySetting/diskSetting/gpuSetting
        keys and optional stdErrRedPth/stdOutRedPth/mapping entries.
    task_priority: bigger means more urgent (see Task.priority).
    """
    # decode json string to object defined in grpc
    self.logger.info('[taskmgr add_task] receive task %s' % taskid)
    image_dict = {
        "private": Image.PRIVATE,
        "base": Image.BASE,
        "public": Image.PUBLIC
    }
    # json_task = json.loads(json_task)
    # the image field is encoded as "<name>_<owner>_<type>"
    task = Task(TaskInfo(
        id = taskid,
        username = username,
        instanceCount = int(json_task['instCount']),
        maxRetryCount = int(json_task['retryCount']),
        timeout = int(json_task['expTime']),
        parameters = Parameters(
            command = Command(
                commandLine = json_task['command'],
                packagePath = json_task['srcAddr'],
                envVars = {}),
            stderrRedirectPath = json_task.get('stdErrRedPth',""),
            stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
        cluster = Cluster(
            image = Image(
                name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
                type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
                owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
            instance = Instance(
                cpu = int(json_task['cpuSetting']),
                memory = int(json_task['memorySetting']),
                disk = int(json_task['diskSetting']),
                gpu = int(json_task['gpuSetting'])))),
        priority=task_priority)
    if 'mapping' in json_task:
        task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
                                              remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
                                        for mapping_key in json_task['mapping']])
    # staged lazily; sort_out_task_queue() merges it under the lock
    self.lazy_append_list.append(task)
|
||||
|
||||
|
||||
# user: username
|
||||
# get the information of a task, including the status, task description and other information
|
||||
@queue_lock
|
||||
def get_task(self, taskid):
|
||||
for task in self.task_queue:
|
||||
if task.info.id == taskid:
|
||||
return task
|
||||
return None
|
||||
|
||||
# get names of all the batch containers of the user
|
||||
def get_user_batch_containers(self,username):
|
||||
if not username in self.user_containers.keys():
|
||||
return []
|
||||
else:
|
||||
return self.user_containers[username]
|
|
@ -5,8 +5,18 @@ service Master {
|
|||
}
|
||||
|
||||
service Worker {
|
||||
rpc process_task (TaskInfo) returns (Reply) {}
|
||||
rpc stop_tasks (ReportMsg) returns (Reply) {}
|
||||
rpc start_vnode (VNodeInfo) returns (Reply) {}
|
||||
rpc start_task (TaskInfo) returns (Reply) {}
|
||||
rpc stop_task (ReportMsg) returns (Reply) {}
|
||||
rpc stop_vnode (VNodeInfo) returns (Reply) {}
|
||||
}
|
||||
|
||||
message VNodeInfo {
|
||||
string taskid = 1;
|
||||
string username = 2;
|
||||
int32 vnodeid = 3;
|
||||
Parameters parameters = 4; // 参数
|
||||
VNode vnode = 5; // 集群配置
|
||||
}
|
||||
|
||||
message Reply {
|
||||
|
@ -26,8 +36,8 @@ message ReportMsg {
|
|||
message TaskMsg {
|
||||
string taskid = 1;
|
||||
string username = 2;
|
||||
int32 instanceid = 3;
|
||||
Status instanceStatus = 4; // 任务状态
|
||||
int32 vnodeid = 3;
|
||||
Status subTaskStatus = 4; // 任务状态
|
||||
string token = 5;
|
||||
string errmsg = 6;
|
||||
}
|
||||
|
@ -42,16 +52,12 @@ enum Status {
|
|||
}
|
||||
|
||||
message TaskInfo {
|
||||
string id = 1;
|
||||
string taskid = 1;
|
||||
string username = 2;
|
||||
int32 instanceid = 3;
|
||||
int32 instanceCount = 4; // 实例个数
|
||||
int32 maxRetryCount = 5; // 最大重试次数
|
||||
Parameters parameters = 6; // 参数
|
||||
Cluster cluster = 7; // 集群配置
|
||||
int32 timeout = 8; // 超时阈值
|
||||
string token = 9;
|
||||
bool reused = 10; //是否重用
|
||||
int32 vnodeid = 3;
|
||||
Parameters parameters = 4; // 参数
|
||||
int32 timeout = 5; // 超时阈值
|
||||
string token = 6;
|
||||
}
|
||||
|
||||
message Parameters {
|
||||
|
@ -66,10 +72,18 @@ message Command {
|
|||
map<string, string> envVars = 3; // 自定义环境变量
|
||||
}
|
||||
|
||||
message Cluster {
|
||||
message VNode {
|
||||
Image image = 1; // 镜像配置
|
||||
Instance instance = 2; // 实例配置
|
||||
repeated Mount mount = 3; // 挂载配置
|
||||
Network network = 4; //网络配置
|
||||
}
|
||||
|
||||
message Network {
|
||||
string ipaddr = 1;
|
||||
string gateway = 2;
|
||||
string masterip = 3;
|
||||
string brname = 4;
|
||||
}
|
||||
|
||||
message Image {
|
||||
|
|
|
@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
|
|||
name='rpc.proto',
|
||||
package='',
|
||||
syntax='proto3',
|
||||
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"\x7f\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"\xd6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\x12\x0e\n\x06reused\x18\n \x01(\x08\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 
\x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32Q\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_tasks\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x62\x06proto3')
|
||||
serialized_pb=_b('\n\trpc.proto\"v\n\tVNodeInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1f\n\nparameters\x18\x04 \x01(\x0b\x32\x0b.Parameters\x12\x15\n\x05vnode\x18\x05 \x01(\x0b\x32\x06.VNode\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"{\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1e\n\rsubTaskStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"~\n\x08TaskInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1f\n\nparameters\x18\x04 \x01(\x0b\x32\x0b.Parameters\x12\x0f\n\x07timeout\x18\x05 \x01(\x05\x12\r\n\x05token\x18\x06 \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"m\n\x05VNode\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\x12\x19\n\x07network\x18\x04 \x01(\x0b\x32\x08.Network\"L\n\x07Network\x12\x0e\n\x06ipaddr\x18\x01 \x01(\t\x12\x0f\n\x07gateway\x18\x02 \x01(\t\x12\x10\n\x08masterip\x18\x03 \x01(\t\x12\x0e\n\x06\x62rname\x18\x04 \x01(\t\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 
\x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32\x97\x01\n\x06Worker\x12#\n\x0bstart_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x12!\n\nstart_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12!\n\tstop_task\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x12\"\n\nstop_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x62\x06proto3')
|
||||
)
|
||||
|
||||
_STATUS = _descriptor.EnumDescriptor(
|
||||
|
@ -56,8 +56,8 @@ _STATUS = _descriptor.EnumDescriptor(
|
|||
],
|
||||
containing_type=None,
|
||||
options=None,
|
||||
serialized_start=1134,
|
||||
serialized_end=1225,
|
||||
serialized_start=1264,
|
||||
serialized_end=1355,
|
||||
)
|
||||
_sym_db.RegisterEnumDescriptor(_STATUS)
|
||||
|
||||
|
@ -87,8 +87,8 @@ _REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
|
|||
],
|
||||
containing_type=None,
|
||||
options=None,
|
||||
serialized_start=75,
|
||||
serialized_end=115,
|
||||
serialized_start=195,
|
||||
serialized_end=235,
|
||||
)
|
||||
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
|
||||
|
||||
|
@ -113,12 +113,71 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
|
|||
],
|
||||
containing_type=None,
|
||||
options=None,
|
||||
serialized_start=899,
|
||||
serialized_end=945,
|
||||
serialized_start=1029,
|
||||
serialized_end=1075,
|
||||
)
|
||||
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
|
||||
|
||||
|
||||
_VNODEINFO = _descriptor.Descriptor(
|
||||
name='VNodeInfo',
|
||||
full_name='VNodeInfo',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='taskid', full_name='VNodeInfo.taskid', index=0,
|
||||
number=1, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='username', full_name='VNodeInfo.username', index=1,
|
||||
number=2, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='vnodeid', full_name='VNodeInfo.vnodeid', index=2,
|
||||
number=3, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='parameters', full_name='VNodeInfo.parameters', index=3,
|
||||
number=4, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='vnode', full_name='VNodeInfo.vnode', index=4,
|
||||
number=5, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
],
|
||||
extensions=[
|
||||
],
|
||||
nested_types=[],
|
||||
enum_types=[
|
||||
],
|
||||
options=None,
|
||||
is_extendable=False,
|
||||
syntax='proto3',
|
||||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=13,
|
||||
serialized_end=131,
|
||||
)
|
||||
|
||||
|
||||
_REPLY = _descriptor.Descriptor(
|
||||
name='Reply',
|
||||
full_name='Reply',
|
||||
|
@ -153,8 +212,8 @@ _REPLY = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=13,
|
||||
serialized_end=115,
|
||||
serialized_start=133,
|
||||
serialized_end=235,
|
||||
)
|
||||
|
||||
|
||||
|
@ -184,8 +243,8 @@ _REPORTMSG = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=117,
|
||||
serialized_end=156,
|
||||
serialized_start=237,
|
||||
serialized_end=276,
|
||||
)
|
||||
|
||||
|
||||
|
@ -211,14 +270,14 @@ _TASKMSG = _descriptor.Descriptor(
|
|||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='instanceid', full_name='TaskMsg.instanceid', index=2,
|
||||
name='vnodeid', full_name='TaskMsg.vnodeid', index=2,
|
||||
number=3, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='instanceStatus', full_name='TaskMsg.instanceStatus', index=3,
|
||||
name='subTaskStatus', full_name='TaskMsg.subTaskStatus', index=3,
|
||||
number=4, type=14, cpp_type=8, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
@ -250,8 +309,8 @@ _TASKMSG = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=158,
|
||||
serialized_end=285,
|
||||
serialized_start=278,
|
||||
serialized_end=401,
|
||||
)
|
||||
|
||||
|
||||
|
@ -263,7 +322,7 @@ _TASKINFO = _descriptor.Descriptor(
|
|||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='id', full_name='TaskInfo.id', index=0,
|
||||
name='taskid', full_name='TaskInfo.taskid', index=0,
|
||||
number=1, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
|
@ -277,61 +336,33 @@ _TASKINFO = _descriptor.Descriptor(
|
|||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='instanceid', full_name='TaskInfo.instanceid', index=2,
|
||||
name='vnodeid', full_name='TaskInfo.vnodeid', index=2,
|
||||
number=3, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='instanceCount', full_name='TaskInfo.instanceCount', index=3,
|
||||
number=4, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
name='parameters', full_name='TaskInfo.parameters', index=3,
|
||||
number=4, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='maxRetryCount', full_name='TaskInfo.maxRetryCount', index=4,
|
||||
name='timeout', full_name='TaskInfo.timeout', index=4,
|
||||
number=5, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='parameters', full_name='TaskInfo.parameters', index=5,
|
||||
number=6, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='cluster', full_name='TaskInfo.cluster', index=6,
|
||||
number=7, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='timeout', full_name='TaskInfo.timeout', index=7,
|
||||
number=8, type=5, cpp_type=1, label=1,
|
||||
has_default_value=False, default_value=0,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='token', full_name='TaskInfo.token', index=8,
|
||||
number=9, type=9, cpp_type=9, label=1,
|
||||
name='token', full_name='TaskInfo.token', index=5,
|
||||
number=6, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='reused', full_name='TaskInfo.reused', index=9,
|
||||
number=10, type=8, cpp_type=7, label=1,
|
||||
has_default_value=False, default_value=False,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
],
|
||||
extensions=[
|
||||
],
|
||||
|
@ -344,8 +375,8 @@ _TASKINFO = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=288,
|
||||
serialized_end=502,
|
||||
serialized_start=403,
|
||||
serialized_end=529,
|
||||
)
|
||||
|
||||
|
||||
|
@ -389,8 +420,8 @@ _PARAMETERS = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=504,
|
||||
serialized_end=599,
|
||||
serialized_start=531,
|
||||
serialized_end=626,
|
||||
)
|
||||
|
||||
|
||||
|
@ -427,8 +458,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=695,
|
||||
serialized_end=741,
|
||||
serialized_start=722,
|
||||
serialized_end=768,
|
||||
)
|
||||
|
||||
_COMMAND = _descriptor.Descriptor(
|
||||
|
@ -471,39 +502,46 @@ _COMMAND = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=602,
|
||||
serialized_end=741,
|
||||
serialized_start=629,
|
||||
serialized_end=768,
|
||||
)
|
||||
|
||||
|
||||
_CLUSTER = _descriptor.Descriptor(
|
||||
name='Cluster',
|
||||
full_name='Cluster',
|
||||
_VNODE = _descriptor.Descriptor(
|
||||
name='VNode',
|
||||
full_name='VNode',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='image', full_name='Cluster.image', index=0,
|
||||
name='image', full_name='VNode.image', index=0,
|
||||
number=1, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='instance', full_name='Cluster.instance', index=1,
|
||||
name='instance', full_name='VNode.instance', index=1,
|
||||
number=2, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='mount', full_name='Cluster.mount', index=2,
|
||||
name='mount', full_name='VNode.mount', index=2,
|
||||
number=3, type=11, cpp_type=10, label=3,
|
||||
has_default_value=False, default_value=[],
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='network', full_name='VNode.network', index=3,
|
||||
number=4, type=11, cpp_type=10, label=1,
|
||||
has_default_value=False, default_value=None,
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
],
|
||||
extensions=[
|
||||
],
|
||||
|
@ -516,8 +554,60 @@ _CLUSTER = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=743,
|
||||
serialized_end=827,
|
||||
serialized_start=770,
|
||||
serialized_end=879,
|
||||
)
|
||||
|
||||
|
||||
_NETWORK = _descriptor.Descriptor(
|
||||
name='Network',
|
||||
full_name='Network',
|
||||
filename=None,
|
||||
file=DESCRIPTOR,
|
||||
containing_type=None,
|
||||
fields=[
|
||||
_descriptor.FieldDescriptor(
|
||||
name='ipaddr', full_name='Network.ipaddr', index=0,
|
||||
number=1, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='gateway', full_name='Network.gateway', index=1,
|
||||
number=2, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='masterip', full_name='Network.masterip', index=2,
|
||||
number=3, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
_descriptor.FieldDescriptor(
|
||||
name='brname', full_name='Network.brname', index=3,
|
||||
number=4, type=9, cpp_type=9, label=1,
|
||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||
message_type=None, enum_type=None, containing_type=None,
|
||||
is_extension=False, extension_scope=None,
|
||||
options=None, file=DESCRIPTOR),
|
||||
],
|
||||
extensions=[
|
||||
],
|
||||
nested_types=[],
|
||||
enum_types=[
|
||||
],
|
||||
options=None,
|
||||
is_extendable=False,
|
||||
syntax='proto3',
|
||||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=881,
|
||||
serialized_end=957,
|
||||
)
|
||||
|
||||
|
||||
|
@ -562,8 +652,8 @@ _IMAGE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=829,
|
||||
serialized_end=945,
|
||||
serialized_start=959,
|
||||
serialized_end=1075,
|
||||
)
|
||||
|
||||
|
||||
|
@ -628,8 +718,8 @@ _MOUNT = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=947,
|
||||
serialized_end=1064,
|
||||
serialized_start=1077,
|
||||
serialized_end=1194,
|
||||
)
|
||||
|
||||
|
||||
|
@ -680,37 +770,48 @@ _INSTANCE = _descriptor.Descriptor(
|
|||
extension_ranges=[],
|
||||
oneofs=[
|
||||
],
|
||||
serialized_start=1066,
|
||||
serialized_end=1132,
|
||||
serialized_start=1196,
|
||||
serialized_end=1262,
|
||||
)
|
||||
|
||||
_VNODEINFO.fields_by_name['parameters'].message_type = _PARAMETERS
|
||||
_VNODEINFO.fields_by_name['vnode'].message_type = _VNODE
|
||||
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
|
||||
_REPLY_REPLYSTATUS.containing_type = _REPLY
|
||||
_REPORTMSG.fields_by_name['taskmsgs'].message_type = _TASKMSG
|
||||
_TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS
|
||||
_TASKMSG.fields_by_name['subTaskStatus'].enum_type = _STATUS
|
||||
_TASKINFO.fields_by_name['parameters'].message_type = _PARAMETERS
|
||||
_TASKINFO.fields_by_name['cluster'].message_type = _CLUSTER
|
||||
_PARAMETERS.fields_by_name['command'].message_type = _COMMAND
|
||||
_COMMAND_ENVVARSENTRY.containing_type = _COMMAND
|
||||
_COMMAND.fields_by_name['envVars'].message_type = _COMMAND_ENVVARSENTRY
|
||||
_CLUSTER.fields_by_name['image'].message_type = _IMAGE
|
||||
_CLUSTER.fields_by_name['instance'].message_type = _INSTANCE
|
||||
_CLUSTER.fields_by_name['mount'].message_type = _MOUNT
|
||||
_VNODE.fields_by_name['image'].message_type = _IMAGE
|
||||
_VNODE.fields_by_name['instance'].message_type = _INSTANCE
|
||||
_VNODE.fields_by_name['mount'].message_type = _MOUNT
|
||||
_VNODE.fields_by_name['network'].message_type = _NETWORK
|
||||
_IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE
|
||||
_IMAGE_IMAGETYPE.containing_type = _IMAGE
|
||||
DESCRIPTOR.message_types_by_name['VNodeInfo'] = _VNODEINFO
|
||||
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
|
||||
DESCRIPTOR.message_types_by_name['ReportMsg'] = _REPORTMSG
|
||||
DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG
|
||||
DESCRIPTOR.message_types_by_name['TaskInfo'] = _TASKINFO
|
||||
DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS
|
||||
DESCRIPTOR.message_types_by_name['Command'] = _COMMAND
|
||||
DESCRIPTOR.message_types_by_name['Cluster'] = _CLUSTER
|
||||
DESCRIPTOR.message_types_by_name['VNode'] = _VNODE
|
||||
DESCRIPTOR.message_types_by_name['Network'] = _NETWORK
|
||||
DESCRIPTOR.message_types_by_name['Image'] = _IMAGE
|
||||
DESCRIPTOR.message_types_by_name['Mount'] = _MOUNT
|
||||
DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
|
||||
DESCRIPTOR.enum_types_by_name['Status'] = _STATUS
|
||||
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
|
||||
|
||||
VNodeInfo = _reflection.GeneratedProtocolMessageType('VNodeInfo', (_message.Message,), dict(
|
||||
DESCRIPTOR = _VNODEINFO,
|
||||
__module__ = 'rpc_pb2'
|
||||
# @@protoc_insertion_point(class_scope:VNodeInfo)
|
||||
))
|
||||
_sym_db.RegisterMessage(VNodeInfo)
|
||||
|
||||
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
|
||||
DESCRIPTOR = _REPLY,
|
||||
__module__ = 'rpc_pb2'
|
||||
|
@ -761,12 +862,19 @@ Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,
|
|||
_sym_db.RegisterMessage(Command)
|
||||
_sym_db.RegisterMessage(Command.EnvVarsEntry)
|
||||
|
||||
Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict(
|
||||
DESCRIPTOR = _CLUSTER,
|
||||
VNode = _reflection.GeneratedProtocolMessageType('VNode', (_message.Message,), dict(
|
||||
DESCRIPTOR = _VNODE,
|
||||
__module__ = 'rpc_pb2'
|
||||
# @@protoc_insertion_point(class_scope:Cluster)
|
||||
# @@protoc_insertion_point(class_scope:VNode)
|
||||
))
|
||||
_sym_db.RegisterMessage(Cluster)
|
||||
_sym_db.RegisterMessage(VNode)
|
||||
|
||||
Network = _reflection.GeneratedProtocolMessageType('Network', (_message.Message,), dict(
|
||||
DESCRIPTOR = _NETWORK,
|
||||
__module__ = 'rpc_pb2'
|
||||
# @@protoc_insertion_point(class_scope:Network)
|
||||
))
|
||||
_sym_db.RegisterMessage(Network)
|
||||
|
||||
Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict(
|
||||
DESCRIPTOR = _IMAGE,
|
||||
|
@ -799,8 +907,8 @@ _MASTER = _descriptor.ServiceDescriptor(
|
|||
file=DESCRIPTOR,
|
||||
index=0,
|
||||
options=None,
|
||||
serialized_start=1227,
|
||||
serialized_end=1267,
|
||||
serialized_start=1357,
|
||||
serialized_end=1397,
|
||||
methods=[
|
||||
_descriptor.MethodDescriptor(
|
||||
name='report',
|
||||
|
@ -823,27 +931,45 @@ _WORKER = _descriptor.ServiceDescriptor(
|
|||
file=DESCRIPTOR,
|
||||
index=1,
|
||||
options=None,
|
||||
serialized_start=1269,
|
||||
serialized_end=1350,
|
||||
serialized_start=1400,
|
||||
serialized_end=1551,
|
||||
methods=[
|
||||
_descriptor.MethodDescriptor(
|
||||
name='process_task',
|
||||
full_name='Worker.process_task',
|
||||
name='start_vnode',
|
||||
full_name='Worker.start_vnode',
|
||||
index=0,
|
||||
containing_service=None,
|
||||
input_type=_VNODEINFO,
|
||||
output_type=_REPLY,
|
||||
options=None,
|
||||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='start_task',
|
||||
full_name='Worker.start_task',
|
||||
index=1,
|
||||
containing_service=None,
|
||||
input_type=_TASKINFO,
|
||||
output_type=_REPLY,
|
||||
options=None,
|
||||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='stop_tasks',
|
||||
full_name='Worker.stop_tasks',
|
||||
index=1,
|
||||
name='stop_task',
|
||||
full_name='Worker.stop_task',
|
||||
index=2,
|
||||
containing_service=None,
|
||||
input_type=_REPORTMSG,
|
||||
output_type=_REPLY,
|
||||
options=None,
|
||||
),
|
||||
_descriptor.MethodDescriptor(
|
||||
name='stop_vnode',
|
||||
full_name='Worker.stop_vnode',
|
||||
index=3,
|
||||
containing_service=None,
|
||||
input_type=_VNODEINFO,
|
||||
output_type=_REPLY,
|
||||
options=None,
|
||||
),
|
||||
])
|
||||
_sym_db.RegisterServiceDescriptor(_WORKER)
|
||||
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
import sys
|
||||
if sys.path[0].endswith("worker"):
|
||||
sys.path[0] = sys.path[0][:-6]
|
||||
from utils import env, tools
|
||||
config = env.getenv("CONFIG")
|
||||
#config = "/opt/docklet/local/docklet-running.conf"
|
||||
tools.loadenv(config)
|
||||
from utils.log import initlogging
|
||||
initlogging("docklet-taskworker")
|
||||
from utils.log import logger
|
||||
|
||||
from concurrent import futures
|
||||
import grpc
|
||||
#from utils.log import logger
|
||||
#from utils import env
|
||||
import json,lxc,subprocess,threading,os,time,traceback
|
||||
from utils import imagemgr,etcdlib,gputools
|
||||
from utils.lvmtool import sys_run
|
||||
from worker import ossmounter
|
||||
from protos import rpc_pb2, rpc_pb2_grpc
|
||||
|
||||
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
|
||||
MAX_RUNNING_TIME = _ONE_DAY_IN_SECONDS
|
||||
|
||||
class TaskWorker(rpc_pb2_grpc.WorkerServicer):
|
||||
|
||||
def __init__(self):
|
||||
rpc_pb2_grpc.WorkerServicer.__init__(self)
|
||||
etcdaddr = env.getenv("ETCD")
|
||||
logger.info ("using ETCD %s" % etcdaddr )
|
||||
|
||||
clustername = env.getenv("CLUSTER_NAME")
|
||||
logger.info ("using CLUSTER_NAME %s" % clustername )
|
||||
|
||||
# init etcdlib client
|
||||
try:
|
||||
self.etcdclient = etcdlib.Client(etcdaddr, prefix = clustername)
|
||||
except Exception:
|
||||
logger.error ("connect etcd failed, maybe etcd address not correct...")
|
||||
sys.exit(1)
|
||||
else:
|
||||
logger.info("etcd connected")
|
||||
|
||||
# get master ip and report port
|
||||
[success,masterip] = self.etcdclient.getkey("service/master")
|
||||
if not success:
|
||||
logger.error("Fail to get master ip address.")
|
||||
sys.exit(1)
|
||||
else:
|
||||
self.master_ip = masterip
|
||||
logger.info("Get master ip address: %s" % (self.master_ip))
|
||||
self.master_port = env.getenv('BATCH_MASTER_PORT')
|
||||
|
||||
self.imgmgr = imagemgr.ImageMgr()
|
||||
self.fspath = env.getenv('FS_PREFIX')
|
||||
self.confpath = env.getenv('DOCKLET_CONF')
|
||||
|
||||
self.taskmsgs = []
|
||||
self.msgslock = threading.Lock()
|
||||
self.report_interval = 2
|
||||
|
||||
self.lock = threading.Lock()
|
||||
self.mount_lock = threading.Lock()
|
||||
|
||||
self.gpu_lock = threading.Lock()
|
||||
self.gpu_status = {}
|
||||
gpus = gputools.get_gpu_status()
|
||||
for gpu in gpus:
|
||||
self.gpu_status[gpu['id']] = ""
|
||||
|
||||
self.start_report()
|
||||
logger.info('TaskWorker init success')
|
||||
|
||||
def start_vnode(self, request, context):
|
||||
logger.info('start vnode with config: ' + str(request))
|
||||
taskid = request.taskid
|
||||
vnodeid = request.vnodeid
|
||||
|
||||
envs = {}
|
||||
envs['taskid'] = str(taskid)
|
||||
envs['vnodeid'] = str(vnodeid)
|
||||
image = {}
|
||||
image['name'] = request.vnode.image.name
|
||||
if request.vnode.image.type == rpc_pb2.Image.PRIVATE:
|
||||
image['type'] = 'private'
|
||||
elif request.vnode.image.type == rpc_pb2.Image.PUBLIC:
|
||||
image['type'] = 'public'
|
||||
else:
|
||||
image['type'] = 'base'
|
||||
image['owner'] = request.vnode.image.owner
|
||||
username = request.username
|
||||
lxcname = '%s-batch-%s-%s' % (username,taskid,str(vnodeid))
|
||||
instance_type = request.vnode.instance
|
||||
mount_list = request.vnode.mount
|
||||
gpu_need = int(request.vnode.instance.gpu)
|
||||
|
||||
def start_task(self, request, context):
|
||||
pass
|
||||
|
||||
def stop_task(self, request, context):
|
||||
pass
|
||||
|
||||
def stop_vnode(self, request, context):
|
||||
pass
|
||||
|
||||
def add_msg(self,taskid,username,vnodeid,status,token,errmsg):
|
||||
self.msgslock.acquire()
|
||||
try:
|
||||
self.taskmsgs.append(rpc_pb2.TaskMsg(taskid=str(taskid),username=username,vnodeid=int(vnodeid),subTaskStatus=status,token=token,errmsg=errmsg))
|
||||
except Exception as err:
|
||||
logger.error(traceback.format_exc())
|
||||
self.msgslock.release()
|
||||
|
||||
def report_msg(self):
|
||||
channel = grpc.insecure_channel(self.master_ip+":"+self.master_port)
|
||||
stub = rpc_pb2_grpc.MasterStub(channel)
|
||||
while True:
|
||||
self.msgslock.acquire()
|
||||
reportmsg = rpc_pb2.ReportMsg(taskmsgs = self.taskmsgs)
|
||||
try:
|
||||
response = stub.report(reportmsg)
|
||||
logger.info("Response from master by reporting: "+str(response.status)+" "+response.message)
|
||||
except Exception as err:
|
||||
logger.error(traceback.format_exc())
|
||||
self.taskmsgs = []
|
||||
self.msgslock.release()
|
||||
time.sleep(self.report_interval)
|
||||
|
||||
def start_report(self):
|
||||
thread = threading.Thread(target = self.report_msg, args=())
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
logger.info("Start to report task messages to master every %d seconds." % self.report_interval)
|
||||
|
||||
def TaskWorkerServe():
|
||||
max_threads = int(env.getenv('BATCH_MAX_THREAD_WORKER'))
|
||||
worker_port = int(env.getenv('BATCH_WORKER_PORT'))
|
||||
logger.info("Max Threads on a worker is %d" % max_threads)
|
||||
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_threads))
|
||||
rpc_pb2_grpc.add_WorkerServicer_to_server(TaskWorker(), server)
|
||||
server.add_insecure_port('[::]:'+str(worker_port))
|
||||
server.start()
|
||||
logger.info("Start TaskWorker Servicer on port:%d" % worker_port)
|
||||
try:
|
||||
while True:
|
||||
time.sleep(_ONE_DAY_IN_SECONDS)
|
||||
except KeyboardInterrupt:
|
||||
server.stop(0)
|
||||
|
||||
if __name__ == "__main__":
|
||||
TaskControllerServe()
|
Loading…
Reference in New Issue