fix some bugs
This commit is contained in:
parent
ca50e328a0
commit
53a5705177
|
@ -293,7 +293,7 @@ class JobMgr():
|
||||||
def stop_job(self, user, job_id):
|
def stop_job(self, user, job_id):
|
||||||
logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user))
|
logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user))
|
||||||
if job_id not in self.job_map.keys():
|
if job_id not in self.job_map.keys():
|
||||||
return [False,"Job id %s does not exists! Maybe it has been finished."]
|
return [False,"Job id %s does not exists! Maybe it has been finished."%job_id]
|
||||||
try:
|
try:
|
||||||
job = self.job_map[job_id]
|
job = self.job_map[job_id]
|
||||||
if job.job_db.status == 'done' or job.job_db.status == 'failed':
|
if job.job_db.status == 'done' or job.job_db.status == 'failed':
|
||||||
|
@ -304,6 +304,7 @@ class JobMgr():
|
||||||
taskid = job_id + '_' + task_idx
|
taskid = job_id + '_' + task_idx
|
||||||
self.taskmgr.lazy_stop_task(taskid)
|
self.taskmgr.lazy_stop_task(taskid)
|
||||||
job.stop_job()
|
job.stop_job()
|
||||||
|
del self.job_map[job_id]
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
#logger.error(err)
|
#logger.error(err)
|
||||||
|
|
|
@ -297,7 +297,7 @@ class TaskMgr(threading.Thread):
|
||||||
|
|
||||||
def stop_subtask(self, subtask):
|
def stop_subtask(self, subtask):
|
||||||
try:
|
try:
|
||||||
self.logger.info('[task_processor] Stoping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
|
self.logger.info('[task_processor] Stopping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
|
||||||
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
|
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
|
||||||
stub = WorkerStub(channel)
|
stub = WorkerStub(channel)
|
||||||
response = stub.stop_task(subtask.command_info)
|
response = stub.stop_task(subtask.command_info)
|
||||||
|
@ -423,7 +423,7 @@ class TaskMgr(threading.Thread):
|
||||||
self.stop_vnode(sub_task)
|
self.stop_vnode(sub_task)
|
||||||
#pass
|
#pass
|
||||||
|
|
||||||
@data_lock('task_stop_lock')
|
@data_lock('stop_lock')
|
||||||
def lazy_stop_task(self, taskid):
|
def lazy_stop_task(self, taskid):
|
||||||
self.lazy_stop_list.append(taskid)
|
self.lazy_stop_list.append(taskid)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue