PHengLEI-TestCases/Y02_ThreeD_M6_Unstruct_Bran.../acllite_resource.py

116 lines
3.4 KiB
Python
Raw Normal View History

2023-06-28 18:11:30 +08:00
# -*- coding: utf-8 -*-
# @Author: sun
# @Date: 2022-03-23 19:32:36
# @Last Modified by: sun
# @Last Modified time: 2022-03-23 19:32:51
"""
Copyright (R) @huawei.com, all rights reserved
-*- coding:utf-8 -*-
CREATED: 2021-01-20 20:12:13
MODIFIED: 2021-02-03 14:04:45
"""
import threading
import acl
import acllite_utils as utils
REGISTER = 0
UNREGISTER = 1
class _ResourceList(object):
"""Acl resources of current application
This class provide register inferace of acl resource, when application
exit, all register resource will release befor acl.rt.reset_device to
avoid program abnormal
"""
_instance_lock = threading.Lock()
def __init__(self):
self.resources = []
def __new__(cls, *args, **kwargs):
if not hasattr(_ResourceList, "_instance"):
with _ResourceList._instance_lock:
if not hasattr(_ResourceList, "_instance"):
_ResourceList._instance = object.__new__(
cls, *args, **kwargs)
return _ResourceList._instance
def register(self, resource):
"""Resource register interface
Args:
resource: object with acl resource, the object must be has
method destroy()
"""
item = {"resource": resource, "status": REGISTER}
self.resources.append(item)
def unregister(self, resource):
"""Resource unregister interface
If registered resource release by self and no need _ResourceList
release, the resource object should unregister self
Args:
resource: registered resource
"""
for item in self.resources:
if resource == item["resource"]:
item["status"] = UNREGISTER
def destroy(self):
"""Destroy all register resource"""
for item in self.resources:
if item["status"] == REGISTER:
item["resource"].destroy()
item["status"] = UNREGISTER
resource_list = _ResourceList()
class AclLiteResource(object):
"""
AclLiteResource
"""
def __init__(self, device_id=0):
self.device_id = device_id
self.context = None
self.stream = None
self.run_mode = None
def init(self):
"""
init resource
"""
print("init resource stage:")
ret = acl.init()
print(f"resource ret:{ret}")
utils.check_ret("acl.init", ret)
ret = acl.rt.set_device(self.device_id)
utils.check_ret("acl.rt.set_device", ret)
self.context, ret = acl.rt.create_context(self.device_id)
utils.check_ret("acl.rt.create_context", ret)
self.stream, ret = acl.rt.create_stream()
utils.check_ret("acl.rt.create_stream", ret)
self.run_mode, ret = acl.rt.get_run_mode()
utils.check_ret("acl.rt.get_run_mode", ret)
print("Init resource success")
def __del__(self):
print("acl resource release all resource")
resource_list.destroy()
if self.stream:
print("acl resource release stream")
acl.rt.destroy_stream(self.stream)
if self.context:
print("acl resource release context")
acl.rt.destroy_context(self.context)
print("Reset acl device ", self.device_id)
acl.rt.reset_device(self.device_id)
acl.finalize()
print("Release acl resource success")