Update protos & add function of reporting tasks msgs

This commit is contained in:
zhuyj17 2018-08-05 17:52:02 +08:00
parent 3bb6feb220
commit 0c9dfe726a
4 changed files with 173 additions and 55 deletions

View File

@ -1,7 +1,7 @@
syntax = "proto3";
service Master {
rpc report (TaskMsg) returns (Reply) {};
rpc report (ReportMsg) returns (Reply) {};
}
service Worker {
@ -18,11 +18,16 @@ message Reply {
}
}
message ReportMsg {
repeated TaskMsg taskmsgs = 1;
}
message TaskMsg {
string taskid = 1;
int32 instanceid = 2;
Status instanceStatus = 3; //
string token = 4;
string errmsg = 5;
}
enum Status {
@ -31,6 +36,7 @@ enum Status {
COMPLETED = 2;
FAILED = 3;
TIMEOUT = 4;
OUTPUTERROR = 5;
}
message TaskInfo {

View File

@ -1,5 +1,5 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: protos/rpc.proto
# source: rpc.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
@ -17,10 +17,10 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='protos/rpc.proto',
name='rpc.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\x10protos/rpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"]\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*J\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32&\n\x06Master\x12\x1c\n\x06report\x12\x08.TaskMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3')
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"m\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x05 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3')
)
_STATUS = _descriptor.EnumDescriptor(
@ -49,11 +49,15 @@ _STATUS = _descriptor.EnumDescriptor(
name='TIMEOUT', index=4, number=4,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='OUTPUTERROR', index=5, number=5,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=979,
serialized_end=1053,
serialized_start=1029,
serialized_end=1120,
)
_sym_db.RegisterEnumDescriptor(_STATUS)
@ -63,6 +67,7 @@ RUNNING = 1
COMPLETED = 2
FAILED = 3
TIMEOUT = 4
OUTPUTERROR = 5
_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
@ -82,8 +87,8 @@ _REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=82,
serialized_end=122,
serialized_start=75,
serialized_end=115,
)
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
@ -108,8 +113,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=815,
serialized_end=861,
serialized_start=865,
serialized_end=911,
)
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
@ -148,8 +153,39 @@ _REPLY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=20,
serialized_end=122,
serialized_start=13,
serialized_end=115,
)
_REPORTMSG = _descriptor.Descriptor(
name='ReportMsg',
full_name='ReportMsg',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='taskmsgs', full_name='ReportMsg.taskmsgs', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=117,
serialized_end=156,
)
@ -188,6 +224,13 @@ _TASKMSG = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='errmsg', full_name='TaskMsg.errmsg', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
@ -200,8 +243,8 @@ _TASKMSG = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=124,
serialized_end=217,
serialized_start=158,
serialized_end=267,
)
@ -287,8 +330,8 @@ _TASKINFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=220,
serialized_end=418,
serialized_start=270,
serialized_end=468,
)
@ -332,8 +375,8 @@ _PARAMETERS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=420,
serialized_end=515,
serialized_start=470,
serialized_end=565,
)
@ -370,8 +413,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=611,
serialized_end=657,
serialized_start=661,
serialized_end=707,
)
_COMMAND = _descriptor.Descriptor(
@ -414,8 +457,8 @@ _COMMAND = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=518,
serialized_end=657,
serialized_start=568,
serialized_end=707,
)
@ -459,8 +502,8 @@ _CLUSTER = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=659,
serialized_end=743,
serialized_start=709,
serialized_end=793,
)
@ -505,8 +548,8 @@ _IMAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=745,
serialized_end=861,
serialized_start=795,
serialized_end=911,
)
@ -543,8 +586,8 @@ _MOUNT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=863,
serialized_end=909,
serialized_start=913,
serialized_end=959,
)
@ -595,12 +638,13 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=911,
serialized_end=977,
serialized_start=961,
serialized_end=1027,
)
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
_REPLY_REPLYSTATUS.containing_type = _REPLY
_REPORTMSG.fields_by_name['taskmsgs'].message_type = _TASKMSG
_TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS
_TASKINFO.fields_by_name['parameters'].message_type = _PARAMETERS
_TASKINFO.fields_by_name['cluster'].message_type = _CLUSTER
@ -613,6 +657,7 @@ _CLUSTER.fields_by_name['mount'].message_type = _MOUNT
_IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE
_IMAGE_IMAGETYPE.containing_type = _IMAGE
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
DESCRIPTOR.message_types_by_name['ReportMsg'] = _REPORTMSG
DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG
DESCRIPTOR.message_types_by_name['TaskInfo'] = _TASKINFO
DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS
@ -626,28 +671,35 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR)
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
DESCRIPTOR = _REPLY,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Reply)
))
_sym_db.RegisterMessage(Reply)
ReportMsg = _reflection.GeneratedProtocolMessageType('ReportMsg', (_message.Message,), dict(
DESCRIPTOR = _REPORTMSG,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:ReportMsg)
))
_sym_db.RegisterMessage(ReportMsg)
TaskMsg = _reflection.GeneratedProtocolMessageType('TaskMsg', (_message.Message,), dict(
DESCRIPTOR = _TASKMSG,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:TaskMsg)
))
_sym_db.RegisterMessage(TaskMsg)
TaskInfo = _reflection.GeneratedProtocolMessageType('TaskInfo', (_message.Message,), dict(
DESCRIPTOR = _TASKINFO,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:TaskInfo)
))
_sym_db.RegisterMessage(TaskInfo)
Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict(
DESCRIPTOR = _PARAMETERS,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Parameters)
))
_sym_db.RegisterMessage(Parameters)
@ -656,12 +708,12 @@ Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,
EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict(
DESCRIPTOR = _COMMAND_ENVVARSENTRY,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command.EnvVarsEntry)
))
,
DESCRIPTOR = _COMMAND,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command)
))
_sym_db.RegisterMessage(Command)
@ -669,28 +721,28 @@ _sym_db.RegisterMessage(Command.EnvVarsEntry)
Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict(
DESCRIPTOR = _CLUSTER,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Cluster)
))
_sym_db.RegisterMessage(Cluster)
Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict(
DESCRIPTOR = _IMAGE,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Image)
))
_sym_db.RegisterMessage(Image)
Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict(
DESCRIPTOR = _MOUNT,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Mount)
))
_sym_db.RegisterMessage(Mount)
Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
DESCRIPTOR = _INSTANCE,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Instance)
))
_sym_db.RegisterMessage(Instance)
@ -705,15 +757,15 @@ _MASTER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=1055,
serialized_end=1093,
serialized_start=1122,
serialized_end=1162,
methods=[
_descriptor.MethodDescriptor(
name='report',
full_name='Master.report',
index=0,
containing_service=None,
input_type=_TASKMSG,
input_type=_REPORTMSG,
output_type=_REPLY,
options=None,
),
@ -729,8 +781,8 @@ _WORKER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=1,
options=None,
serialized_start=1095,
serialized_end=1140,
serialized_start=1164,
serialized_end=1209,
methods=[
_descriptor.MethodDescriptor(
name='process_task',

View File

@ -1,7 +1,7 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from protos import rpc_pb2 as protos_dot_rpc__pb2
from protos import rpc_pb2 as rpc__pb2
class MasterStub(object):
@ -16,8 +16,8 @@ class MasterStub(object):
"""
self.report = channel.unary_unary(
'/Master/report',
request_serializer=protos_dot_rpc__pb2.TaskMsg.SerializeToString,
response_deserializer=protos_dot_rpc__pb2.Reply.FromString,
request_serializer=rpc__pb2.ReportMsg.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
@ -37,8 +37,8 @@ def add_MasterServicer_to_server(servicer, server):
rpc_method_handlers = {
'report': grpc.unary_unary_rpc_method_handler(
servicer.report,
request_deserializer=protos_dot_rpc__pb2.TaskMsg.FromString,
response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString,
request_deserializer=rpc__pb2.ReportMsg.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
@ -58,8 +58,8 @@ class WorkerStub(object):
"""
self.process_task = channel.unary_unary(
'/Worker/process_task',
request_serializer=protos_dot_rpc__pb2.TaskInfo.SerializeToString,
response_deserializer=protos_dot_rpc__pb2.Reply.FromString,
request_serializer=rpc__pb2.TaskInfo.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
@ -79,8 +79,8 @@ def add_WorkerServicer_to_server(servicer, server):
rpc_method_handlers = {
'process_task': grpc.unary_unary_rpc_method_handler(
servicer.process_task,
request_deserializer=protos_dot_rpc__pb2.TaskInfo.FromString,
response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString,
request_deserializer=rpc__pb2.TaskInfo.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(

View File

@ -15,7 +15,7 @@ import grpc
#from utils.log import logger
#from utils import env
import json,lxc,subprocess,threading,os,time,traceback
from utils import imagemgr
from utils import imagemgr,etcdlib
from protos import rpc_pb2, rpc_pb2_grpc
def ip_to_int(addr):
@ -29,9 +29,39 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
def __init__(self):
rpc_pb2_grpc.WorkerServicer.__init__(self)
etcdaddr = env.getenv("ETCD")
logger.info ("using ETCD %s" % etcdaddr )
clustername = env.getenv("CLUSTER_NAME")
logger.info ("using CLUSTER_NAME %s" % clustername )
# init etcdlib client
try:
self.etcdclient = etcdlib.Client(etcdaddr, prefix = clustername)
except Exception:
logger.error ("connect etcd failed, maybe etcd address not correct...")
sys.exit(1)
else:
logger.info("etcd connected")
# get master ip and report port
[success,masterip] = self.etcdclient.getkey("service/master")
if not success:
logger.error("Fail to get master ip address.")
sys.exit(1)
else:
self.master_ip = masterip
logger.info("Get master ip address: %s" % (self.master_ip))
self.master_port = env.getenv('BATCH_MASTER_PORT')
self.imgmgr = imagemgr.ImageMgr()
self.fspath = env.getenv('FS_PREFIX')
self.confpath = env.getenv('DOCKLET_CONF')
self.taskmsgs = []
self.msgslock = threading.Lock()
self.report_interval = 2
self.lock = threading.Lock()
self.cons_gateway = env.getenv('BATCH_GATEWAY')
self.cons_ips = env.getenv('BATCH_NET')
@ -45,6 +75,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
self.free_ips.append(i)
logger.info("Free ip addresses pool %s" % str(self.free_ips))
self.start_report()
logger.info('TaskController init success')
# Need Locks
@ -237,6 +268,35 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
logger.info("release ip address %s" % ip)
self.release_ip(ip)
def add_msg(self,taskid,instanceid,instanceStatus,token,errmsg):
self.msgslock.acquire()
try:
self.msgslock.append(rpc_pb2.TaskMsg(str(taskid),int(instanceid),instanceStatus,token,errmsg))
except Exception as err:
logger.error(traceback.format_exc())
self.msgslock.release()
def report_msg(self):
channel = grpc.insecure_channel(self.master_ip+":"+self.master_port)
stub = rpc_pb2_grpc.MasterStub(channel)
while True:
self.msgslock.acquire()
reportmsg = rpc_pb2.ReportMsg(taskmsgs = self.taskmsgs)
try:
response = stub.report(reportmsg)
logger.info("Response from master by reporting: "+str(response.status)+" "+response.message)
except Exception as err:
logger.error(traceback.format_exc())
self.taskmsgs = []
self.msgslock.release()
time.sleep(self.report_interval)
def start_report(self):
thread = threading.Thread(target = self.report_msg, args=())
thread.setDaemon(True)
thread.start()
logger.info("Start to report task messages to master every %d seconds." % self.report_interval)
_ONE_DAY_IN_SECONDS = 60 * 60 * 24