update rpc protos

This commit is contained in:
Firmlyzhu 2018-07-19 15:26:52 +08:00
parent 2b1e59af8f
commit f13cf4c47d
5 changed files with 71 additions and 68 deletions

View File

@ -23,6 +23,7 @@ apt-get install -y nodejs nodejs-legacy npm
apt-get install -y etcd apt-get install -y etcd
apt-get install -y glusterfs-client attr apt-get install -y glusterfs-client attr
apt-get install -y nginx apt-get install -y nginx
pip3 install grpcio grpcio-tools googleapis-common-protos
#add ip forward #add ip forward
echo "net.ipv4.ip_forward=1" >>/etc/sysctl.conf echo "net.ipv4.ip_forward=1" >>/etc/sysctl.conf

View File

@ -5,7 +5,7 @@ service Master {
} }
service Worker { service Worker {
rpc add_task (Task) returns (Reply) {} //rpc add_task (Task) returns (Reply) {}
rpc process_task (Task) returns (Reply) {} rpc process_task (Task) returns (Reply) {}
} }

View File

@ -1,5 +1,5 @@
# Generated by the protocol buffer compiler. DO NOT EDIT! # Generated by the protocol buffer compiler. DO NOT EDIT!
# source: protos/rpc.proto # source: rpc.proto
import sys import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
@ -16,10 +16,10 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor( DESCRIPTOR = _descriptor.FileDescriptor(
name='protos/rpc.proto', name='rpc.proto',
package='', package='',
syntax='proto3', syntax='proto3',
serialized_pb=_b('\n\x10protos/rpc.proto\"V\n\x05Reply\x12#\n\x07message\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\xff\x01\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\t\x12 \n\x06status\x18\x02 \x01(\x0e\x32\x10.Task.TaskStatus\x12\x15\n\rinstanceCount\x18\x03 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x04 \x01(\x05\x12\x1f\n\nparameters\x18\x05 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x06 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07Timeout\x18\x07 \x01(\x05\"N\n\nTaskStatus\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"j\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\"$\n\tImageType\x12\n\n\x06PUBLIC\x10\x00\x12\x0b\n\x07PRIVATE\x10\x01\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05\x32#\n\x06Master\x12\x19\n\x06report\x12\x05.Task\x1a\x06.Reply\"\x00\x32%\n\x06Worker\x12\x1b\n\x08\x61\x64\x64_task\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3') serialized_pb=_b('\n\trpc.proto\"V\n\x05Reply\x12#\n\x07message\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\xff\x01\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\t\x12 \n\x06status\x18\x02 \x01(\x0e\x32\x10.Task.TaskStatus\x12\x15\n\rinstanceCount\x18\x03 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x04 \x01(\x05\x12\x1f\n\nparameters\x18\x05 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x06 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07Timeout\x18\x07 \x01(\x05\"N\n\nTaskStatus\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"j\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\"$\n\tImageType\x12\n\n\x06PUBLIC\x10\x00\x12\x0b\n\x07PRIVATE\x10\x01\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05\x32#\n\x06Master\x12\x19\n\x06report\x12\x05.Task\x1a\x06.Reply\"\x00\x32)\n\x06Worker\x12\x1f\n\x0cprocess_task\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3')
) )
@ -41,8 +41,8 @@ _REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
], ],
containing_type=None, containing_type=None,
options=None, options=None,
serialized_start=66, serialized_start=59,
serialized_end=106, serialized_end=99,
) )
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS) _sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
@ -75,8 +75,8 @@ _TASK_TASKSTATUS = _descriptor.EnumDescriptor(
], ],
containing_type=None, containing_type=None,
options=None, options=None,
serialized_start=286, serialized_start=279,
serialized_end=364, serialized_end=357,
) )
_sym_db.RegisterEnumDescriptor(_TASK_TASKSTATUS) _sym_db.RegisterEnumDescriptor(_TASK_TASKSTATUS)
@ -97,8 +97,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
], ],
containing_type=None, containing_type=None,
options=None, options=None,
serialized_start=761, serialized_start=754,
serialized_end=797, serialized_end=790,
) )
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE) _sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
@ -130,8 +130,8 @@ _REPLY = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=20, serialized_start=13,
serialized_end=106, serialized_end=99,
) )
@ -204,8 +204,8 @@ _TASK = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=109, serialized_start=102,
serialized_end=364, serialized_end=357,
) )
@ -249,8 +249,8 @@ _PARAMETERS = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=366, serialized_start=359,
serialized_end=461, serialized_end=454,
) )
@ -287,8 +287,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=557, serialized_start=550,
serialized_end=603, serialized_end=596,
) )
_COMMAND = _descriptor.Descriptor( _COMMAND = _descriptor.Descriptor(
@ -331,8 +331,8 @@ _COMMAND = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=464, serialized_start=457,
serialized_end=603, serialized_end=596,
) )
@ -376,8 +376,8 @@ _CLUSTER = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=605, serialized_start=598,
serialized_end=689, serialized_end=682,
) )
@ -422,8 +422,8 @@ _IMAGE = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=691, serialized_start=684,
serialized_end=797, serialized_end=790,
) )
@ -460,8 +460,8 @@ _MOUNT = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=799, serialized_start=792,
serialized_end=845, serialized_end=838,
) )
@ -512,8 +512,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=847, serialized_start=840,
serialized_end=913, serialized_end=906,
) )
_REPLY.fields_by_name['message'].enum_type = _REPLY_REPLYSTATUS _REPLY.fields_by_name['message'].enum_type = _REPLY_REPLYSTATUS
@ -542,21 +542,21 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR)
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict( Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
DESCRIPTOR = _REPLY, DESCRIPTOR = _REPLY,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Reply) # @@protoc_insertion_point(class_scope:Reply)
)) ))
_sym_db.RegisterMessage(Reply) _sym_db.RegisterMessage(Reply)
Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict( Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict(
DESCRIPTOR = _TASK, DESCRIPTOR = _TASK,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Task) # @@protoc_insertion_point(class_scope:Task)
)) ))
_sym_db.RegisterMessage(Task) _sym_db.RegisterMessage(Task)
Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict( Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict(
DESCRIPTOR = _PARAMETERS, DESCRIPTOR = _PARAMETERS,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Parameters) # @@protoc_insertion_point(class_scope:Parameters)
)) ))
_sym_db.RegisterMessage(Parameters) _sym_db.RegisterMessage(Parameters)
@ -565,12 +565,12 @@ Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,
EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict( EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict(
DESCRIPTOR = _COMMAND_ENVVARSENTRY, DESCRIPTOR = _COMMAND_ENVVARSENTRY,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command.EnvVarsEntry) # @@protoc_insertion_point(class_scope:Command.EnvVarsEntry)
)) ))
, ,
DESCRIPTOR = _COMMAND, DESCRIPTOR = _COMMAND,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command) # @@protoc_insertion_point(class_scope:Command)
)) ))
_sym_db.RegisterMessage(Command) _sym_db.RegisterMessage(Command)
@ -578,28 +578,28 @@ _sym_db.RegisterMessage(Command.EnvVarsEntry)
Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict( Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict(
DESCRIPTOR = _CLUSTER, DESCRIPTOR = _CLUSTER,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Cluster) # @@protoc_insertion_point(class_scope:Cluster)
)) ))
_sym_db.RegisterMessage(Cluster) _sym_db.RegisterMessage(Cluster)
Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict( Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict(
DESCRIPTOR = _IMAGE, DESCRIPTOR = _IMAGE,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Image) # @@protoc_insertion_point(class_scope:Image)
)) ))
_sym_db.RegisterMessage(Image) _sym_db.RegisterMessage(Image)
Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict( Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict(
DESCRIPTOR = _MOUNT, DESCRIPTOR = _MOUNT,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Mount) # @@protoc_insertion_point(class_scope:Mount)
)) ))
_sym_db.RegisterMessage(Mount) _sym_db.RegisterMessage(Mount)
Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict( Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
DESCRIPTOR = _INSTANCE, DESCRIPTOR = _INSTANCE,
__module__ = 'protos.rpc_pb2' __module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Instance) # @@protoc_insertion_point(class_scope:Instance)
)) ))
_sym_db.RegisterMessage(Instance) _sym_db.RegisterMessage(Instance)
@ -614,8 +614,8 @@ _MASTER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR, file=DESCRIPTOR,
index=0, index=0,
options=None, options=None,
serialized_start=915, serialized_start=908,
serialized_end=950, serialized_end=943,
methods=[ methods=[
_descriptor.MethodDescriptor( _descriptor.MethodDescriptor(
name='report', name='report',
@ -638,12 +638,12 @@ _WORKER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR, file=DESCRIPTOR,
index=1, index=1,
options=None, options=None,
serialized_start=952, serialized_start=945,
serialized_end=989, serialized_end=986,
methods=[ methods=[
_descriptor.MethodDescriptor( _descriptor.MethodDescriptor(
name='add_task', name='process_task',
full_name='Worker.add_task', full_name='Worker.process_task',
index=0, index=0,
containing_service=None, containing_service=None,
input_type=_TASK, input_type=_TASK,

View File

@ -1,7 +1,7 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc import grpc
from protos import rpc_pb2 as protos_dot_rpc__pb2 import rpc_pb2 as rpc__pb2
class MasterStub(object): class MasterStub(object):
@ -16,8 +16,8 @@ class MasterStub(object):
""" """
self.report = channel.unary_unary( self.report = channel.unary_unary(
'/Master/report', '/Master/report',
request_serializer=protos_dot_rpc__pb2.Task.SerializeToString, request_serializer=rpc__pb2.Task.SerializeToString,
response_deserializer=protos_dot_rpc__pb2.Reply.FromString, response_deserializer=rpc__pb2.Reply.FromString,
) )
@ -37,8 +37,8 @@ def add_MasterServicer_to_server(servicer, server):
rpc_method_handlers = { rpc_method_handlers = {
'report': grpc.unary_unary_rpc_method_handler( 'report': grpc.unary_unary_rpc_method_handler(
servicer.report, servicer.report,
request_deserializer=protos_dot_rpc__pb2.Task.FromString, request_deserializer=rpc__pb2.Task.FromString,
response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString, response_serializer=rpc__pb2.Reply.SerializeToString,
), ),
} }
generic_handler = grpc.method_handlers_generic_handler( generic_handler = grpc.method_handlers_generic_handler(
@ -56,10 +56,10 @@ class WorkerStub(object):
Args: Args:
channel: A grpc.Channel. channel: A grpc.Channel.
""" """
self.add_task = channel.unary_unary( self.process_task = channel.unary_unary(
'/Worker/add_task', '/Worker/process_task',
request_serializer=protos_dot_rpc__pb2.Task.SerializeToString, request_serializer=rpc__pb2.Task.SerializeToString,
response_deserializer=protos_dot_rpc__pb2.Reply.FromString, response_deserializer=rpc__pb2.Reply.FromString,
) )
@ -67,9 +67,9 @@ class WorkerServicer(object):
# missing associated documentation comment in .proto file # missing associated documentation comment in .proto file
pass pass
def add_task(self, request, context): def process_task(self, request, context):
# missing associated documentation comment in .proto file """rpc add_task (Task) returns (Reply) {}
pass """
context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!') context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!') raise NotImplementedError('Method not implemented!')
@ -77,10 +77,10 @@ class WorkerServicer(object):
def add_WorkerServicer_to_server(servicer, server): def add_WorkerServicer_to_server(servicer, server):
rpc_method_handlers = { rpc_method_handlers = {
'add_task': grpc.unary_unary_rpc_method_handler( 'process_task': grpc.unary_unary_rpc_method_handler(
servicer.add_task, servicer.process_task,
request_deserializer=protos_dot_rpc__pb2.Task.FromString, request_deserializer=rpc__pb2.Task.FromString,
response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString, response_serializer=rpc__pb2.Reply.SerializeToString,
), ),
} }
generic_handler = grpc.method_handlers_generic_handler( generic_handler = grpc.method_handlers_generic_handler(

View File

@ -1,23 +1,25 @@
#!/usr/bin/python3 #!/usr/bin/python3
import xmlrpc.client from concurrent import futures
from log import logger import grpc
import env from utils.log import logger
from utils import env
import json,lxc,subprocess,threading,os import json,lxc,subprocess,threading,os
import imagemgr from utils import imagemgr
from protos import rpc_pb2, rpc_pb2_grpc
class TaskController(object): class TaskController(rpc_pb2_grpc.WorkerServicer):
def __init__(self): def __init__(self):
self.imgmgr = imagemgr.ImageMgr() self.imgmgr = imagemgr.ImageMgr()
self.fspath = env.getenv('FS_PREFIX') self.fspath = env.getenv('FS_PREFIX')
self.confpath = env.getenv('DOCKLET_CONF') self.confpath = env.getenv('DOCKLET_CONF')
self.masterip = '162.105.88.190' #self.masterip = '162.105.88.190'
self.masterport = 9002 #self.masterport = 9002
self.masterrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (self.masterip,self.masterport)) #self.masterrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (self.masterip,self.masterport))
logger.info('TaskController init success') logger.info('TaskController init success')
def process_task(self, parameter): def process_task(self, request, context):
logger.info('excute task with parameter: ' + parameter) logger.info('excute task with parameter: ' + parameter)
parameter = json.loads(parameter) parameter = json.loads(parameter)
jobid = parameter['JobId'] jobid = parameter['JobId']