add network into taskmgr

This commit is contained in:
zhuyj17 2019-03-05 00:15:23 +08:00
parent 9129a75537
commit 782baee617
9 changed files with 128 additions and 29 deletions

View File

@ -201,12 +201,11 @@
# default: 50051
# BATCH_WORKER_PORT=50051
# BATCH_GATEWAY: the ip address of gateway for the containers processing
# batch jobs. default: 10.0.3.1
# BATCH_GATEWAY=10.0.3.1
# BATCH_NET: ip addresses range of containers for batch job, default is 10.16.0.0/16
# BATCH_NET=10.16.0.0/16
# BATCH_NET: ip addresses range of containers for batch job, default is 10.0.3.0/24
# BATCH_NET=10.0.3.0/24
# BATCH_TASK_CIDR: 2^(BATCH_TASK_CIDR) is the number of ip addresses for a task, default is 4
# BATCH_TASK_CIDR=4
# BATCH_MAX_THREAD_WORKER: the maximun number of threads of the rpc server on
# the batch job worker. default:5

View File

@ -13,11 +13,18 @@ from concurrent import futures
import grpc
from protos.rpc_pb2 import *
from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub
from utils.nettools import netcontrol
from utils import env
def ip_to_int(addr):
[a, b, c, d] = addr.split('.')
return (int(a)<<24) + (int(b)<<16) + (int(c)<<8) + int(d)
def int_to_ip(num):
return str((num>>24)&255)+"."+str((num>>16)&255)+"."+str((num>>8)&255)+"."+str(num&255)
class Task():
def __init__(self, configinfo, vnodeinfo, taskinfo, priority):
def __init__(self, configinfo, vnodeinfo, taskinfo, priority, max_size):
self.vnodeinfo = vnodeinfo
self.taskinfo = taskinfo
self.status = WAITING
@ -30,6 +37,8 @@ class Task():
# priority the bigger the better
# self.priority the smaller the better
self.priority = int(time.time()) / 60 / 60 - priority
self.network = None
self.max_size = max_size
for i in range(self.vnode_nums):
self.subtask_list.append({'status':'WAITING','try_count':0})
@ -59,10 +68,11 @@ class TaskMgr(threading.Thread):
# load task information from etcd
# initial a task queue and task schedueler
# taskmgr: a taskmgr instance
def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None):
def __init__(self, nodemgr, monitor_fetcher, master_ip, scheduler_interval=2, external_logger=None):
threading.Thread.__init__(self)
self.thread_stop = False
self.jobmgr = None
self.master_ip = self.master_ip
self.task_queue = []
self.lazy_append_list = []
self.lazy_delete_list = []
@ -84,6 +94,19 @@ class TaskMgr(threading.Thread):
# self.last_nodes_info_update_time = 0
# self.nodes_info_update_interval = 30 # (s)
self.network_lock = threading.Lock()
batch_net = env.getenv('BATCH_NET')
self.batch_cidr = int(batch_net.split('/')[1])
batch_net = batch_net.split('/')[0]
task_cidr = int(env.getenv('BATCH_TASK_CIDR'))
task_cidr = min(task_cidr,31-self.batch_cidr)
self.task_cidr = max(task_cidr,2)
self.base_ip = ip_to_int(batch_net)
self.free_nets = []
for i in range((1 << self.task_cidr), (1 << (32-self.batch_cidr)) - 1):
self.free_nets.append(i)
logger.info("Free nets addresses pool %s" % str(self.free_nets))
logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr)))
def queue_lock(f):
@wraps(f)
@ -94,6 +117,14 @@ class TaskMgr(threading.Thread):
return result
return new_f
def net_lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
self.network_lock.acquire()
result = f(self, *args, **kwargs)
self.network_lock.release()
return result
return new_f
def run(self):
self.serve()
@ -128,14 +159,73 @@ class TaskMgr(threading.Thread):
self.task_queue.append(task)
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
def stop_vnode(self, worker, task, vnodeid):
vnodeinfo = copy.copy(task.vnodeinfo)
vnodeinfo.vnodeid = vnodeid
try:
self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (task.vnodeinfo.id, vnodeid))
channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.stop_vnode(vnodeinfo)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
return [False, e]
return [True, ""]
@net_lock
def acquire_task_net(self, task):
self.logger.info("[acquire_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network)))
if task.network == None:
task.network = self.free_nets.pop(0)
return task.network
@net_lock
def release_task_net(self,task):
self.logger.info("[release_task_net] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.network)))
if task.network == None:
return
self.free_nets.append(task.network)
self.logger.error('[release task_net] %s'%str(e))
def setup_tasknet(self, task, workers=None):
taskid = task.taskinfo.taskid
username = task.taskinfo.username
brname = "docklet-batch-%s-%s"%(username, taskid)
gwname = "Batch-%s-%s"%(username, taskid)
if task.network == None:
return [False, "task.network is None!"]
gatewayip = int_to_ip(self.base_ip + task.network + 1)
gatewayipcidr += "/" + str(32-self.task_cidr)
netcontrol.new_bridge(brname)
netcontrol.setup_gw(brname,gwname,gatewayipcidr,0,0)
for wip in workers:
netcontrol.setup_gre(brname,wip)
return [True, gatewayip]
def remove_tasknet(self, task):
taskid = task.taskinfo.taskid
username = task.taskinfo.username
brname = "docklet-batch-%s-%s"%(username, taskid)
netcontrol.del_bridge(brname)
def task_processor(self, task, vnodes_workers):
task.status = RUNNING
self.jobmgr.report(task.taskinfo.taskid,'running')
# properties for transaction
# properties for transactio
self.acquire_task_net(task)
[success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers])
if not success:
return [False, gwip]
token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
placed_workers = []
# start vc
for vid, worker in vnodes_workers:
vnodeinfo = copy.copy(task.vnodeinfo)
vnodeinfo.vnodeid = vid
@ -152,21 +242,30 @@ class TaskMgr(threading.Thread):
#if not username in self.user_containers.keys():
#self.user_containers[username] = []
#self.user_containers[username].append(container_name)
ipaddr = int_to_ip(self.base_ip + task.network + vid%task.max_size + 2)
brname = "docklet-batch-%s-%s"%(username, taskid)
networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname)
vnode.network = networkinfo
try:
self.logger.info('[task_processor] starting vnode for task [%s] instance [%d]' % (task.vnodeinfo.id, vid))
channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.start_vnode(vnodeinfo)
placed_workers.append(worker)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
task.status = FAILED
vnode['status'] = FAILED
vnode['try_count'] -= 1
for pl_worker in placed_workers:
pass
return
#self.user_containers[username].remove(container_name)
# start tasks
for vid, worker in vnodes_workers:
taskinfo = copy.copy(task.taskinfo)
taskinfo.vnodeid = vid
@ -181,8 +280,7 @@ class TaskMgr(threading.Thread):
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
vnode['status'] = FAILED
vnode['try_count'] -= 1
task.status = FAILED
# return task, workers
def task_scheduler(self):
@ -347,7 +445,7 @@ class TaskMgr(threading.Thread):
stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
timeout = int(json_task['expTime']),
),
priority=task_priority)
priority=task_priority,max_size=(1<<self.task_cidr)-2)
if 'mapping' in json_task:
task.vnodeinfo.vnode.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])

View File

@ -59,7 +59,7 @@ def start_task():
if __name__ == '__main__':
#for i in range(10):
#run()
start_task()
#stop_vnode()
#start_task()
stop_vnode()
#time.sleep(4)
#stop_task()

View File

@ -7,7 +7,7 @@ service Master {
service Worker {
rpc start_vnode (VNodeInfo) returns (Reply) {}
rpc start_task (TaskInfo) returns (Reply) {}
rpc stop_task (ReportMsg) returns (Reply) {}
rpc stop_task (TaskInfo) returns (Reply) {}
rpc stop_vnode (VNodeInfo) returns (Reply) {}
}

View File

@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='rpc.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\trpc.proto\"U\n\tVNodeInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x15\n\x05vnode\x18\x04 \x01(\x0b\x32\x06.VNode\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"{\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1e\n\rsubTaskStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"~\n\x08TaskInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1f\n\nparameters\x18\x04 \x01(\x0b\x32\x0b.Parameters\x12\x0f\n\x07timeout\x18\x05 \x01(\x05\x12\r\n\x05token\x18\x06 \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"m\n\x05VNode\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\x12\x19\n\x07network\x18\x04 \x01(\x0b\x32\x08.Network\"L\n\x07Network\x12\x0e\n\x06ipaddr\x18\x01 \x01(\t\x12\x0f\n\x07gateway\x18\x02 \x01(\t\x12\x10\n\x08masterip\x18\x03 \x01(\t\x12\x0e\n\x06\x62rname\x18\x04 \x01(\t\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32\x97\x01\n\x06Worker\x12#\n\x0bstart_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x12!\n\nstart_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12!\n\tstop_task\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x12\"\n\nstop_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x62\x06proto3')
serialized_pb=_b('\n\trpc.proto\"U\n\tVNodeInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x15\n\x05vnode\x18\x04 \x01(\x0b\x32\x06.VNode\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"{\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1e\n\rsubTaskStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"~\n\x08TaskInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1f\n\nparameters\x18\x04 \x01(\x0b\x32\x0b.Parameters\x12\x0f\n\x07timeout\x18\x05 \x01(\x05\x12\r\n\x05token\x18\x06 \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"m\n\x05VNode\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\x12\x19\n\x07network\x18\x04 \x01(\x0b\x32\x08.Network\"L\n\x07Network\x12\x0e\n\x06ipaddr\x18\x01 \x01(\t\x12\x0f\n\x07gateway\x18\x02 \x01(\t\x12\x10\n\x08masterip\x18\x03 \x01(\t\x12\x0e\n\x06\x62rname\x18\x04 \x01(\t\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32\x96\x01\n\x06Worker\x12#\n\x0bstart_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x12!\n\nstart_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12 \n\tstop_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x62\x06proto3')
)
_STATUS = _descriptor.EnumDescriptor(
@ -924,7 +924,7 @@ _WORKER = _descriptor.ServiceDescriptor(
index=1,
options=None,
serialized_start=1367,
serialized_end=1518,
serialized_end=1517,
methods=[
_descriptor.MethodDescriptor(
name='start_vnode',
@ -949,7 +949,7 @@ _WORKER = _descriptor.ServiceDescriptor(
full_name='Worker.stop_task',
index=2,
containing_service=None,
input_type=_REPORTMSG,
input_type=_TASKINFO,
output_type=_REPLY,
options=None,
),

View File

@ -68,7 +68,7 @@ class WorkerStub(object):
)
self.stop_task = channel.unary_unary(
'/Worker/stop_task',
request_serializer=rpc__pb2.ReportMsg.SerializeToString,
request_serializer=rpc__pb2.TaskInfo.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
self.stop_vnode = channel.unary_unary(
@ -125,7 +125,7 @@ def add_WorkerServicer_to_server(servicer, server):
),
'stop_task': grpc.unary_unary_rpc_method_handler(
servicer.stop_task,
request_deserializer=rpc__pb2.ReportMsg.FromString,
request_deserializer=rpc__pb2.TaskInfo.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
'stop_vnode': grpc.unary_unary_rpc_method_handler(

View File

@ -85,10 +85,10 @@ def getenv(key):
return os.environ.get("BATCH_MASTER_PORT","50050")
elif key == "BATCH_WORKER_PORT":
return os.environ.get("BATCH_WORKER_PORT","50051")
elif key == "BATCH_GATEWAY":
return os.environ.get("BATCH_GATEWAY","10.0.3.1")
elif key == "BATCH_TASK_CIDR":
return os.environ.get("BATCH_TASK_CIDR","4")
elif key == "BATCH_NET":
return os.environ.get("BATCH_NET","10.0.3.0/24")
return os.environ.get("BATCH_NET","10.16.0.0/16")
elif key == "BATCH_MAX_THREAD_WORKER":
return os.environ.get("BATCH_MAX_THREAD_WORKER","5")
else:

View File

@ -211,7 +211,7 @@ class ovscontrol(object):
@staticmethod
def add_port_gre(bridge, port, remote):
try:
subprocess.run(['ovs-vsctl', 'add-port', str(bridge), str(port), '--', 'set', 'interface', str(port), 'type=gre', 'options:remote_ip='+str(remote)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
subprocess.run(['ovs-vsctl', '--may-exist', 'add-port', str(bridge), str(port), '--', 'set', 'interface', str(port), 'type=gre', 'options:remote_ip='+str(remote)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(port)]
except subprocess.CalledProcessError as suberror:
return [False, "add port failed : %s" % suberror.stdout.decode('utf-8')]

View File

@ -228,10 +228,12 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
def stop_task(self, request, context):
for msg in request.taskmsgs:
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.vnodeid),msg.token)
logger.info("Stop the task with lxc:"+lxcname)
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
taskid = request.taskid
username = request.username
vnodeid = request.vnodeid
lxcname = '%s-batch-%s-%s' % (username,taskid,str(vnodeid))
logger.info("Stop the task with lxc:"+lxcname)
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
# stop and remove container