Merge pull request #309 from FirmlyReality/batch

Batch
This commit is contained in:
Yujian Zhu 2018-07-19 19:07:48 +08:00 committed by GitHub
commit 400b421021
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 423 additions and 120 deletions

50
conf/container.batch.conf Normal file
View File

@ -0,0 +1,50 @@
# This is the common container.conf for all containers.
# If want set custom settings, you have two choices:
# 1. Directly modify this file, which is not recommend, because the
# setting will be overriden when new version container.conf released.
# 2. Use a custom config file in this conf directory: lxc.custom.conf,
# it uses the same grammer as container.conf, and will be merged
# with the default container.conf by docklet at runtime.
#
# The following is an example mounting user html directory
# lxc.mount.entry = /public/home/%USERNAME%/public_html %ROOTFS%/root/public_html none bind,rw,create=dir 0 0
#
#### include /usr/share/lxc/config/ubuntu.common.conf
lxc.include = /usr/share/lxc/config/ubuntu.common.conf
############## DOCKLET CONFIG ##############
# Setup 0 tty devices
lxc.tty = 0
lxc.rootfs = %ROOTFS%
lxc.utsname = %HOSTNAME%
lxc.network.type = veth
lxc.network.name = eth0
lxc.network.link = lxcbr0
lxc.network.flags = up
lxc.cgroup.pids.max = 2048
lxc.cgroup.memory.limit_in_bytes = %CONTAINER_MEMORY%M
#lxc.cgroup.memory.kmem.limit_in_bytes = 512M
#lxc.cgroup.memory.soft_limit_in_bytes = 4294967296
#lxc.cgroup.memory.memsw.limit_in_bytes = 8589934592
# lxc.cgroup.cpu.cfs_period_us : period time of cpu, default 100000, means 100ms
# lxc.cgroup.cpu.cfs_quota_us : quota time of this process
lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU%
lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0
#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/%CLUSTERID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0
lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0
# setting hostname
lxc.hook.pre-start = HNAME=%HOSTNAME% %LXCSCRIPT%/lxc-prestart
# setting nfs softlink
#lxc.hook.mount = %LXCSCRIPT%/lxc-mount

View File

@ -16,13 +16,14 @@ fi
# some packages' name maybe different in debian
apt-get install -y cgmanager lxc lxcfs lxc-templates lvm2 bridge-utils curl exim4 openssh-server openvswitch-switch
apt-get install -y python3 python3-netifaces python3-flask python3-flask-sqlalchemy python3-pampy python3-httplib2 python3-pip
apt-get install -y python3-psutil python3-flask-migrate
apt-get install -y python3-psutil python3-flask-migrate python3-paramiko
apt-get install -y python3-lxc
apt-get install -y python3-requests python3-suds
apt-get install -y nodejs nodejs-legacy npm
apt-get install -y etcd
apt-get install -y glusterfs-client attr
apt-get install -y nginx
pip3 install grpcio grpcio-tools googleapis-common-protos
#add ip forward
echo "net.ipv4.ip_forward=1" >>/etc/sysctl.conf

View File

@ -0,0 +1,28 @@
import sys
if sys.path[0].endswith("master"):
sys.path[0] = sys.path[0][:-6]
import grpc
from protos import rpc_pb2, rpc_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel)
comm = rpc_pb2.Command(commandLine="echo hello_world > test.txt", packagePath=".", envVars={})
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="", stdoutRedirectPath="")
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.PUBLIC, owner="docklet")
inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)
mnt = rpc_pb2.Mount(localPath="",remotePath="")
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt])
task = rpc_pb2.Task(id="test",username="root",instanceid=0,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=10)
response = stub.process_task(task)
print("Batch client received: " + str(response.status)+" "+response.message)
if __name__ == '__main__':
run()

View File

@ -1,15 +1,16 @@
syntax = "proto3";
service Master {
rpc report (Task) returns (Reply) {};
rpc report (TaskMsg) returns (Reply) {};
}
service Worker {
rpc add_task (Task) returns (Reply) {}
rpc process_task (Task) returns (Reply) {}
}
message Reply {
ReplyStatus message = 1; //
ReplyStatus status = 1; //
string message = 2;
enum ReplyStatus {
ACCEPTED = 0;
@ -17,22 +18,30 @@ message Reply {
}
}
message TaskMsg {
string taskid = 1;
int32 instanceid = 2;
Status instanceStatus = 3; //
}
enum Status {
WAITING = 0;
RUNNING = 1;
COMPLETED = 2;
FAILED = 3;
TIMEOUT = 4;
}
message Task {
string id = 1;
TaskStatus status = 2; //
int32 instanceCount = 3; //
int32 maxRetryCount = 4; //
Parameters parameters = 5; //
Cluster cluster = 6; //
int32 Timeout = 7; //
enum TaskStatus {
WAITING = 0;
RUNNING = 1;
COMPLETED = 2;
FAILED = 3;
TIMEOUT = 4;
}
string username = 2;
int32 instanceid = 3;
int32 instanceCount = 4; //
int32 maxRetryCount = 5; //
Parameters parameters = 6; //
Cluster cluster = 7; //
int32 timeout = 8; //
}
message Parameters {
@ -74,4 +83,4 @@ message Instance {
int32 memory = 2; // mb
int32 disk = 3; // mb
int32 gpu = 4; //
}
}

View File

@ -1,8 +1,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: protos/rpc.proto
# source: rpc.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf.internal import enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
@ -16,39 +17,15 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='protos/rpc.proto',
name='rpc.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\x10protos/rpc.proto\"V\n\x05Reply\x12#\n\x07message\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\xff\x01\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\t\x12 \n\x06status\x18\x02 \x01(\x0e\x32\x10.Task.TaskStatus\x12\x15\n\rinstanceCount\x18\x03 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x04 \x01(\x05\x12\x1f\n\nparameters\x18\x05 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x06 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07Timeout\x18\x07 \x01(\x05\"N\n\nTaskStatus\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"j\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\"$\n\tImageType\x12\n\n\x06PUBLIC\x10\x00\x12\x0b\n\x07PRIVATE\x10\x01\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05\x32#\n\x06Master\x12\x19\n\x06report\x12\x05.Task\x1a\x06.Reply\"\x00\x32%\n\x06Worker\x12\x1b\n\x08\x61\x64\x64_task\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3')
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"N\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\"\xb3\x01\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"j\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\"$\n\tImageType\x12\n\n\x06PUBLIC\x10\x00\x12\x0b\n\x07PRIVATE\x10\x01\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*J\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x32&\n\x06Master\x12\x1c\n\x06report\x12\x08.TaskMsg\x1a\x06.Reply\"\x00\x32)\n\x06Worker\x12\x1f\n\x0cprocess_task\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3')
)
_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
name='ReplyStatus',
full_name='Reply.ReplyStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='ACCEPTED', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='REFUSED', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=66,
serialized_end=106,
)
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
_TASK_TASKSTATUS = _descriptor.EnumDescriptor(
name='TaskStatus',
full_name='Task.TaskStatus',
_STATUS = _descriptor.EnumDescriptor(
name='Status',
full_name='Status',
filename=None,
file=DESCRIPTOR,
values=[
@ -75,10 +52,40 @@ _TASK_TASKSTATUS = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=286,
serialized_end=364,
serialized_start=928,
serialized_end=1002,
)
_sym_db.RegisterEnumDescriptor(_TASK_TASKSTATUS)
_sym_db.RegisterEnumDescriptor(_STATUS)
Status = enum_type_wrapper.EnumTypeWrapper(_STATUS)
WAITING = 0
RUNNING = 1
COMPLETED = 2
FAILED = 3
TIMEOUT = 4
_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
name='ReplyStatus',
full_name='Reply.ReplyStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='ACCEPTED', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='REFUSED', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=75,
serialized_end=115,
)
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
_IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
name='ImageType',
@ -97,8 +104,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=761,
serialized_end=797,
serialized_start=774,
serialized_end=810,
)
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
@ -111,12 +118,19 @@ _REPLY = _descriptor.Descriptor(
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='message', full_name='Reply.message', index=0,
name='status', full_name='Reply.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='message', full_name='Reply.message', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
@ -130,8 +144,53 @@ _REPLY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=20,
serialized_end=106,
serialized_start=13,
serialized_end=115,
)
_TASKMSG = _descriptor.Descriptor(
name='TaskMsg',
full_name='TaskMsg',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='taskid', full_name='TaskMsg.taskid', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceid', full_name='TaskMsg.instanceid', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceStatus', full_name='TaskMsg.instanceStatus', index=2,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=117,
serialized_end=195,
)
@ -150,43 +209,50 @@ _TASK = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='status', full_name='Task.status', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
name='username', full_name='Task.username', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceCount', full_name='Task.instanceCount', index=2,
name='instanceid', full_name='Task.instanceid', index=2,
number=3, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='maxRetryCount', full_name='Task.maxRetryCount', index=3,
name='instanceCount', full_name='Task.instanceCount', index=3,
number=4, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parameters', full_name='Task.parameters', index=4,
number=5, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
name='maxRetryCount', full_name='Task.maxRetryCount', index=4,
number=5, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cluster', full_name='Task.cluster', index=5,
name='parameters', full_name='Task.parameters', index=5,
number=6, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='Timeout', full_name='Task.Timeout', index=6,
number=7, type=5, cpp_type=1, label=1,
name='cluster', full_name='Task.cluster', index=6,
number=7, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='timeout', full_name='Task.timeout', index=7,
number=8, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
@ -196,7 +262,6 @@ _TASK = _descriptor.Descriptor(
],
nested_types=[],
enum_types=[
_TASK_TASKSTATUS,
],
options=None,
is_extendable=False,
@ -204,8 +269,8 @@ _TASK = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=109,
serialized_end=364,
serialized_start=198,
serialized_end=377,
)
@ -249,8 +314,8 @@ _PARAMETERS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=366,
serialized_end=461,
serialized_start=379,
serialized_end=474,
)
@ -287,8 +352,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=557,
serialized_end=603,
serialized_start=570,
serialized_end=616,
)
_COMMAND = _descriptor.Descriptor(
@ -331,8 +396,8 @@ _COMMAND = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=464,
serialized_end=603,
serialized_start=477,
serialized_end=616,
)
@ -376,8 +441,8 @@ _CLUSTER = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=605,
serialized_end=689,
serialized_start=618,
serialized_end=702,
)
@ -422,8 +487,8 @@ _IMAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=691,
serialized_end=797,
serialized_start=704,
serialized_end=810,
)
@ -460,8 +525,8 @@ _MOUNT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=799,
serialized_end=845,
serialized_start=812,
serialized_end=858,
)
@ -512,16 +577,15 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=847,
serialized_end=913,
serialized_start=860,
serialized_end=926,
)
_REPLY.fields_by_name['message'].enum_type = _REPLY_REPLYSTATUS
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
_REPLY_REPLYSTATUS.containing_type = _REPLY
_TASK.fields_by_name['status'].enum_type = _TASK_TASKSTATUS
_TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS
_TASK.fields_by_name['parameters'].message_type = _PARAMETERS
_TASK.fields_by_name['cluster'].message_type = _CLUSTER
_TASK_TASKSTATUS.containing_type = _TASK
_PARAMETERS.fields_by_name['command'].message_type = _COMMAND
_COMMAND_ENVVARSENTRY.containing_type = _COMMAND
_COMMAND.fields_by_name['envVars'].message_type = _COMMAND_ENVVARSENTRY
@ -531,6 +595,7 @@ _CLUSTER.fields_by_name['mount'].message_type = _MOUNT
_IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE
_IMAGE_IMAGETYPE.containing_type = _IMAGE
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG
DESCRIPTOR.message_types_by_name['Task'] = _TASK
DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS
DESCRIPTOR.message_types_by_name['Command'] = _COMMAND
@ -538,25 +603,33 @@ DESCRIPTOR.message_types_by_name['Cluster'] = _CLUSTER
DESCRIPTOR.message_types_by_name['Image'] = _IMAGE
DESCRIPTOR.message_types_by_name['Mount'] = _MOUNT
DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
DESCRIPTOR.enum_types_by_name['Status'] = _STATUS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
DESCRIPTOR = _REPLY,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Reply)
))
_sym_db.RegisterMessage(Reply)
TaskMsg = _reflection.GeneratedProtocolMessageType('TaskMsg', (_message.Message,), dict(
DESCRIPTOR = _TASKMSG,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:TaskMsg)
))
_sym_db.RegisterMessage(TaskMsg)
Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict(
DESCRIPTOR = _TASK,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Task)
))
_sym_db.RegisterMessage(Task)
Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict(
DESCRIPTOR = _PARAMETERS,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Parameters)
))
_sym_db.RegisterMessage(Parameters)
@ -565,12 +638,12 @@ Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,
EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict(
DESCRIPTOR = _COMMAND_ENVVARSENTRY,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command.EnvVarsEntry)
))
,
DESCRIPTOR = _COMMAND,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command)
))
_sym_db.RegisterMessage(Command)
@ -578,28 +651,28 @@ _sym_db.RegisterMessage(Command.EnvVarsEntry)
Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict(
DESCRIPTOR = _CLUSTER,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Cluster)
))
_sym_db.RegisterMessage(Cluster)
Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict(
DESCRIPTOR = _IMAGE,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Image)
))
_sym_db.RegisterMessage(Image)
Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict(
DESCRIPTOR = _MOUNT,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Mount)
))
_sym_db.RegisterMessage(Mount)
Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
DESCRIPTOR = _INSTANCE,
__module__ = 'protos.rpc_pb2'
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Instance)
))
_sym_db.RegisterMessage(Instance)
@ -614,15 +687,15 @@ _MASTER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=915,
serialized_end=950,
serialized_start=1004,
serialized_end=1042,
methods=[
_descriptor.MethodDescriptor(
name='report',
full_name='Master.report',
index=0,
containing_service=None,
input_type=_TASK,
input_type=_TASKMSG,
output_type=_REPLY,
options=None,
),
@ -638,12 +711,12 @@ _WORKER = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=1,
options=None,
serialized_start=952,
serialized_end=989,
serialized_start=1044,
serialized_end=1085,
methods=[
_descriptor.MethodDescriptor(
name='add_task',
full_name='Worker.add_task',
name='process_task',
full_name='Worker.process_task',
index=0,
containing_service=None,
input_type=_TASK,

View File

@ -1,7 +1,7 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from protos import rpc_pb2 as protos_dot_rpc__pb2
from protos import rpc_pb2 as rpc__pb2
class MasterStub(object):
@ -16,8 +16,8 @@ class MasterStub(object):
"""
self.report = channel.unary_unary(
'/Master/report',
request_serializer=protos_dot_rpc__pb2.Task.SerializeToString,
response_deserializer=protos_dot_rpc__pb2.Reply.FromString,
request_serializer=rpc__pb2.TaskMsg.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
@ -37,8 +37,8 @@ def add_MasterServicer_to_server(servicer, server):
rpc_method_handlers = {
'report': grpc.unary_unary_rpc_method_handler(
servicer.report,
request_deserializer=protos_dot_rpc__pb2.Task.FromString,
response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString,
request_deserializer=rpc__pb2.TaskMsg.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
@ -56,10 +56,10 @@ class WorkerStub(object):
Args:
channel: A grpc.Channel.
"""
self.add_task = channel.unary_unary(
'/Worker/add_task',
request_serializer=protos_dot_rpc__pb2.Task.SerializeToString,
response_deserializer=protos_dot_rpc__pb2.Reply.FromString,
self.process_task = channel.unary_unary(
'/Worker/process_task',
request_serializer=rpc__pb2.Task.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
@ -67,7 +67,7 @@ class WorkerServicer(object):
# missing associated documentation comment in .proto file
pass
def add_task(self, request, context):
def process_task(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
@ -77,10 +77,10 @@ class WorkerServicer(object):
def add_WorkerServicer_to_server(servicer, server):
rpc_method_handlers = {
'add_task': grpc.unary_unary_rpc_method_handler(
servicer.add_task,
request_deserializer=protos_dot_rpc__pb2.Task.FromString,
response_serializer=protos_dot_rpc__pb2.Reply.SerializeToString,
'process_task': grpc.unary_unary_rpc_method_handler(
servicer.process_task,
request_deserializer=rpc__pb2.Task.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(

View File

@ -0,0 +1,144 @@
#!/usr/bin/python3
import sys
if sys.path[0].endswith("worker"):
sys.path[0] = sys.path[0][:-6]
from utils import env, tools
#config = env.getenv("CONFIG")
config = "/opt/docklet/local/docklet-running.conf"
tools.loadenv(config)
from utils.log import initlogging
initlogging("docklet-worker")
from utils.log import logger
from concurrent import futures
import grpc
#from utils.log import logger
#from utils import env
import json,lxc,subprocess,threading,os,time
from utils import imagemgr
from protos import rpc_pb2, rpc_pb2_grpc
class TaskController(rpc_pb2_grpc.WorkerServicer):
def __init__(self):
rpc_pb2_grpc.WorkerServicer.__init__(self)
self.imgmgr = imagemgr.ImageMgr()
self.fspath = env.getenv('FS_PREFIX')
self.confpath = env.getenv('DOCKLET_CONF')
#self.masterip = '162.105.88.190'
#self.masterport = 9002
#self.masterrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (self.masterip,self.masterport))
logger.info('TaskController init success')
def process_task(self, request, context):
logger.info('excute task with parameter: ' + str(request))
taskid = request.id
instanceid = request.instanceid
command = request.parameters.command.commandLine #'/root/getenv.sh' #parameter['Parameters']['Command']['CommandLine']
#envs = {'MYENV1':'MYVAL1', 'MYENV2':'MYVAL2'} #parameters['Parameters']['Command']['EnvVars']
envs = request.parameters.command.envVars
image = {}
image['name'] = request.cluster.image.name
image['type'] = 'private' if request.cluster.image.type == rpc_pb2.Image.PRIVATE else 'public'
image['owner'] = request.cluster.image.owner
username = request.username
lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid))
instance_type = request.cluster.instance
status = self.imgmgr.prepareFS(username,image,lxcname,str(instance_type.disk))
if not status:
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message="Create container for batch failed when preparing filesystem")
rootfs = "/var/lib/lxc/%s/rootfs" % lxcname
if not os.path.isdir("%s/global/users/%s" % (self.fspath,username)):
path = env.getenv('DOCKLET_LIB')
subprocess.call([path+"/userinit.sh", username])
logger.info("user %s directory not found, create it" % username)
sys_run("mkdir -p /var/lib/lxc/%s" % lxcname)
logger.info("generate config file for %s" % lxcname)
def config_prepare(content):
content = content.replace("%ROOTFS%",rootfs)
content = content.replace("%HOSTNAME%","batch-%s" % instanceid)
content = content.replace("%CONTAINER_MEMORY%",str(instance_type.memory))
content = content.replace("%CONTAINER_CPU%",str(instance_type.cpu*100000))
content = content.replace("%FS_PREFIX%",self.fspath)
content = content.replace("%LXCSCRIPT%",env.getenv("LXC_SCRIPT"))
content = content.replace("%USERNAME%",username)
content = content.replace("%LXCNAME%",lxcname)
return content
logger.info(self.confpath)
conffile = open(self.confpath+"/container.batch.conf", 'r')
conftext = conffile.read()
conffile.close()
conftext = config_prepare(conftext)
conffile = open("/var/lib/lxc/%s/config" % lxcname, 'w')
conffile.write(conftext)
conffile.close()
container = lxc.Container(lxcname)
if not container.start():
logger.error('start container %s failed' % lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
#return json.dumps({'success':'false','message': "start container failed"})
else:
logger.info('start container %s success' % lxcname)
#mount oss here
#thread = threading.Thread(target = self.excute_task, args=(jobid,taskid,envs,lxcname,command))
#thread.setDaemon(True)
#thread.start()
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
#return json.dumps({'success':'true','message':'task is running'})
def excute_task(self,jobid,taskid,envs,lxcname,command):
cmd = "lxc-attach -n " + lxcname
for envkey,envval in envs.items():
cmd = cmd + " -v %s=%s" % (envkey,envval)
cmd = cmd + " " + command
logger.info('run task with command - %s' % cmd)
Ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if Ret == 0:
#call master rpc function to tell the taskmgr
self.masterrpc.complete_task(jobid,taskid)
else:
self.masterrpc.fail_task(jobid,taskid)
#call master rpc function to tell the wrong
#umount oss here
container = lxc.Container(lxcname)
if container.stop():
logger.info("stop container %s success" % lxcname)
else:
logger.error("stop container %s failed" % lxcname)
logger.info("deleting container:%s" % lxcname)
if self.imgmgr.deleteFS(lxcname):
logger.info("delete container %s success" % lxcname)
else:
logger.error("delete container %s failed" % lxcname)
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def TaskControllerServe():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
rpc_pb2_grpc.add_WorkerServicer_to_server(TaskController(), server)
server.add_insecure_port('[::]:50051')
server.start()
logger.info("Start TaskController Servicer")
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == "__main__":
TaskControllerServe()

View File

@ -19,7 +19,6 @@ from socketserver import ThreadingMixIn
import threading
from utils import etcdlib, proxytool
from worker import container, monitor
from worker.taskmgr import TaskMgr
from utils.nettools import netcontrol,ovscontrol,portcontrol
from utils.lvmtool import new_group, recover_group
from master import network
@ -145,7 +144,6 @@ class Worker(object):
self.rpcserver.register_function(proxytool.delete_route)
self.rpcserver.register_function(portcontrol.acquire_port_mapping)
self.rpcserver.register_function(portcontrol.release_port_mapping)
self.rpcserver.register_function(TaskMgr.execute_task)
# register functions or instances to server for rpc
#self.rpcserver.register_function(function_name)