Merge pull request #303 from GallenShao/batch

simple taskmgr
This commit is contained in:
GallenShao 2018-07-13 10:47:09 +08:00 committed by GitHub
commit c41bdfbbc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 323 additions and 41 deletions

89
src/master/taskmgr.py Normal file
View File

@ -0,0 +1,89 @@
import threading
import time
from concurrent import futures
import grpc
from protos.taskmgr_pb2 import Task, Reply
from protos.taskmgr_pb2_grpc import TaskReporterServicer, add_TaskReporterServicer_to_server
class TaskReport(TaskReporterServicer):
    """gRPC servicer that forwards worker task reports to a TaskMgr."""

    def __init__(self, taskmgr):
        # The manager that will receive every reported task.
        self.taskmgr = taskmgr

    def report(self, request, context):
        """Handle one report RPC: hand the task to the manager, ack the worker."""
        self.taskmgr.on_task_report(request)
        return Reply(message='received')
class TaskMgr(threading.Thread):
    """Background thread that receives task reports over gRPC and runs them.

    Owns a FIFO task queue fed by the gRPC servicer (TaskReport) and a
    polling loop that pops one task per second and processes it.
    """

    # load task information from etcd
    # initial a task queue and task scheduler
    def __init__(self):
        threading.Thread.__init__(self)
        self.thread_stop = False
        # FIFO queue of tasks reported by workers; consumed by task_scheduler().
        self.taskQueue = []

    def run(self):
        # Start the gRPC server first, then poll the queue until stop().
        self.serve()
        while not self.thread_stop:
            task = self.task_scheduler()
            if task is not None:
                self.task_processor(task)
            time.sleep(1)

    def serve(self):
        """Start the gRPC server workers use to report task status."""
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        add_TaskReporterServicer_to_server(TaskReport(self), self.server)
        self.server.add_insecure_port('[::]:50051')
        self.server.start()

    def stop(self):
        """Stop the polling loop and shut the gRPC server down immediately."""
        self.thread_stop = True
        self.server.stop(0)

    # this method is called when worker send heart-beat rpc request
    def on_task_report(self, task):
        """Enqueue a task reported by a worker.

        task: the Task message received via the report RPC.
        """
        # FIX: append the reported task itself — the original appended the
        # literal string 'task'. Also removed the debug prints and the
        # time.sleep(2), which stalled a gRPC handler thread for two
        # seconds on every report.
        self.taskQueue.append(task)

    # this is a thread to process task (or an instance)
    def task_processor(self, task):
        """Process one task.

        Calls the worker RPC: create container -> execute task
        (one instance or multiple instances); retry when failed.
        """
        print('processing %s' % task)

    # this is a thread to schedule the tasks
    def task_scheduler(self):
        """Pop and return the oldest queued task, or None if the queue is empty."""
        try:
            task = self.taskQueue.pop(0)
        except IndexError:  # FIX: was a bare except that hid real errors
            task = None
        return task

    # user: username
    # task: a json string
    # save the task information into database
    # called when jobmgr assigns a task to taskmgr
    def add_task(self, user, task):
        pass

    # user: username
    # jobid: the id of job
    # taskid: the id of task
    # get the information of a task: status, description and other information
    def get_task(self, user, jobid, taskid):
        pass

20
src/protos/taskmgr.proto Normal file
View File

@ -0,0 +1,20 @@
syntax = "proto3";

// Worker-to-master reporting service: workers push task status updates.
service TaskReporter {
    rpc report (Task) returns (Reply) {};
}

message Task {
    int32 id = 1;
    // FIX: removed `[default = RUNNING]` — explicit field defaults are
    // proto2-only and make this file fail to compile under proto3.
    // In proto3 the first enum value (RUNNING = 0) is the implicit default.
    TaskStatus taskStatus = 2;

    enum TaskStatus {
        RUNNING = 0;
        FAILED = 1;
        TIMEOUT = 2;
    }
}

// Acknowledgement returned by the master for each report.
message Reply {
    string message = 1;
}

168
src/protos/taskmgr_pb2.py Normal file
View File

@ -0,0 +1,168 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: protos/taskmgr.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
# NOTE: protoc-generated descriptor plumbing for protos/taskmgr.proto.
# Regenerate from the .proto file instead of hand-editing anything below.

# File-level descriptor; serialized_pb is the compiled FileDescriptorProto blob.
DESCRIPTOR = _descriptor.FileDescriptor(
  name='protos/taskmgr.proto',
  package='',
  syntax='proto3',
  serialized_pb=_b('\n\x14protos/taskmgr.proto\"l\n\x04Task\x12\n\n\x02id\x18\x01 \x01(\x05\x12$\n\ntaskStatus\x18\x02 \x01(\x0e\x32\x10.Task.TaskStatus\"2\n\nTaskStatus\x12\x0b\n\x07RUNNING\x10\x00\x12\n\n\x06\x46\x41ILED\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\"\x18\n\x05Reply\x12\x0f\n\x07message\x18\x01 \x01(\t2)\n\x0cTaskReporter\x12\x19\n\x06report\x12\x05.Task\x1a\x06.Reply\"\x00\x62\x06proto3')
)

# Enum descriptor for Task.TaskStatus: RUNNING=0, FAILED=1, TIMEOUT=2.
_TASK_TASKSTATUS = _descriptor.EnumDescriptor(
  name='TaskStatus',
  full_name='Task.TaskStatus',
  filename=None,
  file=DESCRIPTOR,
  values=[
    _descriptor.EnumValueDescriptor(
      name='RUNNING', index=0, number=0,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='FAILED', index=1, number=1,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='TIMEOUT', index=2, number=2,
      options=None,
      type=None),
  ],
  containing_type=None,
  options=None,
  serialized_start=82,
  serialized_end=132,
)
_sym_db.RegisterEnumDescriptor(_TASK_TASKSTATUS)

# Message descriptor for Task: field 1 `id` (int32), field 2 `taskStatus` (enum).
_TASK = _descriptor.Descriptor(
  name='Task',
  full_name='Task',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='id', full_name='Task.id', index=0,
      number=1, type=5, cpp_type=1, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='taskStatus', full_name='Task.taskStatus', index=1,
      number=2, type=14, cpp_type=8, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
    _TASK_TASKSTATUS,
  ],
  options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=24,
  serialized_end=132,
)

# Message descriptor for Reply: single string field `message`.
_REPLY = _descriptor.Descriptor(
  name='Reply',
  full_name='Reply',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='message', full_name='Reply.message', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=134,
  serialized_end=158,
)

# Resolve cross-references that cannot be set at construction time,
# then register the file with the default symbol database.
_TASK.fields_by_name['taskStatus'].enum_type = _TASK_TASKSTATUS
_TASK_TASKSTATUS.containing_type = _TASK
DESCRIPTOR.message_types_by_name['Task'] = _TASK
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

# Concrete message classes built from the descriptors via the generated metaclass.
Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict(
  DESCRIPTOR = _TASK,
  __module__ = 'protos.taskmgr_pb2'
  # @@protoc_insertion_point(class_scope:Task)
  ))
_sym_db.RegisterMessage(Task)

Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
  DESCRIPTOR = _REPLY,
  __module__ = 'protos.taskmgr_pb2'
  # @@protoc_insertion_point(class_scope:Reply)
  ))
_sym_db.RegisterMessage(Reply)

# Service descriptor for TaskReporter with its single unary `report` method.
_TASKREPORTER = _descriptor.ServiceDescriptor(
  name='TaskReporter',
  full_name='TaskReporter',
  file=DESCRIPTOR,
  index=0,
  options=None,
  serialized_start=160,
  serialized_end=201,
  methods=[
    _descriptor.MethodDescriptor(
      name='report',
      full_name='TaskReporter.report',
      index=0,
      containing_service=None,
      input_type=_TASK,
      output_type=_REPLY,
      options=None,
    ),
  ])
_sym_db.RegisterServiceDescriptor(_TASKREPORTER)

DESCRIPTOR.services_by_name['TaskReporter'] = _TASKREPORTER
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,46 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from protos import taskmgr_pb2 as protos_dot_taskmgr__pb2
class TaskReporterStub(object):
  # Client-side stub for the TaskReporter service (generated by grpcio-tools).
  # missing associated documentation comment in .proto file
  pass

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    # Unary-unary RPC: sends a Task message, receives a Reply message.
    self.report = channel.unary_unary(
        '/TaskReporter/report',
        request_serializer=protos_dot_taskmgr__pb2.Task.SerializeToString,
        response_deserializer=protos_dot_taskmgr__pb2.Reply.FromString,
        )
class TaskReporterServicer(object):
  # Server-side interface for TaskReporter; subclass and override report().
  # missing associated documentation comment in .proto file
  pass

  def report(self, request, context):
    # Default implementation rejects the call as UNIMPLEMENTED.
    # missing associated documentation comment in .proto file
    pass
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')
def add_TaskReporterServicer_to_server(servicer, server):
  # Register `servicer` on `server`, wiring the Task/Reply (de)serializers
  # for the unary-unary /TaskReporter/report method.
  rpc_method_handlers = {
      'report': grpc.unary_unary_rpc_method_handler(
          servicer.report,
          request_deserializer=protos_dot_taskmgr__pb2.Task.FromString,
          response_serializer=protos_dot_taskmgr__pb2.Reply.SerializeToString,
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler(
      'TaskReporter', rpc_method_handlers)
  server.add_generic_rpc_handlers((generic_handler,))

View File

@ -1,41 +0,0 @@
class TaskMgr(object):
    """Legacy task-manager skeleton: queue, scheduler and processor stubs."""

    # load task information from etcd
    # initialise a task queue and task scheduler
    def __init__(self):
        pass

    # this is a thread to process a task (or an instance)
    def task_processor(self, task):
        """Process one task.

        task: a json string
        Calls the worker RPC: create container -> execute task
        (one instance or multiple instances); retry when failed.
        """
        pass

    # this is a thread to schedule the tasks
    def task_scheduler(self):
        """Choose a task from the queue and create a task processor for it."""
        pass

    def add_task(self, user, task):
        """Save the task information into the database.

        user: username
        task: a json string
        """
        pass

    def get_task(self, user, jobid, taskid):
        """Return a task's status, description and other information.

        user: username
        jobid: the id of the job
        taskid: the id of the task
        """
        pass

    # this is an rpc function for the worker; the task processor calls it to
    # execute a task on a worker
    def execute_task(self, task):
        """Execute one task on a worker.

        task: a json string
        FIX: dropped the stray @staticmethod decorator — the method declares
        `self`, so as a staticmethod every ordinary call bound the instance
        into `self` and left `task` missing; it is a regular instance method.
        """
        return