Update taskcontroller to create container by grpc

This commit is contained in:
Firmlyzhu 2018-07-19 19:03:24 +08:00
parent 8645c6260c
commit 7e0603b1fb
6 changed files with 309 additions and 117 deletions

50
conf/container.batch.conf Normal file
View File

@ -0,0 +1,50 @@
# This is the common container.conf for all containers.
# If want set custom settings, you have two choices:
# 1. Directly modify this file, which is not recommend, because the
# setting will be overriden when new version container.conf released.
# 2. Use a custom config file in this conf directory: lxc.custom.conf,
# it uses the same grammer as container.conf, and will be merged
# with the default container.conf by docklet at runtime.
#
# The following is an example mounting user html directory
# lxc.mount.entry = /public/home/%USERNAME%/public_html %ROOTFS%/root/public_html none bind,rw,create=dir 0 0
#
#### include /usr/share/lxc/config/ubuntu.common.conf
lxc.include = /usr/share/lxc/config/ubuntu.common.conf
############## DOCKLET CONFIG ##############
# Setup 0 tty devices
lxc.tty = 0
lxc.rootfs = %ROOTFS%
lxc.utsname = %HOSTNAME%
lxc.network.type = veth
lxc.network.name = eth0
lxc.network.link = lxcbr0
lxc.network.flags = up
lxc.cgroup.pids.max = 2048
lxc.cgroup.memory.limit_in_bytes = %CONTAINER_MEMORY%M
#lxc.cgroup.memory.kmem.limit_in_bytes = 512M
#lxc.cgroup.memory.soft_limit_in_bytes = 4294967296
#lxc.cgroup.memory.memsw.limit_in_bytes = 8589934592
# lxc.cgroup.cpu.cfs_period_us : period time of cpu, default 100000, means 100ms
# lxc.cgroup.cpu.cfs_quota_us : quota time of this process
lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU%
lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0
#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/%CLUSTERID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0
lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0
# setting hostname
lxc.hook.pre-start = HNAME=%HOSTNAME% %LXCSCRIPT%/lxc-prestart
# setting nfs softlink
#lxc.hook.mount = %LXCSCRIPT%/lxc-mount

View File

@ -0,0 +1,28 @@
import sys
if sys.path[0].endswith("master"):
sys.path[0] = sys.path[0][:-6]
import grpc
from protos import rpc_pb2, rpc_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel)
comm = rpc_pb2.Command(commandLine="echo hello_world > test.txt", packagePath=".", envVars={})
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="", stdoutRedirectPath="")
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.PUBLIC, owner="docklet")
inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)
mnt = rpc_pb2.Mount(localPath="",remotePath="")
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt])
task = rpc_pb2.Task(id="test",username="root",instanceid=0,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=10)
response = stub.process_task(task)
print("Batch client received: " + str(response.status)+" "+response.message)
if __name__ == '__main__':
run()

View File

@ -1,16 +1,16 @@
syntax = "proto3";
service Master {
rpc report (Task) returns (Reply) {};
rpc report (TaskMsg) returns (Reply) {};
}
service Worker {
//rpc add_task (Task) returns (Reply) {}
rpc process_task (Task) returns (Reply) {}
}
message Reply {
ReplyStatus message = 1; //
ReplyStatus status = 1; //
string message = 2;
enum ReplyStatus {
ACCEPTED = 0;
@ -18,22 +18,30 @@ message Reply {
}
}
message TaskMsg {
string taskid = 1;
int32 instanceid = 2;
Status instanceStatus = 3; //
}
enum Status {
WAITING = 0;
RUNNING = 1;
COMPLETED = 2;
FAILED = 3;
TIMEOUT = 4;
}
message Task {
string id = 1;
TaskStatus status = 2; //
int32 instanceCount = 3; //
int32 maxRetryCount = 4; //
Parameters parameters = 5; //
Cluster cluster = 6; //
int32 Timeout = 7; //
enum TaskStatus {
WAITING = 0;
RUNNING = 1;
COMPLETED = 2;
FAILED = 3;
TIMEOUT = 4;
}
string username = 2;
int32 instanceid = 3;
int32 instanceCount = 4; //
int32 maxRetryCount = 5; //
Parameters parameters = 6; //
Cluster cluster = 7; //
int32 timeout = 8; //
}
message Parameters {

View File

@ -3,6 +3,7 @@
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf.internal import enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
@ -19,36 +20,12 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='rpc.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\trpc.proto\"V\n\x05Reply\x12#\n\x07message\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\xff\x01\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\t\x12 \n\x06status\x18\x02 \x01(\x0e\x32\x10.Task.TaskStatus\x12\x15\n\rinstanceCount\x18\x03 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x04 \x01(\x05\x12\x1f\n\nparameters\x18\x05 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x06 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07Timeout\x18\x07 \x01(\x05\"N\n\nTaskStatus\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"j\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\"$\n\tImageType\x12\n\n\x06PUBLIC\x10\x00\x12\x0b\n\x07PRIVATE\x10\x01\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05\x32#\n\x06Master\x12\x19\n\x06report\x12\x05.Task\x1a\x06.Reply\"\x00\x32)\n\x06Worker\x12\x1f\n\x0cprocess_task\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3')
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"N\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\"\xb3\x01\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"j\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\"$\n\tImageType\x12\n\n\x06PUBLIC\x10\x00\x12\x0b\n\x07PRIVATE\x10\x01\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*J\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32&\n\x06Master\x12\x1c\n\x06report\x12\x08.TaskMsg\x1a\x06.Reply\"\x00\x32)\n\x06Worker\x12\x1f\n\x0cprocess_task\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3')
)
_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
name='ReplyStatus',
full_name='Reply.ReplyStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='ACCEPTED', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='REFUSED', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=59,
serialized_end=99,
)
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
_TASK_TASKSTATUS = _descriptor.EnumDescriptor(
name='TaskStatus',
full_name='Task.TaskStatus',
_STATUS = _descriptor.EnumDescriptor(
name='Status',
full_name='Status',
filename=None,
file=DESCRIPTOR,
values=[
@ -75,10 +52,40 @@ _TASK_TASKSTATUS = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=279,
serialized_end=357,
serialized_start=928,
serialized_end=1002,
)
_sym_db.RegisterEnumDescriptor(_TASK_TASKSTATUS)
_sym_db.RegisterEnumDescriptor(_STATUS)
Status = enum_type_wrapper.EnumTypeWrapper(_STATUS)
WAITING = 0
RUNNING = 1
COMPLETED = 2
FAILED = 3
TIMEOUT = 4
_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
name='ReplyStatus',
full_name='Reply.ReplyStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='ACCEPTED', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='REFUSED', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=75,
serialized_end=115,
)
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
_IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
name='ImageType',
@ -97,8 +104,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=754,
serialized_end=790,
serialized_start=774,
serialized_end=810,
)
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
@ -111,12 +118,19 @@ _REPLY = _descriptor.Descriptor(
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='message', full_name='Reply.message', index=0,
name='status', full_name='Reply.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='message', full_name='Reply.message', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
@ -131,7 +145,52 @@ _REPLY = _descriptor.Descriptor(
oneofs=[
],
serialized_start=13,
serialized_end=99,
serialized_end=115,
)
_TASKMSG = _descriptor.Descriptor(
name='TaskMsg',
full_name='TaskMsg',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='taskid', full_name='TaskMsg.taskid', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceid', full_name='TaskMsg.instanceid', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceStatus', full_name='TaskMsg.instanceStatus', index=2,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=117,
serialized_end=195,
)
@ -150,43 +209,50 @@ _TASK = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='status', full_name='Task.status', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
name='username', full_name='Task.username', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceCount', full_name='Task.instanceCount', index=2,
name='instanceid', full_name='Task.instanceid', index=2,
number=3, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='maxRetryCount', full_name='Task.maxRetryCount', index=3,
name='instanceCount', full_name='Task.instanceCount', index=3,
number=4, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parameters', full_name='Task.parameters', index=4,
number=5, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
name='maxRetryCount', full_name='Task.maxRetryCount', index=4,
number=5, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cluster', full_name='Task.cluster', index=5,
name='parameters', full_name='Task.parameters', index=5,
number=6, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='Timeout', full_name='Task.Timeout', index=6,
number=7, type=5, cpp_type=1, label=1,
name='cluster', full_name='Task.cluster', index=6,
number=7, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='timeout', full_name='Task.timeout', index=7,
number=8, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
@ -196,7 +262,6 @@ _TASK = _descriptor.Descriptor(
],
nested_types=[],
enum_types=[
_TASK_TASKSTATUS,
],
options=None,
is_extendable=False,
@ -204,8 +269,8 @@ _TASK = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=102,
serialized_end=357,
serialized_start=198,
serialized_end=377,
)
@ -249,8 +314,8 @@ _PARAMETERS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=359,
serialized_end=454,
serialized_start=379,
serialized_end=474,
)
@ -287,8 +352,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=550,
serialized_end=596,
serialized_start=570,
serialized_end=616,
)
_COMMAND = _descriptor.Descriptor(
@ -331,8 +396,8 @@ _COMMAND = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=457,
serialized_end=596,
serialized_start=477,
serialized_end=616,
)
@ -376,8 +441,8 @@ _CLUSTER = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=598,
serialized_end=682,
serialized_start=618,
serialized_end=702,
)
@ -422,8 +487,8 @@ _IMAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=684,
serialized_end=790,
serialized_start=704,
serialized_end=810,
)
@ -460,8 +525,8 @@ _MOUNT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=792,
serialized_end=838,
serialized_start=812,
serialized_end=858,
)
@ -512,16 +577,15 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=840,
serialized_end=906,
serialized_start=860,
serialized_end=926,
)
_REPLY.fields_by_name['message'].enum_type = _REPLY_REPLYSTATUS
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
_REPLY_REPLYSTATUS.containing_type = _REPLY
_TASK.fields_by_name['status'].enum_type = _TASK_TASKSTATUS
_TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS
_TASK.fields_by_name['parameters'].message_type = _PARAMETERS
_TASK.fields_by_name['cluster'].message_type = _CLUSTER
_TASK_TASKSTATUS.containing_type = _TASK
_PARAMETERS.fields_by_name['command'].message_type = _COMMAND
_COMMAND_ENVVARSENTRY.containing_type = _COMMAND
_COMMAND.fields_by_name['envVars'].message_type = _COMMAND_ENVVARSENTRY
@ -531,6 +595,7 @@ _CLUSTER.fields_by_name['mount'].message_type = _MOUNT
_IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE
_IMAGE_IMAGETYPE.containing_type = _IMAGE
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG
DESCRIPTOR.message_types_by_name['Task'] = _TASK
DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS
DESCRIPTOR.message_types_by_name['Command'] = _COMMAND
@ -538,6 +603,7 @@ DESCRIPTOR.message_types_by_name['Cluster'] = _CLUSTER
DESCRIPTOR.message_types_by_name['Image'] = _IMAGE
DESCRIPTOR.message_types_by_name['Mount'] = _MOUNT
DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
DESCRIPTOR.enum_types_by_name['Status'] = _STATUS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
@ -547,6 +613,13 @@ Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), d
))
_sym_db.RegisterMessage(Reply)
TaskMsg = _reflection.GeneratedProtocolMessageType('TaskMsg', (_message.Message,), dict(
DESCRIPTOR = _TASKMSG,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:TaskMsg)
))
_sym_db.RegisterMessage(TaskMsg)
Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict(
DESCRIPTOR = _TASK,
__module__ = 'rpc_pb2'
@ -614,15 +687,15 @@ _MASTER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=908,
serialized_end=943,
serialized_start=1004,
serialized_end=1042,
methods=[
_descriptor.MethodDescriptor(
name='report',
full_name='Master.report',
index=0,
containing_service=None,
input_type=_TASK,
input_type=_TASKMSG,
output_type=_REPLY,
options=None,
),
@ -638,8 +711,8 @@ _WORKER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=1,
options=None,
serialized_start=945,
serialized_end=986,
serialized_start=1044,
serialized_end=1085,
methods=[
_descriptor.MethodDescriptor(
name='process_task',

View File

@ -1,7 +1,7 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import rpc_pb2 as rpc__pb2
from protos import rpc_pb2 as rpc__pb2
class MasterStub(object):
@ -16,7 +16,7 @@ class MasterStub(object):
"""
self.report = channel.unary_unary(
'/Master/report',
request_serializer=rpc__pb2.Task.SerializeToString,
request_serializer=rpc__pb2.TaskMsg.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
@ -37,7 +37,7 @@ def add_MasterServicer_to_server(servicer, server):
rpc_method_handlers = {
'report': grpc.unary_unary_rpc_method_handler(
servicer.report,
request_deserializer=rpc__pb2.Task.FromString,
request_deserializer=rpc__pb2.TaskMsg.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
@ -68,8 +68,8 @@ class WorkerServicer(object):
pass
def process_task(self, request, context):
"""rpc add_task (Task) returns (Reply) {}
"""
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

View File

@ -1,16 +1,27 @@
#!/usr/bin/python3
import sys
if sys.path[0].endswith("worker"):
sys.path[0] = sys.path[0][:-6]
from utils import env, tools
#config = env.getenv("CONFIG")
config = "/opt/docklet/local/docklet-running.conf"
tools.loadenv(config)
from utils.log import initlogging
initlogging("docklet-worker")
from utils.log import logger
from concurrent import futures
import grpc
from utils.log import logger
from utils import env
import json,lxc,subprocess,threading,os
#from utils.log import logger
#from utils import env
import json,lxc,subprocess,threading,os,time
from utils import imagemgr
from protos import rpc_pb2, rpc_pb2_grpc
class TaskController(rpc_pb2_grpc.WorkerServicer):
def __init__(self):
rpc_pb2_grpc.WorkerServicer.__init__(self)
self.imgmgr = imagemgr.ImageMgr()
self.fspath = env.getenv('FS_PREFIX')
self.confpath = env.getenv('DOCKLET_CONF')
@ -20,22 +31,24 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
logger.info('TaskController init success')
def process_task(self, request, context):
logger.info('excute task with parameter: ' + parameter)
parameter = json.loads(parameter)
jobid = parameter['JobId']
taskid = parameter['TaskId']
taskno = parameter['TaskNo']
username = parameter['UserName']
lxcname = '%s-%s-%s-%s' % (username,jobid,taskid,taskno)
command = '/root/getenv.sh' #parameter['Parameters']['Command']['CommandLine']
envs = {'MYENV1':'MYVAL1', 'MYENV2':'MYVAL2'} #parameters['Parameters']['Command']['EnvVars']
envs['TASK_NO']=taskno
image = parameter['ImageId']
instance_type = parameter['InstanceType']
logger.info('excute task with parameter: ' + str(request))
taskid = request.id
instanceid = request.instanceid
status = self.imgmgr.prepareFS(username,image,lxcname,instance_type['disk'])
command = request.parameters.command.commandLine #'/root/getenv.sh' #parameter['Parameters']['Command']['CommandLine']
#envs = {'MYENV1':'MYVAL1', 'MYENV2':'MYVAL2'} #parameters['Parameters']['Command']['EnvVars']
envs = request.parameters.command.envVars
image = {}
image['name'] = request.cluster.image.name
image['type'] = 'private' if request.cluster.image.type == rpc_pb2.Image.PRIVATE else 'public'
image['owner'] = request.cluster.image.owner
username = request.username
lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid))
instance_type = request.cluster.instance
status = self.imgmgr.prepareFS(username,image,lxcname,str(instance_type.disk))
if not status:
return [False, "Create container for batch failed when preparing filesystem"]
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message="Create container for batch failed when preparing filesystem")
rootfs = "/var/lib/lxc/%s/rootfs" % lxcname
@ -48,13 +61,16 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
def config_prepare(content):
content = content.replace("%ROOTFS%",rootfs)
content = content.replace("%CONTAINER_MEMORY%",str(instance_type['memory']))
content = content.replace("%CONTAINER_CPU%",str(instance_type['cpu']*100000))
content = content.replace("%HOSTNAME%","batch-%s" % instanceid)
content = content.replace("%CONTAINER_MEMORY%",str(instance_type.memory))
content = content.replace("%CONTAINER_CPU%",str(instance_type.cpu*100000))
content = content.replace("%FS_PREFIX%",self.fspath)
content = content.replace("%LXCSCRIPT%",env.getenv("LXC_SCRIPT"))
content = content.replace("%USERNAME%",username)
content = content.replace("%LXCNAME%",lxcname)
return content
logger.info(self.confpath)
conffile = open(self.confpath+"/container.batch.conf", 'r')
conftext = conffile.read()
conffile.close()
@ -68,18 +84,18 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
container = lxc.Container(lxcname)
if not container.start():
logger.error('start container %s failed' % lxcname)
return True
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
#return json.dumps({'success':'false','message': "start container failed"})
else:
logger.info('start container %s success' % lxcname)
#mount oss here
thread = threading.Thread(target = self.excute_task, args=(jobid,taskid,envs,lxcname,command))
thread.setDaemon(True)
thread.start()
#thread = threading.Thread(target = self.excute_task, args=(jobid,taskid,envs,lxcname,command))
#thread.setDaemon(True)
#thread.start()
return True
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
#return json.dumps({'success':'true','message':'task is running'})
def excute_task(self,jobid,taskid,envs,lxcname,command):
@ -109,3 +125,20 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
logger.info("delete container %s success" % lxcname)
else:
logger.error("delete container %s failed" % lxcname)
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def TaskControllerServe():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
rpc_pb2_grpc.add_WorkerServicer_to_server(TaskController(), server)
server.add_insecure_port('[::]:50051')
server.start()
logger.info("Start TaskController Servicer")
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == "__main__":
TaskControllerServe()