commit
831b0ea94e
|
@ -62,7 +62,7 @@ class BatchJob(object):
|
||||||
task_name = self.user + '_' + self.job_id + '_' + task_idx
|
task_name = self.user + '_' + self.job_id + '_' + task_idx
|
||||||
return task_name, self.raw_job_info["tasks"][task_idx]
|
return task_name, self.raw_job_info["tasks"][task_idx]
|
||||||
return '', None
|
return '', None
|
||||||
|
|
||||||
# a task has finished
|
# a task has finished
|
||||||
def finish_task(self, task_idx):
|
def finish_task(self, task_idx):
|
||||||
pass
|
pass
|
||||||
|
@ -137,7 +137,7 @@ class JobMgr(threading.Thread):
|
||||||
if not task_info:
|
if not task_info:
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
task_priority = self.job_priority
|
task_priority = job.job_priority
|
||||||
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
|
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -155,4 +155,3 @@ class JobMgr(threading.Thread):
|
||||||
# a task has finished
|
# a task has finished
|
||||||
def report(self, task):
|
def report(self, task):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -374,8 +374,8 @@ class TaskMgr(threading.Thread):
|
||||||
commandLine = json_task['command'],
|
commandLine = json_task['command'],
|
||||||
packagePath = json_task['srcAddr'],
|
packagePath = json_task['srcAddr'],
|
||||||
envVars = {}),
|
envVars = {}),
|
||||||
stderrRedirectPath = json_task['stdErrRedPth'],
|
stderrRedirectPath = json_task.get('stdErrRedPth',""),
|
||||||
stdoutRedirectPath = json_task['stdOutRedPth']),
|
stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
|
||||||
cluster = Cluster(
|
cluster = Cluster(
|
||||||
image = Image(
|
image = Image(
|
||||||
name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
|
name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
|
||||||
|
|
|
@ -10,15 +10,15 @@ def run():
|
||||||
channel = grpc.insecure_channel('localhost:50051')
|
channel = grpc.insecure_channel('localhost:50051')
|
||||||
stub = rpc_pb2_grpc.WorkerStub(channel)
|
stub = rpc_pb2_grpc.WorkerStub(channel)
|
||||||
|
|
||||||
comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/test.txt;ls /root;sleep 100", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
comm = rpc_pb2.Command(commandLine="python3 CNN.py", packagePath="/root/nfs/17flowers", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
||||||
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="")
|
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="")
|
||||||
|
|
||||||
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
img = rpc_pb2.Image(name="17flowers", type=rpc_pb2.Image.PRIVATE, owner="root")
|
||||||
inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)
|
inst = rpc_pb2.Instance(cpu=4, memory=4000, disk=8000, gpu=2)
|
||||||
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
|
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
|
||||||
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[])
|
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[])
|
||||||
|
|
||||||
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=600,token="test")
|
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test")
|
||||||
|
|
||||||
response = stub.process_task(task)
|
response = stub.process_task(task)
|
||||||
print("Batch client received: " + str(response.status)+" "+response.message)
|
print("Batch client received: " + str(response.status)+" "+response.message)
|
||||||
|
@ -35,5 +35,5 @@ def stop_task():
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
run()
|
run()
|
||||||
time.sleep(2)
|
#time.sleep(4)
|
||||||
stop_task()
|
#stop_task()
|
||||||
|
|
|
@ -45,6 +45,8 @@ def nvidia_smi():
|
||||||
return ret.stdout.decode('utf-8').split('\n')
|
return ret.stdout.decode('utf-8').split('\n')
|
||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
return None
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_gpu_driver_version():
|
def get_gpu_driver_version():
|
||||||
|
|
|
@ -544,7 +544,7 @@ class Collector(threading.Thread):
|
||||||
workerinfo['gpuinfo'] = self.collect_gpuinfo()
|
workerinfo['gpuinfo'] = self.collect_gpuinfo()
|
||||||
workerinfo['diskinfo'] = self.collect_diskinfo()
|
workerinfo['diskinfo'] = self.collect_diskinfo()
|
||||||
workerinfo['running'] = True
|
workerinfo['running'] = True
|
||||||
#time.sleep(self.interval)
|
time.sleep(self.interval)
|
||||||
if self.test:
|
if self.test:
|
||||||
break
|
break
|
||||||
# print(self.etcdser.getkey('/meminfo/total'))
|
# print(self.etcdser.getkey('/meminfo/total'))
|
||||||
|
|
|
@ -15,7 +15,7 @@ import grpc
|
||||||
#from utils.log import logger
|
#from utils.log import logger
|
||||||
#from utils import env
|
#from utils import env
|
||||||
import json,lxc,subprocess,threading,os,time,traceback
|
import json,lxc,subprocess,threading,os,time,traceback
|
||||||
from utils import imagemgr,etcdlib
|
from utils import imagemgr,etcdlib,gputools
|
||||||
from worker import ossmounter
|
from worker import ossmounter
|
||||||
from protos import rpc_pb2, rpc_pb2_grpc
|
from protos import rpc_pb2, rpc_pb2_grpc
|
||||||
|
|
||||||
|
@ -80,6 +80,12 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
self.free_ips.append(i)
|
self.free_ips.append(i)
|
||||||
logger.info("Free ip addresses pool %s" % str(self.free_ips))
|
logger.info("Free ip addresses pool %s" % str(self.free_ips))
|
||||||
|
|
||||||
|
self.gpu_lock = threading.Lock()
|
||||||
|
self.gpu_status = {}
|
||||||
|
gpus = gputools.get_gpu_status()
|
||||||
|
for gpu in gpus:
|
||||||
|
self.gpu_status[gpu['id']] = ""
|
||||||
|
|
||||||
self.start_report()
|
self.start_report()
|
||||||
logger.info('TaskController init success')
|
logger.info('TaskController init success')
|
||||||
|
|
||||||
|
@ -102,6 +108,42 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
logger.info(str(self.free_ips))
|
logger.info(str(self.free_ips))
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
|
||||||
|
def add_gpu_device(self, lxcname, gpu_need):
|
||||||
|
if gpu_need < 1:
|
||||||
|
return [True, ""]
|
||||||
|
self.gpu_lock.acquire()
|
||||||
|
use_gpus = []
|
||||||
|
for gpuid in self.gpu_status.keys():
|
||||||
|
if self.gpu_status[gpuid] == "":
|
||||||
|
use_gpus.append(gpuid)
|
||||||
|
if len(use_gpus) < gpu_need:
|
||||||
|
self.gpu_lock.release()
|
||||||
|
return [False, "No free GPUs"]
|
||||||
|
for gpuid in use_gpus:
|
||||||
|
self.gpu_status[gpuid] = lxcname
|
||||||
|
try:
|
||||||
|
gputools.add_device(lxcname, "/dev/nvidiactl")
|
||||||
|
gputools.add_device(lxcname, "/dev/nvidia-uvm")
|
||||||
|
for gpuid in use_gpus:
|
||||||
|
gputools.add_device(lxcname,"/dev/nvidia"+str(gpuid))
|
||||||
|
logger.info("Add gpu:"+str(gpuid) +" to lxc:"+str(lxcname))
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
for gpuid in use_gpus:
|
||||||
|
self.gpu_status[gpuid] = ""
|
||||||
|
self.gpu_lock.release()
|
||||||
|
return [False, "Error occurs when adding gpu device."]
|
||||||
|
|
||||||
|
self.gpu_lock.release()
|
||||||
|
return [True, ""]
|
||||||
|
|
||||||
|
def release_gpu_device(self, lxcname):
|
||||||
|
self.gpu_lock.acquire()
|
||||||
|
for gpuid in self.gpu_status.keys():
|
||||||
|
if self.gpu_status[gpuid] == lxcname:
|
||||||
|
self.gpu_status[gpuid] = ""
|
||||||
|
self.gpu_lock.release()
|
||||||
|
|
||||||
#mount_oss
|
#mount_oss
|
||||||
def mount_oss(self, datapath, mount_info):
|
def mount_oss(self, datapath, mount_info):
|
||||||
self.mount_lock.acquire()
|
self.mount_lock.acquire()
|
||||||
|
@ -170,6 +212,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
if outpath[i] == "":
|
if outpath[i] == "":
|
||||||
outpath[i] = "/root/nfs/"
|
outpath[i] = "/root/nfs/"
|
||||||
timeout = request.timeout
|
timeout = request.timeout
|
||||||
|
gpu_need = int(request.cluster.instance.gpu)
|
||||||
|
|
||||||
# acquire ip
|
# acquire ip
|
||||||
[status, ip] = self.acquire_ip()
|
[status, ip] = self.acquire_ip()
|
||||||
|
@ -225,9 +268,19 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
if not container.start():
|
if not container.start():
|
||||||
logger.error('start container %s failed' % lxcname)
|
logger.error('start container %s failed' % lxcname)
|
||||||
self.release_ip(ip)
|
self.release_ip(ip)
|
||||||
|
self.imgmgr.deleteFS(lxcname)
|
||||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container")
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container")
|
||||||
else:
|
|
||||||
logger.info('start container %s success' % lxcname)
|
logger.info('start container %s success' % lxcname)
|
||||||
|
|
||||||
|
#add GPU
|
||||||
|
[success, msg] = self.add_gpu_device(lxcname,gpu_need)
|
||||||
|
if not success:
|
||||||
|
logger.error("Fail to add gpu device. " + msg)
|
||||||
|
container.stop()
|
||||||
|
self.release_ip(ip)
|
||||||
|
self.imgmgr.deleteFS(lxcname)
|
||||||
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg)
|
||||||
|
|
||||||
thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list))
|
thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list))
|
||||||
thread.setDaemon(True)
|
thread.setDaemon(True)
|
||||||
|
@ -252,6 +305,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
if ret.returncode != 0:
|
if ret.returncode != 0:
|
||||||
msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename
|
msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
|
logger.error(ret.stdout)
|
||||||
return [False,msg]
|
return [False,msg]
|
||||||
logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)
|
logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)
|
||||||
|
|
||||||
|
@ -337,6 +391,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
|
|
||||||
logger.info("release ip address %s" % ip)
|
logger.info("release ip address %s" % ip)
|
||||||
self.release_ip(ip)
|
self.release_ip(ip)
|
||||||
|
self.release_gpu_device(lxcname)
|
||||||
|
|
||||||
#umount oss
|
#umount oss
|
||||||
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info)
|
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info)
|
||||||
|
@ -344,6 +399,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
def stop_tasks(self, request, context):
|
def stop_tasks(self, request, context):
|
||||||
for msg in request.taskmsgs:
|
for msg in request.taskmsgs:
|
||||||
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token)
|
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token)
|
||||||
|
logger.info("Stop the task with lxc:"+lxcname)
|
||||||
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
||||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
|
||||||
|
|
|
@ -220,7 +220,7 @@ class Worker(object):
|
||||||
if status:
|
if status:
|
||||||
# master has know the worker so we start send heartbeat package
|
# master has know the worker so we start send heartbeat package
|
||||||
if value=='ok':
|
if value=='ok':
|
||||||
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3)
|
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
|
||||||
else:
|
else:
|
||||||
logger.error("get key %s failed, master may be crashed" % self.addr)
|
logger.error("get key %s failed, master may be crashed" % self.addr)
|
||||||
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
|
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
|
||||||
|
|
Loading…
Reference in New Issue