Merge pull request #348 from FirmlyReality/batch
Can stop tasks on taskcontroller
This commit is contained in:
commit
a4ff878c3e
|
@ -2,7 +2,7 @@ import sys
|
||||||
if sys.path[0].endswith("master"):
|
if sys.path[0].endswith("master"):
|
||||||
sys.path[0] = sys.path[0][:-6]
|
sys.path[0] = sys.path[0][:-6]
|
||||||
|
|
||||||
import grpc
|
import grpc,time
|
||||||
|
|
||||||
from protos import rpc_pb2, rpc_pb2_grpc
|
from protos import rpc_pb2, rpc_pb2_grpc
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ def run():
|
||||||
channel = grpc.insecure_channel('localhost:50051')
|
channel = grpc.insecure_channel('localhost:50051')
|
||||||
stub = rpc_pb2_grpc.WorkerStub(channel)
|
stub = rpc_pb2_grpc.WorkerStub(channel)
|
||||||
|
|
||||||
comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/test.txt;ls /root", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/test.txt;ls /root;sleep 100", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
||||||
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="")
|
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="")
|
||||||
|
|
||||||
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
||||||
|
@ -18,11 +18,22 @@ def run():
|
||||||
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
|
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
|
||||||
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[])
|
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[])
|
||||||
|
|
||||||
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=5,token="test")
|
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=600,token="test")
|
||||||
|
|
||||||
response = stub.process_task(task)
|
response = stub.process_task(task)
|
||||||
print("Batch client received: " + str(response.status)+" "+response.message)
|
print("Batch client received: " + str(response.status)+" "+response.message)
|
||||||
|
|
||||||
|
def stop_task():
|
||||||
|
channel = grpc.insecure_channel('localhost:50051')
|
||||||
|
stub = rpc_pb2_grpc.WorkerStub(channel)
|
||||||
|
|
||||||
|
taskmsg = rpc_pb2.TaskMsg(taskid="test",username="root",instanceid=1,instanceStatus=rpc_pb2.COMPLETED,token="test",errmsg="")
|
||||||
|
reportmsg = rpc_pb2.ReportMsg(taskmsgs = [taskmsg])
|
||||||
|
|
||||||
|
response = stub.stop_tasks(reportmsg)
|
||||||
|
print("Batch client received: " + str(response.status)+" "+response.message)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
run()
|
run()
|
||||||
|
time.sleep(2)
|
||||||
|
stop_task()
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
|
|
||||||
service Master {
|
service Master {
|
||||||
rpc report (ReportMsg) returns (Reply) {};
|
rpc report (ReportMsg) returns (Reply) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
service Worker {
|
service Worker {
|
||||||
rpc process_task (TaskInfo) returns (Reply) {}
|
rpc process_task (TaskInfo) returns (Reply) {}
|
||||||
|
rpc stop_tasks (ReportMsg) returns (Reply) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
message Reply {
|
message Reply {
|
||||||
|
@ -24,10 +25,11 @@ message ReportMsg {
|
||||||
|
|
||||||
message TaskMsg {
|
message TaskMsg {
|
||||||
string taskid = 1;
|
string taskid = 1;
|
||||||
int32 instanceid = 2;
|
string username = 2;
|
||||||
Status instanceStatus = 3; // 任务状态
|
int32 instanceid = 3;
|
||||||
string token = 4;
|
Status instanceStatus = 4; // 任务状态
|
||||||
string errmsg = 5;
|
string token = 5;
|
||||||
|
string errmsg = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Status {
|
enum Status {
|
||||||
|
|
|
@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
|
||||||
name='rpc.proto',
|
name='rpc.proto',
|
||||||
package='',
|
package='',
|
||||||
syntax='proto3',
|
syntax='proto3',
|
||||||
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"m\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x05 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3')
|
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"\x7f\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32Q\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_tasks\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x62\x06proto3')
|
||||||
)
|
)
|
||||||
|
|
||||||
_STATUS = _descriptor.EnumDescriptor(
|
_STATUS = _descriptor.EnumDescriptor(
|
||||||
|
@ -56,8 +56,8 @@ _STATUS = _descriptor.EnumDescriptor(
|
||||||
],
|
],
|
||||||
containing_type=None,
|
containing_type=None,
|
||||||
options=None,
|
options=None,
|
||||||
serialized_start=1100,
|
serialized_start=1118,
|
||||||
serialized_end=1191,
|
serialized_end=1209,
|
||||||
)
|
)
|
||||||
_sym_db.RegisterEnumDescriptor(_STATUS)
|
_sym_db.RegisterEnumDescriptor(_STATUS)
|
||||||
|
|
||||||
|
@ -113,8 +113,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
|
||||||
],
|
],
|
||||||
containing_type=None,
|
containing_type=None,
|
||||||
options=None,
|
options=None,
|
||||||
serialized_start=865,
|
serialized_start=883,
|
||||||
serialized_end=911,
|
serialized_end=929,
|
||||||
)
|
)
|
||||||
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
|
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
|
||||||
|
|
||||||
|
@ -204,33 +204,40 @@ _TASKMSG = _descriptor.Descriptor(
|
||||||
is_extension=False, extension_scope=None,
|
is_extension=False, extension_scope=None,
|
||||||
options=None, file=DESCRIPTOR),
|
options=None, file=DESCRIPTOR),
|
||||||
_descriptor.FieldDescriptor(
|
_descriptor.FieldDescriptor(
|
||||||
name='instanceid', full_name='TaskMsg.instanceid', index=1,
|
name='username', full_name='TaskMsg.username', index=1,
|
||||||
number=2, type=5, cpp_type=1, label=1,
|
number=2, type=9, cpp_type=9, label=1,
|
||||||
has_default_value=False, default_value=0,
|
|
||||||
message_type=None, enum_type=None, containing_type=None,
|
|
||||||
is_extension=False, extension_scope=None,
|
|
||||||
options=None, file=DESCRIPTOR),
|
|
||||||
_descriptor.FieldDescriptor(
|
|
||||||
name='instanceStatus', full_name='TaskMsg.instanceStatus', index=2,
|
|
||||||
number=3, type=14, cpp_type=8, label=1,
|
|
||||||
has_default_value=False, default_value=0,
|
|
||||||
message_type=None, enum_type=None, containing_type=None,
|
|
||||||
is_extension=False, extension_scope=None,
|
|
||||||
options=None, file=DESCRIPTOR),
|
|
||||||
_descriptor.FieldDescriptor(
|
|
||||||
name='token', full_name='TaskMsg.token', index=3,
|
|
||||||
number=4, type=9, cpp_type=9, label=1,
|
|
||||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||||
message_type=None, enum_type=None, containing_type=None,
|
message_type=None, enum_type=None, containing_type=None,
|
||||||
is_extension=False, extension_scope=None,
|
is_extension=False, extension_scope=None,
|
||||||
options=None, file=DESCRIPTOR),
|
options=None, file=DESCRIPTOR),
|
||||||
_descriptor.FieldDescriptor(
|
_descriptor.FieldDescriptor(
|
||||||
name='errmsg', full_name='TaskMsg.errmsg', index=4,
|
name='instanceid', full_name='TaskMsg.instanceid', index=2,
|
||||||
|
number=3, type=5, cpp_type=1, label=1,
|
||||||
|
has_default_value=False, default_value=0,
|
||||||
|
message_type=None, enum_type=None, containing_type=None,
|
||||||
|
is_extension=False, extension_scope=None,
|
||||||
|
options=None, file=DESCRIPTOR),
|
||||||
|
_descriptor.FieldDescriptor(
|
||||||
|
name='instanceStatus', full_name='TaskMsg.instanceStatus', index=3,
|
||||||
|
number=4, type=14, cpp_type=8, label=1,
|
||||||
|
has_default_value=False, default_value=0,
|
||||||
|
message_type=None, enum_type=None, containing_type=None,
|
||||||
|
is_extension=False, extension_scope=None,
|
||||||
|
options=None, file=DESCRIPTOR),
|
||||||
|
_descriptor.FieldDescriptor(
|
||||||
|
name='token', full_name='TaskMsg.token', index=4,
|
||||||
number=5, type=9, cpp_type=9, label=1,
|
number=5, type=9, cpp_type=9, label=1,
|
||||||
has_default_value=False, default_value=_b("").decode('utf-8'),
|
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||||
message_type=None, enum_type=None, containing_type=None,
|
message_type=None, enum_type=None, containing_type=None,
|
||||||
is_extension=False, extension_scope=None,
|
is_extension=False, extension_scope=None,
|
||||||
options=None, file=DESCRIPTOR),
|
options=None, file=DESCRIPTOR),
|
||||||
|
_descriptor.FieldDescriptor(
|
||||||
|
name='errmsg', full_name='TaskMsg.errmsg', index=5,
|
||||||
|
number=6, type=9, cpp_type=9, label=1,
|
||||||
|
has_default_value=False, default_value=_b("").decode('utf-8'),
|
||||||
|
message_type=None, enum_type=None, containing_type=None,
|
||||||
|
is_extension=False, extension_scope=None,
|
||||||
|
options=None, file=DESCRIPTOR),
|
||||||
],
|
],
|
||||||
extensions=[
|
extensions=[
|
||||||
],
|
],
|
||||||
|
@ -244,7 +251,7 @@ _TASKMSG = _descriptor.Descriptor(
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=158,
|
serialized_start=158,
|
||||||
serialized_end=267,
|
serialized_end=285,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -330,8 +337,8 @@ _TASKINFO = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=270,
|
serialized_start=288,
|
||||||
serialized_end=468,
|
serialized_end=486,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -375,8 +382,8 @@ _PARAMETERS = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=470,
|
serialized_start=488,
|
||||||
serialized_end=565,
|
serialized_end=583,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -413,8 +420,8 @@ _COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=661,
|
serialized_start=679,
|
||||||
serialized_end=707,
|
serialized_end=725,
|
||||||
)
|
)
|
||||||
|
|
||||||
_COMMAND = _descriptor.Descriptor(
|
_COMMAND = _descriptor.Descriptor(
|
||||||
|
@ -457,8 +464,8 @@ _COMMAND = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=568,
|
serialized_start=586,
|
||||||
serialized_end=707,
|
serialized_end=725,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -502,8 +509,8 @@ _CLUSTER = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=709,
|
serialized_start=727,
|
||||||
serialized_end=793,
|
serialized_end=811,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -548,8 +555,8 @@ _IMAGE = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=795,
|
serialized_start=813,
|
||||||
serialized_end=911,
|
serialized_end=929,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -614,8 +621,8 @@ _MOUNT = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=913,
|
serialized_start=931,
|
||||||
serialized_end=1030,
|
serialized_end=1048,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -666,8 +673,8 @@ _INSTANCE = _descriptor.Descriptor(
|
||||||
extension_ranges=[],
|
extension_ranges=[],
|
||||||
oneofs=[
|
oneofs=[
|
||||||
],
|
],
|
||||||
serialized_start=1032,
|
serialized_start=1050,
|
||||||
serialized_end=1098,
|
serialized_end=1116,
|
||||||
)
|
)
|
||||||
|
|
||||||
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
|
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
|
||||||
|
@ -785,8 +792,8 @@ _MASTER = _descriptor.ServiceDescriptor(
|
||||||
file=DESCRIPTOR,
|
file=DESCRIPTOR,
|
||||||
index=0,
|
index=0,
|
||||||
options=None,
|
options=None,
|
||||||
serialized_start=1193,
|
serialized_start=1211,
|
||||||
serialized_end=1233,
|
serialized_end=1251,
|
||||||
methods=[
|
methods=[
|
||||||
_descriptor.MethodDescriptor(
|
_descriptor.MethodDescriptor(
|
||||||
name='report',
|
name='report',
|
||||||
|
@ -809,8 +816,8 @@ _WORKER = _descriptor.ServiceDescriptor(
|
||||||
file=DESCRIPTOR,
|
file=DESCRIPTOR,
|
||||||
index=1,
|
index=1,
|
||||||
options=None,
|
options=None,
|
||||||
serialized_start=1235,
|
serialized_start=1253,
|
||||||
serialized_end=1280,
|
serialized_end=1334,
|
||||||
methods=[
|
methods=[
|
||||||
_descriptor.MethodDescriptor(
|
_descriptor.MethodDescriptor(
|
||||||
name='process_task',
|
name='process_task',
|
||||||
|
@ -821,6 +828,15 @@ _WORKER = _descriptor.ServiceDescriptor(
|
||||||
output_type=_REPLY,
|
output_type=_REPLY,
|
||||||
options=None,
|
options=None,
|
||||||
),
|
),
|
||||||
|
_descriptor.MethodDescriptor(
|
||||||
|
name='stop_tasks',
|
||||||
|
full_name='Worker.stop_tasks',
|
||||||
|
index=1,
|
||||||
|
containing_service=None,
|
||||||
|
input_type=_REPORTMSG,
|
||||||
|
output_type=_REPLY,
|
||||||
|
options=None,
|
||||||
|
),
|
||||||
])
|
])
|
||||||
_sym_db.RegisterServiceDescriptor(_WORKER)
|
_sym_db.RegisterServiceDescriptor(_WORKER)
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,11 @@ class WorkerStub(object):
|
||||||
request_serializer=rpc__pb2.TaskInfo.SerializeToString,
|
request_serializer=rpc__pb2.TaskInfo.SerializeToString,
|
||||||
response_deserializer=rpc__pb2.Reply.FromString,
|
response_deserializer=rpc__pb2.Reply.FromString,
|
||||||
)
|
)
|
||||||
|
self.stop_tasks = channel.unary_unary(
|
||||||
|
'/Worker/stop_tasks',
|
||||||
|
request_serializer=rpc__pb2.ReportMsg.SerializeToString,
|
||||||
|
response_deserializer=rpc__pb2.Reply.FromString,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class WorkerServicer(object):
|
class WorkerServicer(object):
|
||||||
|
@ -74,6 +79,13 @@ class WorkerServicer(object):
|
||||||
context.set_details('Method not implemented!')
|
context.set_details('Method not implemented!')
|
||||||
raise NotImplementedError('Method not implemented!')
|
raise NotImplementedError('Method not implemented!')
|
||||||
|
|
||||||
|
def stop_tasks(self, request, context):
|
||||||
|
# missing associated documentation comment in .proto file
|
||||||
|
pass
|
||||||
|
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||||
|
context.set_details('Method not implemented!')
|
||||||
|
raise NotImplementedError('Method not implemented!')
|
||||||
|
|
||||||
|
|
||||||
def add_WorkerServicer_to_server(servicer, server):
|
def add_WorkerServicer_to_server(servicer, server):
|
||||||
rpc_method_handlers = {
|
rpc_method_handlers = {
|
||||||
|
@ -82,6 +94,11 @@ def add_WorkerServicer_to_server(servicer, server):
|
||||||
request_deserializer=rpc__pb2.TaskInfo.FromString,
|
request_deserializer=rpc__pb2.TaskInfo.FromString,
|
||||||
response_serializer=rpc__pb2.Reply.SerializeToString,
|
response_serializer=rpc__pb2.Reply.SerializeToString,
|
||||||
),
|
),
|
||||||
|
'stop_tasks': grpc.unary_unary_rpc_method_handler(
|
||||||
|
servicer.stop_tasks,
|
||||||
|
request_deserializer=rpc__pb2.ReportMsg.FromString,
|
||||||
|
response_serializer=rpc__pb2.Reply.SerializeToString,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
generic_handler = grpc.method_handlers_generic_handler(
|
generic_handler = grpc.method_handlers_generic_handler(
|
||||||
'Worker', rpc_method_handlers)
|
'Worker', rpc_method_handlers)
|
||||||
|
|
|
@ -295,7 +295,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
if p.poll() is None:
|
if p.poll() is None:
|
||||||
p.kill()
|
p.kill()
|
||||||
logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (timeout,str(taskid),str(instanceid),token))
|
logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (timeout,str(taskid),str(instanceid),token))
|
||||||
self.add_msg(taskid,instanceid,rpc_pb2.TIMEOUT,token,"Running time is out.")
|
self.add_msg(taskid,username,instanceid,rpc_pb2.TIMEOUT,token,"Running time is out.")
|
||||||
else:
|
else:
|
||||||
out,err = p.communicate()
|
out,err = p.communicate()
|
||||||
logger.info(out)
|
logger.info(out)
|
||||||
|
@ -314,14 +314,14 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
else:
|
else:
|
||||||
msg = msg2
|
msg = msg2
|
||||||
logger.info("Output error on Task(%s-%s-%s)." % (str(taskid),str(instanceid),token))
|
logger.info("Output error on Task(%s-%s-%s)." % (str(taskid),str(instanceid),token))
|
||||||
self.add_msg(taskid,instanceid,rpc_pb2.OUTPUTERROR,token,msg)
|
self.add_msg(taskid,username,instanceid,rpc_pb2.OUTPUTERROR,token,msg)
|
||||||
else:
|
else:
|
||||||
if p.poll() == 0:
|
if p.poll() == 0:
|
||||||
logger.info("Task(%s-%s-%s) completed." % (str(taskid),str(instanceid),token))
|
logger.info("Task(%s-%s-%s) completed." % (str(taskid),str(instanceid),token))
|
||||||
self.add_msg(taskid,instanceid,rpc_pb2.COMPLETED,token,"")
|
self.add_msg(taskid,username,instanceid,rpc_pb2.COMPLETED,token,"")
|
||||||
else:
|
else:
|
||||||
logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(instanceid),token))
|
logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(instanceid),token))
|
||||||
self.add_msg(taskid,instanceid,rpc_pb2.FAILED,token,"")
|
self.add_msg(taskid,username,instanceid,rpc_pb2.FAILED,token,"")
|
||||||
|
|
||||||
container = lxc.Container(lxcname)
|
container = lxc.Container(lxcname)
|
||||||
if container.stop():
|
if container.stop():
|
||||||
|
@ -341,10 +341,16 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
#umount oss
|
#umount oss
|
||||||
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info)
|
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info)
|
||||||
|
|
||||||
def add_msg(self,taskid,instanceid,status,token,errmsg):
|
def stop_tasks(self, request, context):
|
||||||
|
for msg in request.taskmsgs:
|
||||||
|
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token)
|
||||||
|
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
||||||
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
|
||||||
|
def add_msg(self,taskid,username,instanceid,status,token,errmsg):
|
||||||
self.msgslock.acquire()
|
self.msgslock.acquire()
|
||||||
try:
|
try:
|
||||||
self.taskmsgs.append(rpc_pb2.TaskMsg(taskid=str(taskid),instanceid=int(instanceid),instanceStatus=status,token=token,errmsg=errmsg))
|
self.taskmsgs.append(rpc_pb2.TaskMsg(taskid=str(taskid),username=username,instanceid=int(instanceid),instanceStatus=status,token=token,errmsg=errmsg))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
self.msgslock.release()
|
self.msgslock.release()
|
||||||
|
|
Loading…
Reference in New Issue