diff --git a/src/protos/rpc.proto b/src/protos/rpc.proto index 9472b37..a3b05a0 100644 --- a/src/protos/rpc.proto +++ b/src/protos/rpc.proto @@ -1,7 +1,7 @@ syntax = "proto3"; service Master { - rpc report (TaskMsg) returns (Reply) {}; + rpc report (ReportMsg) returns (Reply) {}; } service Worker { @@ -18,11 +18,16 @@ message Reply { } } +message ReportMsg { + repeated TaskMsg taskmsgs = 1; +} + message TaskMsg { string taskid = 1; int32 instanceid = 2; Status instanceStatus = 3; // 任务状态 string token = 4; + string errmsg = 5; } enum Status { @@ -31,6 +36,7 @@ enum Status { COMPLETED = 2; FAILED = 3; TIMEOUT = 4; + OUTPUTERROR = 5; } message TaskInfo { diff --git a/src/protos/rpc_pb2.py b/src/protos/rpc_pb2.py index c46f25a..e9b6a51 100644 --- a/src/protos/rpc_pb2.py +++ b/src/protos/rpc_pb2.py @@ -1,5 +1,5 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: protos/rpc.proto +# source: rpc.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -17,10 +17,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='protos/rpc.proto', + name='rpc.proto', package='', syntax='proto3', - serialized_pb=_b('\n\x10protos/rpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"]\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*J\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32&\n\x06Master\x12\x1c\n\x06report\x12\x08.TaskMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3') + serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"m\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x05 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3') ) _STATUS = _descriptor.EnumDescriptor( @@ -49,11 +49,15 @@ _STATUS = _descriptor.EnumDescriptor( name='TIMEOUT', index=4, number=4, options=None, type=None), + _descriptor.EnumValueDescriptor( + name='OUTPUTERROR', index=5, number=5, + options=None, + type=None), ], containing_type=None, options=None, - serialized_start=979, - serialized_end=1053, + serialized_start=1029, + serialized_end=1120, ) _sym_db.RegisterEnumDescriptor(_STATUS) @@ -63,6 +67,7 @@ RUNNING = 1 COMPLETED = 2 FAILED = 3 TIMEOUT = 4 +OUTPUTERROR = 5 _REPLY_REPLYSTATUS = _descriptor.EnumDescriptor( @@ -82,8 +87,8 @@ _REPLY_REPLYSTATUS = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=82, - serialized_end=122, + serialized_start=75, + serialized_end=115, ) _sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS) @@ -108,8 +113,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=815, - serialized_end=861, + serialized_start=865, + serialized_end=911, ) _sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE) @@ -148,8 +153,39 @@ _REPLY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=20, - serialized_end=122, + serialized_start=13, + serialized_end=115, +) + + +_REPORTMSG = _descriptor.Descriptor( + name='ReportMsg', + full_name='ReportMsg', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='taskmsgs', full_name='ReportMsg.taskmsgs', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=117, + serialized_end=156, ) @@ -188,6 +224,13 @@ _TASKMSG = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='errmsg', full_name='TaskMsg.errmsg', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -200,8 +243,8 @@ _TASKMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=124, - serialized_end=217, + serialized_start=158, + serialized_end=267, ) @@ -287,8 +330,8 @@ _TASKINFO = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=220, - serialized_end=418, + serialized_start=270, + serialized_end=468, ) @@ -332,8 +375,8 @@ _PARAMETERS = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=420, - serialized_end=515, + serialized_start=470, + serialized_end=565, ) @@ -370,8 +413,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=611, - serialized_end=657, + serialized_start=661, + serialized_end=707, ) _COMMAND = _descriptor.Descriptor( @@ -414,8 +457,8 @@ _COMMAND = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=518, - serialized_end=657, + serialized_start=568, + serialized_end=707, ) @@ -459,8 +502,8 @@ _CLUSTER = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=659, - serialized_end=743, + serialized_start=709, + serialized_end=793, ) @@ -505,8 +548,8 @@ _IMAGE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=745, - serialized_end=861, + serialized_start=795, + serialized_end=911, ) @@ -543,8 +586,8 @@ _MOUNT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=863, - serialized_end=909, + serialized_start=913, + serialized_end=959, ) @@ -595,12 +638,13 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=911, - serialized_end=977, + serialized_start=961, + serialized_end=1027, ) _REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS _REPLY_REPLYSTATUS.containing_type = _REPLY +_REPORTMSG.fields_by_name['taskmsgs'].message_type = _TASKMSG _TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS _TASKINFO.fields_by_name['parameters'].message_type = _PARAMETERS _TASKINFO.fields_by_name['cluster'].message_type = _CLUSTER @@ -613,6 +657,7 @@ _CLUSTER.fields_by_name['mount'].message_type = _MOUNT _IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE _IMAGE_IMAGETYPE.containing_type = _IMAGE DESCRIPTOR.message_types_by_name['Reply'] = _REPLY +DESCRIPTOR.message_types_by_name['ReportMsg'] = _REPORTMSG DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG DESCRIPTOR.message_types_by_name['TaskInfo'] = _TASKINFO DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS @@ -626,28 +671,35 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR) Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict( DESCRIPTOR = _REPLY, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Reply) )) _sym_db.RegisterMessage(Reply) +ReportMsg = _reflection.GeneratedProtocolMessageType('ReportMsg', (_message.Message,), dict( + DESCRIPTOR = _REPORTMSG, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:ReportMsg) + )) +_sym_db.RegisterMessage(ReportMsg) + TaskMsg = _reflection.GeneratedProtocolMessageType('TaskMsg', (_message.Message,), dict( DESCRIPTOR = _TASKMSG, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:TaskMsg) )) _sym_db.RegisterMessage(TaskMsg) TaskInfo = _reflection.GeneratedProtocolMessageType('TaskInfo', (_message.Message,), dict( DESCRIPTOR = _TASKINFO, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:TaskInfo) )) _sym_db.RegisterMessage(TaskInfo) Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict( DESCRIPTOR = _PARAMETERS, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Parameters) )) _sym_db.RegisterMessage(Parameters) @@ -656,12 +708,12 @@ Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message, EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict( DESCRIPTOR = _COMMAND_ENVVARSENTRY, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Command.EnvVarsEntry) )) , DESCRIPTOR = _COMMAND, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Command) )) _sym_db.RegisterMessage(Command) @@ -669,28 +721,28 @@ _sym_db.RegisterMessage(Command.EnvVarsEntry) Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict( DESCRIPTOR = _CLUSTER, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Cluster) )) _sym_db.RegisterMessage(Cluster) Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict( DESCRIPTOR = _IMAGE, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Image) )) _sym_db.RegisterMessage(Image) Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict( DESCRIPTOR = _MOUNT, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Mount) )) _sym_db.RegisterMessage(Mount) Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict( DESCRIPTOR = _INSTANCE, - __module__ = 'protos.rpc_pb2' + __module__ = 'rpc_pb2' # @@protoc_insertion_point(class_scope:Instance) )) _sym_db.RegisterMessage(Instance) @@ -705,15 +757,15 @@ _MASTER = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=0, options=None, - serialized_start=1055, - serialized_end=1093, + serialized_start=1122, + serialized_end=1162, methods=[ _descriptor.MethodDescriptor( name='report', full_name='Master.report', index=0, containing_service=None, - input_type=_TASKMSG, + input_type=_REPORTMSG, output_type=_REPLY, options=None, ), @@ -729,8 +781,8 @@ _WORKER = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=1, options=None, - serialized_start=1095, - serialized_end=1140, + serialized_start=1164, + serialized_end=1209, methods=[ _descriptor.MethodDescriptor( name='process_task', diff --git a/src/protos/rpc_pb2_grpc.py b/src/protos/rpc_pb2_grpc.py index e5a6690..68f8e4f 100644 --- a/src/protos/rpc_pb2_grpc.py +++ b/src/protos/rpc_pb2_grpc.py @@ -1,7 +1,7 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! import grpc -from protos import rpc_pb2 as protos_dot_rpc__pb2 +from protos import rpc_pb2 as rpc__pb2 class MasterStub(object): @@ -16,8 +16,8 @@ class MasterStub(object): """ self.report = channel.unary_unary( '/Master/report', - request_serializer=protos_dot_rpc__pb2.TaskMsg.SerializeToString, - response_deserializer=protos_dot_rpc__pb2.Reply.FromString, + request_serializer=rpc__pb2.ReportMsg.SerializeToString, + response_deserializer=rpc__pb2.Reply.FromString, ) @@ -37,8 +37,8 @@ def add_MasterServicer_to_server(servicer, server): rpc_method_handlers = { 'report': grpc.unary_unary_rpc_method_handler( servicer.report, - request_deserializer=protos_dot_rpc__pb2.TaskMsg.FromString, - response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString, + request_deserializer=rpc__pb2.ReportMsg.FromString, + response_serializer=rpc__pb2.Reply.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -58,8 +58,8 @@ class WorkerStub(object): """ self.process_task = channel.unary_unary( '/Worker/process_task', - request_serializer=protos_dot_rpc__pb2.TaskInfo.SerializeToString, - response_deserializer=protos_dot_rpc__pb2.Reply.FromString, + request_serializer=rpc__pb2.TaskInfo.SerializeToString, + response_deserializer=rpc__pb2.Reply.FromString, ) @@ -79,8 +79,8 @@ def add_WorkerServicer_to_server(servicer, server): rpc_method_handlers = { 'process_task': grpc.unary_unary_rpc_method_handler( servicer.process_task, - request_deserializer=protos_dot_rpc__pb2.TaskInfo.FromString, - response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString, + request_deserializer=rpc__pb2.TaskInfo.FromString, + response_serializer=rpc__pb2.Reply.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index 174f1ce..916bb36 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -15,7 +15,7 @@ import grpc #from utils.log import logger #from utils import env import json,lxc,subprocess,threading,os,time,traceback -from utils import imagemgr +from utils import imagemgr,etcdlib from protos import rpc_pb2, rpc_pb2_grpc def ip_to_int(addr): @@ -29,9 +29,39 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): def __init__(self): rpc_pb2_grpc.WorkerServicer.__init__(self) + etcdaddr = env.getenv("ETCD") + logger.info ("using ETCD %s" % etcdaddr ) + + clustername = env.getenv("CLUSTER_NAME") + logger.info ("using CLUSTER_NAME %s" % clustername ) + + # init etcdlib client + try: + self.etcdclient = etcdlib.Client(etcdaddr, prefix = clustername) + except Exception: + logger.error ("connect etcd failed, maybe etcd address not correct...") + sys.exit(1) + else: + logger.info("etcd connected") + + # get master ip and report port + [success,masterip] = self.etcdclient.getkey("service/master") + if not success: + logger.error("Fail to get master ip address.") + sys.exit(1) + else: + self.master_ip = masterip + logger.info("Get master ip address: %s" % (self.master_ip)) + self.master_port = env.getenv('BATCH_MASTER_PORT') + self.imgmgr = imagemgr.ImageMgr() self.fspath = env.getenv('FS_PREFIX') self.confpath = env.getenv('DOCKLET_CONF') + + self.taskmsgs = [] + self.msgslock = threading.Lock() + self.report_interval = 2 + self.lock = threading.Lock() self.cons_gateway = env.getenv('BATCH_GATEWAY') self.cons_ips = env.getenv('BATCH_NET') @@ -45,6 +75,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): self.free_ips.append(i) logger.info("Free ip addresses pool %s" % str(self.free_ips)) + self.start_report() logger.info('TaskController init success') # Need Locks @@ -237,6 +268,35 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info("release ip address %s" % ip) self.release_ip(ip) + def add_msg(self,taskid,instanceid,instanceStatus,token,errmsg): + self.msgslock.acquire() + try: + self.msgslock.append(rpc_pb2.TaskMsg(str(taskid),int(instanceid),instanceStatus,token,errmsg)) + except Exception as err: + logger.error(traceback.format_exc()) + self.msgslock.release() + + def report_msg(self): + channel = grpc.insecure_channel(self.master_ip+":"+self.master_port) + stub = rpc_pb2_grpc.MasterStub(channel) + while True: + self.msgslock.acquire() + reportmsg = rpc_pb2.ReportMsg(taskmsgs = self.taskmsgs) + try: + response = stub.report(reportmsg) + logger.info("Response from master by reporting: "+str(response.status)+" "+response.message) + except Exception as err: + logger.error(traceback.format_exc()) + self.taskmsgs = [] + self.msgslock.release() + time.sleep(self.report_interval) + + def start_report(self): + thread = threading.Thread(target = self.report_msg, args=()) + thread.setDaemon(True) + thread.start() + logger.info("Start to report task messages to master every %d seconds." % self.report_interval) + _ONE_DAY_IN_SECONDS = 60 * 60 * 24