Initial commit

This commit is contained in:
Feng Ren 2024-06-26 15:59:19 +08:00
parent 7632450819
commit fe0d5095f1
25 changed files with 1589 additions and 1 deletion

173
README.md Normal file → Executable file

@@ -1,2 +1,173 @@
# jycache-model
## What is JYCache for Model
JYCache for Model ("jycache-model" for short) aims to be a small but well-crafted tool that makes it easy to download, manage, and share model files from model repositories.
Model files are usually large, while the network bandwidth to model repositories is limited and unstable, and some repositories can only be reached through a proxy or a mirror site. As a result, downloading model files often takes a very long time. jycache-model offers P2P model sharing so that users can obtain the models they need much faster.
![[hffs-readme-diagram|1000]]
Typical use cases for jycache-model include:
- **Model sharing between peers**: If someone in your lab or team has already downloaded the model files you need, HFFS's P2P sharing lets you fetch them straight from their machine, so download speed is no longer a headache. If nobody has downloaded the target model yet, jycache-model downloads it from the model repository automatically, and you can then share it with others through jycache-model.
- **Model transfer between machines**: Some users need two hosts (Windows and Linux) to download and use models: the VPN is easy to configure on Windows, so it handles downloading, while Linux offers a convenient development environment, so it handles fine-tuning, inference, and other tasks. With jycache-model's P2P sharing, moving models between the two hosts no longer requires manual copying.
- **Multi-source resumable downloads**: Browsers do not support resuming interrupted model downloads by default, but jycache-model does. Wherever a model file is being downloaded from (the model repository or another peer's machine), jycache-model can resume the transfer across different sources (see the example after this list).
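As a sketch of how resumption works under the hood: the daemon serves cached files over plain HTTP and honors standard `Range` headers (this mirrors the `/{user}/{model}/resolve/{revision}/{file}` route in `hffs/server/http_server.py`). The address, repo, and file names below are placeholders:
```bash
# Resume a download from a (hypothetical) peer daemon at 192.168.1.2:9009,
# requesting everything from byte 1048576 onward.
curl -H "Range: bytes=1048576-" \
  -o model.safetensors.part \
  http://192.168.1.2:9009/some-user/some-model/resolve/main/model.safetensors
```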
## How jycache-model works
![[hffs-simple-architecture|1000]]
1. Start the HFFS daemon with the `hffs daemon start` command;
2. Pair other machines on the LAN as peers with the `hffs peer add` commands;
3. Download the target model file with the `hffs model add` command.
While downloading a target model file, the local daemon looks for it on the paired peer machines as well as on the hf.co main site and its mirror sites, and downloads it from the fastest source.
The `hffs daemon`, `hffs peer`, and `hffs model` commands offer additional functionality; see the documentation below.
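A minimal end-to-end session might look like the sketch below; the peer IP and the repo/file names are placeholders:
```bash
hffs daemon start                 # start the local daemon (default port 9009)
hffs peer add 192.168.1.2         # pair with a teammate's machine on the LAN
hffs model add some-user/some-model model.safetensors   # fetch from the fastest source
hffs model ls                     # verify the download
```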
## Installation
> [!NOTE]
> Make sure Python 3.11+ and pip are installed.
> Consider using [Miniconda](https://docs.anaconda.com/miniconda/miniconda-install/) to install and manage multiple Python versions.
> See [here](https://pip.pypa.io/en/stable/cli/pip_install/) for how to use pip.
```bash
pip install -i https://test.pypi.org/simple/ hffs
```
## Command line
### HFFS daemon management
#### Start the HFFS daemon
```bash
hffs daemon start [--port PORT_NUM]
```
The `--port` option sets the port of the daemon process (default: 9009).
#### Stop the HFFS daemon
```bash
hffs daemon stop
```
### Peer management
> [!NOTE]
> About automatic peer management: to improve usability, HFFS plans to add automatic peer management, in which HFFS discovers and connects peers on its own. Until that feature ships, use the commands below to manage peers manually.
On Unix-like systems, the `ifconfig` or `hostname` commands described [here](https://www.51cto.com/article/720658.html) can be used to find the machine's IP address. On Windows, follow the instructions [here](https://support.microsoft.com/zh-cn/windows/%E5%9C%A8-windows-%E4%B8%AD%E6%9F%A5%E6%89%BE-ip-%E5%9C%B0%E5%9D%80-f21a9bbc-c582-55cd-35e0-73431160a1b9).
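For example, on most Linux distributions either of these prints the machine's addresses:
```bash
hostname -I                 # all configured IP addresses
ifconfig | grep "inet "     # per-interface addresses
```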
#### Add a peer
```bash
hffs peer add IP [--port PORT_NUM]
```
The command above pairs the local node with a peer node. Once paired, the two nodes share models with each other, where:
- `IP` is the target node's IP address
- `PORT_NUM` is the port of the target node's daemon process
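For example, pairing with a (hypothetical) machine at 192.168.1.2 whose daemon listens on the default port 9009:
```bash
hffs peer add 192.168.1.2 --port 9009
```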
#### List peers
```bash
hffs peer ls
```
The command above lists peer information.
When the daemon is running, it periodically checks whether other peers are online. `hffs peer ls` marks online peers as "_alive_".
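A sketch of the expected output, following the print format in `PeerManager.list_peers` (the addresses are placeholders):
```bash
$ hffs peer ls
List of peers:
192.168.1.2    9009    alive
192.168.1.3    9009
```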
> [!NOTE]
> If peer connectivity has problems on Windows, check that: 1. the daemon has been started; 2. the Windows firewall allows HFFS through (see [here](https://support.microsoft.com/zh-cn/windows/%E5%85%81%E8%AE%B8%E5%BA%94%E7%94%A8%E9%80%9A%E8%BF%87-windows-defender-%E9%98%B2%E7%81%AB%E5%A2%99%E7%9A%84%E9%A3%8E%E9%99%A9-654559af-3f54-3dcf-349f-71ccd90bcc5c)).
#### Remove a peer
```bash
hffs peer rm IP [--port PORT_NUM]
```
The command above removes a peer node.
### Model management
#### List models
```bash
hffs model ls [--repo_id REPO_ID] [--file FILE]
```
Scan models that have already been downloaded. The command returns the following:
- If `REPO_ID` is not specified, the list of repos is returned
    - See the [documentation](https://huggingface.co/docs/hub/en/api#get-apimodelsrepoid-or-apimodelsrepoidrevisionrevision) for `REPO_ID`
- If `REPO_ID` is specified but `FILE` is not, all cached files in the repo are returned
    - `FILE` is the model file's path relative to the repo's git root; this path can be found on the Hugging Face website
    - The add and remove commands use this path to specify the target file
- If both `REPO_ID` and `FILE` are specified, the file's absolute path on the local filesystem is returned
    - Users can access the model file through this absolute path
    - Note: on Unix-like operating systems the cache stores files via symbolic links, so a model file's git path and its filesystem path are not directly related
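For example (the repo name is a placeholder):
```bash
hffs model ls                                  # list all cached repos
hffs model ls --repo_id some-user/some-model   # list cached files in that repo
```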
#### Search for a model
```bash
hffs model search REPO_ID FILE [--revision REVISION]
```
Search for the peers on which the target model file already exists.
- See the `hffs model ls` command for `REPO_ID`
- See the `hffs model ls` command for `FILE`
- See the [documentation](https://huggingface.co/docs/hub/en/api#get-apimodelsrepoid-or-apimodelsrepoidrevisionrevision) for `REVISION`
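For example, asking all alive peers whether they hold a (hypothetical) file:
```bash
hffs model search some-user/some-model model.safetensors --revision main
```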
#### Add a model
```bash
hffs model add REPO_ID FILE [--revision REVISION]
```
Download the specified model.
- If the model has not been downloaded locally yet, it is fetched from a peer node or from hf.co
- See the `hffs model ls` command for `REPO_ID`
- See the `hffs model ls` command for `FILE`
- See the `hffs model search` command for `REVISION`
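For example, fetching a (hypothetical) file from the fastest available source:
```bash
hffs model add some-user/some-model model.safetensors
```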
#### Remove a model
```bash
hffs model rm REPO_ID FILE [--revision REVISION]
```
Delete model data that has already been downloaded.
- See the `hffs model ls` command for `REPO_ID`
- See the `hffs model ls` command for `FILE`
- See the [documentation](https://huggingface.co/docs/hub/en/api#get-apimodelsrepoid-or-apimodelsrepoidrevisionrevision) for `REVISION`
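For example, removing the same (hypothetical) file from the local cache:
```bash
hffs model rm some-user/some-model model.safetensors --revision main
```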
### Uninstall management
#### Uninstall the software
> [!WARNING]
> Uninstalling removes all configuration and all downloaded models. This cannot be undone; proceed with care!
```bash
# remove user data
hffs uninstall
# uninstall the package
pip uninstall hffs
```

10
hc_acc_cli.toml Normal file

@@ -0,0 +1,10 @@
[model]
download_dir="download"
[model.aria2]
exec_path=""
conf_path=""
[peer]
data_path="peers.json"

0
hffs/__init__.py Normal file

0
hffs/client/__init__.py Normal file

83
hffs/client/daemon_manager.py Normal file

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import shutil
import platform
import signal
import subprocess
import asyncio
from ..common.settings import HFFS_EXEC_NAME
from .http_client import get_service_status, post_stop_service
async def is_service_running():
try:
_ = await get_service_status()
return True
except ConnectionError:
return False
except Exception as e:
logging.info(f"If error not caused by service not start, may need check it! ERROR: {e}")
return False
async def stop_service():
try:
await post_stop_service()
logging.info("Service stopped success!")
except ConnectionError:
logging.info("Can not connect to service, may already stopped!")
except Exception as e:
raise SystemError(f"Failed to stop service! ERROR: {e}")
async def daemon_start(args):
if await is_service_running():
raise LookupError("Service already start!")
exec_path = shutil.which(HFFS_EXEC_NAME)
if not exec_path:
raise FileNotFoundError(HFFS_EXEC_NAME)
creation_flags = 0
if platform.system() in ["Linux"]:
# reap zombie child processes automatically
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
elif platform.system() in ["Windows"]:
creation_flags = subprocess.CREATE_NO_WINDOW
cmdline_daemon_false = "--daemon=false"
_ = subprocess.Popen([exec_path, "daemon", "start", "--port={}".format(args.port), cmdline_daemon_false],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=creation_flags)
wait_start_time = 3
await asyncio.sleep(wait_start_time)
if await is_service_running():
logging.info("Daemon process started successfully")
else:
raise LookupError("Daemon start but not running, check service or retry!")
async def daemon_stop():
if not await is_service_running():
logging.info("Service not running, stop nothing!")
return
await stop_service()
wait_stop_time = 3
await asyncio.sleep(wait_stop_time)
if await is_service_running():
raise LookupError("Stopped service but still running, check service or retry!")

188
hffs/client/http_client.py Normal file

@@ -0,0 +1,188 @@
import asyncio
import time
import os
import aiohttp
import aiohttp.client_exceptions
import logging
from ..common.peer import Peer
from huggingface_hub import hf_hub_url, get_hf_file_metadata
from ..common.settings import load_local_service_port, HFFS_API_PING, HFFS_API_PEER_CHANGE, HFFS_API_ALIVE_PEERS
from ..common.settings import HFFS_API_STATUS, HFFS_API_STOP
logger = logging.getLogger(__name__)
LOCAL_HOST = "127.0.0.1"
def timeout_sess(timeout=60):
return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout))
async def ping(peer, timeout=15):
alive = False
seq = os.urandom(4).hex()
url = f"http://{peer.ip}:{peer.port}" + HFFS_API_PING + f"?seq={seq}"
logger.debug(f"probing {peer.ip}:{peer.port}, seq = {seq}")
try:
async with timeout_sess(timeout) as session:
async with session.get(url) as response:
if response.status == 200:
alive = True
except TimeoutError:
pass
except Exception as e:
logger.warning(e)
peer.set_alive(alive)
peer.set_epoch(int(time.time()))
status_msg = "alive" if alive else "dead"
logger.debug(f"Peer {peer.ip}:{peer.port} (seq:{seq}) is {status_msg}")
return peer
async def alive_peers(timeout=2):
port = load_local_service_port()
url = f"http://{LOCAL_HOST}:{port}" + HFFS_API_ALIVE_PEERS
peers = []
try:
async with timeout_sess(timeout) as session:
async with session.get(url) as response:
if response.status == 200:
peer_list = await response.json()
peers = [Peer.from_dict(peer) for peer in peer_list]
else:
err = f"Failed to get alive peers, HTTP status: {response.status}"
logger.error(err)
except aiohttp.client_exceptions.ClientConnectionError:
logger.warning("Prompt: connect local service failed, may not start, "
"execute hffs daemon start to see which peers are active")
except TimeoutError:
logger.error("Prompt: connect local service timeout, may not start, "
"execute hffs daemon start to see which peers are active")
except Exception as e:
logger.warning(e)
logger.warning("Connect service error, please check it, usually caused by service not start!")
return peers
async def search_coro(peer, repo_id, revision, file_name):
"""Check if a certain file exists in a peer's model repository
Returns:
Peer or None: if the peer has the target file, return the peer, otherwise None
"""
try:
async with timeout_sess(10) as session:
async with session.head(f"http://{peer.ip}:{peer.port}/{repo_id}/resolve/{revision}/{file_name}") as response:
if response.status == 200:
return peer
except Exception:
return None
async def do_search(peers, repo_id, revision, file_name):
tasks = []
def all_finished(tasks):
return all([task.done() for task in tasks])
async with asyncio.TaskGroup() as g:
for peer in peers:
coro = search_coro(peer, repo_id, revision, file_name)
tasks.append(g.create_task(coro))
while not all_finished(tasks):
await asyncio.sleep(1)
print(".", end="")
# add new line after the dots
print("")
return [task.result() for task in tasks if task.result() is not None]
async def search_model(peers, repo_id, file_name, revision):
if not peers:
logger.info("No active peers to search")
return []
logger.info("Will check the following peers:")
logger.info(Peer.print_peers(peers))
avails = await do_search(peers, repo_id, revision, file_name)
logger.info("Peers who have the model:")
logger.info(Peer.print_peers(avails))
return avails
async def get_model_etag(endpoint, repo_id, filename, revision='main'):
url = hf_hub_url(
repo_id=repo_id,
filename=filename,
revision=revision,
endpoint=endpoint
)
metadata = get_hf_file_metadata(url)
return metadata.etag
async def notify_peer_change(timeout=2):
try:
port = load_local_service_port()
except LookupError:
return
url = f"http://{LOCAL_HOST}:{port}" + HFFS_API_PEER_CHANGE
try:
async with timeout_sess(timeout) as session:
async with session.get(url) as response:
if response.status != 200:
logger.debug(f"Peer change http status: {response.status}")
except TimeoutError:
pass # silently ignore timeout
except aiohttp.client_exceptions.ClientConnectionError:
logger.error("Connect local service failed, please check service!")
except Exception as e:
logger.error(f"Peer change error: {e}")
logger.error("Please check the error, usually caused by local service not start!")
async def get_service_status():
port = load_local_service_port()
url = f"http://{LOCAL_HOST}:{port}" + HFFS_API_STATUS
timeout = 5
try:
async with timeout_sess(timeout) as session:
async with session.get(url) as response:
if response.status != 200:
raise ValueError(f"Server response not 200 OK! status: {response.status}")
else:
return await response.json()
except (TimeoutError, ConnectionError, aiohttp.client_exceptions.ClientConnectionError):
raise ConnectionError("Connect server failed or timeout")
async def post_stop_service():
port = load_local_service_port()
url = f"http://{LOCAL_HOST}:{port}" + HFFS_API_STOP
timeout = 5
try:
async with timeout_sess(timeout) as session:
async with session.post(url) as response:
if response.status != 200:
raise ValueError(f"Server response not 200 OK! status: {response.status}")
except (TimeoutError, ConnectionError, aiohttp.client_exceptions.ClientConnectionError):
raise ConnectionError("Connect server failed or timeout")

237
hffs/client/model_manager.py Normal file

@@ -0,0 +1,237 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
from pathlib import Path
from prettytable import PrettyTable
from huggingface_hub import scan_cache_dir, hf_hub_download, CachedRevisionInfo, CachedRepoInfo, HFCacheInfo
from . import http_client
from ..common.settings import HFFS_MODEL_DIR
from ..common.hf_adapter import save_etag
logger = logging.getLogger(__name__)
def _assume(pred, msg):
if not pred:
logger.info(msg)
raise ValueError()
def _is_parent(parent: Path, child: Path):
try:
child.absolute().relative_to(parent.absolute())
return True
except ValueError:
return False
def _rm_file(fp: Path, root_path: Path, msg: str):
# fp is NOT in root_path, raise error
_assume(_is_parent(root_path, fp), f"{fp} is not in {root_path}")
# remove target file
if fp.exists() and fp.is_file():
fp.unlink()
logger.debug(f"{msg}: {fp}")
# remove parent directories if empty up to root_path
parent_dir = fp.parent
while _is_parent(root_path, parent_dir):
if not any(parent_dir.iterdir()):
parent_dir.rmdir()
logger.debug(f"Remove {parent_dir}")
parent_dir = parent_dir.parent
else:
break
def _match_repo(cache_info: HFCacheInfo, repo_id):
for repo in cache_info.repos:
if repo.repo_id == repo_id:
return repo
return None
def _match_rev(repo_info: CachedRepoInfo, revision):
for rev in repo_info.revisions:
if revision in rev.refs or rev.commit_hash.startswith(revision):
return rev
return None
def _match_file(rev_info: CachedRevisionInfo, file_name: str):
file_path = rev_info.snapshot_path / file_name
for f in rev_info.files:
if f.file_path == file_path:
return f
return None
def _rm(repo_id, file_name, revision="main"):
# check necessary arguments
_assume(repo_id, "Missing repo_id")
_assume(file_name, "Missing file_name")
_assume(revision, "Missing revision")
if os.path.isabs(file_name):
raise LookupError("File path is path relative to repo, not the path in operating system!")
# match cached repo
cache_info = scan_cache_dir(HFFS_MODEL_DIR)
repo_info = _match_repo(cache_info, repo_id)
_assume(repo_info, "No matching repo")
# match cached revision
rev_info = _match_rev(repo_info, revision)
_assume(rev_info, "No matching revision")
# match cached file
file_info = _match_file(rev_info, file_name)
_assume(file_info, "No matching file")
# remove snapshot file
_rm_file(file_info.file_path,
repo_info.repo_path / "snapshots",
"Remove snapshot file")
# remove the blob file; on platforms without symlink support, blob and snapshot paths are equal
if file_info.blob_path != file_info.file_path:
_rm_file(file_info.blob_path,
repo_info.repo_path / "blobs",
"Remove blob")
# if the snapshot dir no longer exists, the revision has been
# deleted entirely, hence all refs pointing to the revision
# should be deleted as well
ref_dir = repo_info.repo_path / "refs"
if not rev_info.snapshot_path.exists() and ref_dir.exists():
ref_files = [ref_dir / ref for ref in rev_info.refs]
for ref in ref_files:
_rm_file(ref, ref_dir, "Remove ref file")
def _ls_repos():
cache_info = scan_cache_dir(cache_dir=HFFS_MODEL_DIR)
table = PrettyTable()
table.field_names = [
"REPO ID",
"SIZE",
"NB FILES",
"LOCAL PATH",
]
table.add_rows([
repo.repo_id,
"{:>12}".format(repo.size_on_disk_str),
repo.nb_files,
str(repo.repo_path),
]
for repo in cache_info.repos
)
# Print the table to stdout
print(table)
def _ls_repo_files(repo_id):
cache_info = scan_cache_dir(HFFS_MODEL_DIR)
repo_info = _match_repo(cache_info, repo_id)
_assume(repo_info, "No matching repo")
files = []
for rev in repo_info.revisions:
for f in rev.files:
refs = ", ".join(rev.refs)
commit = rev.commit_hash[:8]
file_name = f.file_path.relative_to(rev.snapshot_path)
file_path = f.file_path
files.append((refs, commit, file_name, f.size_on_disk_str, file_path))
table = PrettyTable()
table.field_names = ["REFS", "COMMIT", "FILE", "SIZE", "PATH"]
table.add_rows(files)
print(table)
class ModelManager:
def init(self):
if not os.path.exists(HFFS_MODEL_DIR):
os.makedirs(HFFS_MODEL_DIR)
async def search_model(self, repo_id, file_name, revision="main"):
active_peers = await http_client.alive_peers()
avail_peers = await http_client.search_model(active_peers, repo_id, file_name, revision)
return (active_peers, avail_peers)
async def add(self, repo_id, file_name, revision="main"):
async def do_download(endpoint):
path = None
try:
path = hf_hub_download(repo_id,
revision=revision,
cache_dir=HFFS_MODEL_DIR,
filename=file_name,
endpoint=endpoint)
except Exception as e:
logger.info(
f"Failed to download model from {endpoint}. Reason: {e}")
return False, None
try:
etag = await http_client.get_model_etag(endpoint, repo_id, file_name, revision)
if not etag:
raise ValueError("ETag not found!")
save_etag(etag, repo_id, file_name, revision)
except Exception as e:
logger.info(
f"Failed to save etag from {endpoint} for {repo_id}/{file_name}@{revision}")
logger.debug(e)
return False, None
return True, path
if not file_name:
raise ValueError(
"Downloading a full repo is not supported yet; a file name must be provided!")
_, avails = await self.search_model(repo_id, file_name, revision)
for peer in avails:
done, path = await do_download(f"http://{peer.ip}:{peer.port}")
if done:
logger.info(f"Download successfully: {path}")
return
logger.info("Cannot download from peers; try mirror sites")
done, path = await do_download("https://hf-mirror.com")
if done:
logger.info(f"Download successfully: {path}")
return
logger.info("Cannot download from mirror site; try hf.co")
done, path = await do_download("https://huggingface.co")
if done:
logger.info(f"Download successfully: {path}")
return
logger.info(
"Cannot find target model in hf.co; double check the model info")
def ls(self, repo_id):
if not repo_id:
_ls_repos()
else:
_ls_repo_files(repo_id)
def rm(self, repo_id, file_name, revision="main"):
try:
_rm(repo_id, file_name, revision)
logger.info("Success to delete file!")
except ValueError:
logger.info("Failed to remove model")

60
hffs/client/peer_manager.py Normal file

@@ -0,0 +1,60 @@
import logging
import urllib3
from typing import List
from ..common.peer import Peer
from .http_client import notify_peer_change, alive_peers
def check_valid_ip_port(ip, port):
converted_url = "{}:{}".format(ip, port)
try:
parsed_url = urllib3.util.parse_url(converted_url)
if not parsed_url.host or not parsed_url.port:
raise ValueError("Should be not None!")
except Exception:
raise ValueError("Invalid IP or port format! IP: {}, port:{}".format(ip, port))
class PeerManager:
DEFAULT_PORT = 9009
def __init__(self, peer_store):
self._peer_store = peer_store
def add_peer(self, ip, port=None):
peer_port = port if port else self.DEFAULT_PORT
check_valid_ip_port(ip, peer_port)  # validate the resolved port, which may be the default
peer = Peer(ip, peer_port)
self._peer_store.add_peer(peer)
logging.info("Add success!")
def remove_peer(self, ip, port=None):
peer_port = port if port else self.DEFAULT_PORT
check_valid_ip_port(ip, peer_port)  # validate the resolved port, which may be the default
peer = Peer(ip, peer_port)
self._peer_store.remove_peer(peer)
logging.info("Remove success!")
def get_peers(self) -> List[Peer]:
return self._peer_store.get_peers()
async def list_peers(self):
alives = await alive_peers()
alives = set(alives)
peers = sorted(self.get_peers())
if len(peers) == 0:
print("No peer is configured.")
return
print("List of peers:")
for peer in peers:
alive = "alive" if peer in alives else ""
print(f"{peer.ip}\t{peer.port}\t{alive}")
async def notify_peer_change(self):
await notify_peer_change()

32
hffs/client/uninstall_manager.py Normal file

@@ -0,0 +1,32 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import shutil
from ..common.settings import HFFS_HOME
from .daemon_manager import daemon_stop
async def uninstall_hffs():
logging.warning("WARNING: will delete all hffs data on disk, can't recovery it!")
logging.info("\n{}\n".format(HFFS_HOME))
first_confirm = input("UP directory will be delete! Enter y/Y to confirm:")
if first_confirm not in ["y", "Y"]:
logging.info("Canceled uninstall!")
return
second_confirm = input("\nPlease enter y/Y to confirm again and start uninstalling: ")
if second_confirm not in ["y", "Y"]:
logging.info("Canceled uninstall!")
return
await daemon_stop()
shutil.rmtree(HFFS_HOME, ignore_errors=True)
print("Uninstall success!")

0
hffs/common/__init__.py Normal file

96
hffs/common/hf_adapter.py Normal file

@@ -0,0 +1,96 @@
import os
import huggingface_hub as hf
from . import settings
def get_sym_path(repo_path, commit_hash, file_path):
return os.path.normpath(f"{repo_path}/snapshots/{commit_hash}/{file_path}")
def file_in_cache(repo_id, file_name, revision="main"):
# see https://huggingface.co/docs/huggingface_hub/v0.23.0/en/package_reference/cache
# for API about HFCacheInfo, CachedRepoInfo, CachedRevisionInfo, CachedFileInfo
cache_info = hf.scan_cache_dir(settings.HFFS_MODEL_DIR)
repo_info = None
repo_path = None
for repo in cache_info.repos:
if repo.repo_id == repo_id:
repo_info = repo
repo_path = repo.repo_path
break
if repo_info is None:
return None # no matching repo
commit_hash = None
rev_info = None
for rev in repo_info.revisions:
if rev.commit_hash.startswith(revision) or revision in rev.refs:
commit_hash = rev.commit_hash
rev_info = rev
break
if commit_hash is None:
return None # no matching revision
etag = None
size = None
file_path = None
sym_path = get_sym_path(repo_path, commit_hash, file_name)
for f in rev_info.files:
if sym_path == str(f.file_path):
size = f.size_on_disk
etag = try_to_load_etag(repo_id, file_name, revision)
file_path = f.file_path
break
if file_path is None:
return None # no matching file
return {
"etag": etag,
"commit_hash": commit_hash,
"size": size,
"file_path": file_path
}
def get_etag_path(repo_id, filename, revision="main"):
model_path = hf.try_to_load_from_cache(
repo_id=repo_id,
filename=filename,
cache_dir=settings.HFFS_MODEL_DIR,
revision=revision,
)
if model_path == hf._CACHED_NO_EXIST:
return None
file_path = os.path.relpath(model_path, settings.HFFS_MODEL_DIR)
return os.path.join(settings.HFFS_ETAG_DIR, file_path)
def try_to_load_etag(repo_id, filename, revision="main"):
etag_path = get_etag_path(repo_id, filename, revision)
if not etag_path or not os.path.exists(etag_path):
return None
with open(etag_path, "r") as f:
return f.read().strip()
def save_etag(etag, repo_id, filename, revision="main"):
etag_path = get_etag_path(repo_id, filename, revision)
if not etag_path:
raise ValueError(
f"Failed to get etag path for repo={repo_id}, file={filename}, revision={revision}")
os.makedirs(os.path.dirname(etag_path), exist_ok=True)
with open(etag_path, "w+") as f:
f.write(etag)

61
hffs/common/peer.py Normal file

@@ -0,0 +1,61 @@
class Peer:
def __init__(self, ip, port) -> None:
self._ip = ip
self._port = int(port)
self._alive = False
self._epoch = 0
@property
def ip(self):
return self._ip
@property
def port(self):
return self._port
def is_alive(self):
return self._alive
def set_alive(self, alive):
self._alive = alive
def set_epoch(self, epoch):
self._epoch = epoch
def get_epoch(self):
return self._epoch
def __lt__(self, value):
if not isinstance(value, Peer):
raise TypeError()
# compare lexicographically by (ip, port); comparing the fields
# independently would order peers incorrectly when ips differ
return (self.ip, self.port) < (value.ip, value.port)
def __eq__(self, value: object) -> bool:
return isinstance(value, Peer) and self.ip == value.ip and self.port == value.port
def __hash__(self) -> int:
return hash((self.ip, self.port))
def __repr__(self) -> str:
return f"{self.ip}:{self.port}"
def to_dict(self):
return {
"ip": self.ip,
"port": self.port
}
@classmethod
def from_dict(cls, data):
return cls(data["ip"], data["port"])
@classmethod
def print_peers(cls, peers):
return [f"{p}" for p in peers]

52
hffs/common/peer_store.py Normal file

@@ -0,0 +1,52 @@
import os
from .peer import Peer
from .settings import HFFS_HOME, HFFS_PEER_CONF
import logging
logger = logging.getLogger(__name__)
def create_file():
os.makedirs(HFFS_HOME, exist_ok=True)
if not os.path.exists(HFFS_PEER_CONF):
with open(HFFS_PEER_CONF, "w", encoding="utf-8"):
logger.debug(f"Created {HFFS_PEER_CONF}")
class PeerStore:
def __init__(self):
self._peers = set()
def __enter__(self):
self.open()
return self
def __exit__(self, type, value, traceback):
if traceback:
logger.debug(f"PeerStore error, type=<{type}>, value=<{value}>")
self.close()
def _load_peers(self):
with open(HFFS_PEER_CONF, "r+", encoding="utf-8") as f:
for line in f:
ip, port = line.strip().split(":")
peer = Peer(ip, port)
self._peers.add(peer)
def open(self):
create_file()
self._load_peers()
def close(self):
with open(HFFS_PEER_CONF, "w", encoding="utf-8") as f:
for peer in self._peers:
f.write(f"{peer.ip}:{peer.port}\n")
def add_peer(self, peer):
self._peers.add(peer)
def remove_peer(self, peer):
self._peers.discard(peer)
def get_peers(self):
return self._peers

39
hffs/common/settings.py Normal file

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import configparser
HFFS_HOME_DEFAULT = os.path.join(os.path.expanduser("~"), ".cache/hffs")
HFFS_HOME = os.environ.get("HFFS_HOME", HFFS_HOME_DEFAULT)
HFFS_PEER_CONF = os.path.join(HFFS_HOME, "hffs_peers.conf")
HFFS_MODEL_DIR = os.path.join(HFFS_HOME, "models")
HFFS_ETAG_DIR = os.path.join(HFFS_HOME, "etags")
HFFS_CONF = os.path.join(HFFS_HOME, "hffs.conf")
HFFS_LOG_DIR = os.path.join(HFFS_HOME, "logs")
HFFS_EXEC_NAME = "hffs"
HFFS_API_PING = "/hffs_api/ping"
HFFS_API_ALIVE_PEERS = "/hffs_api/alive_peers"
HFFS_API_PEER_CHANGE = "/hffs_api/peer_change"
HFFS_API_STATUS = "/hffs_api/status"
HFFS_API_STOP = "/hffs_api/stop"
def save_local_service_port(port):
config = configparser.ConfigParser()
config["DEFAULT"] = {"SERVICE_PORT": str(port)}
with open(HFFS_CONF, "w") as f:
config.write(f)
def load_local_service_port():
config = configparser.ConfigParser()
if not os.path.exists(HFFS_CONF):
raise LookupError("Service port not found, have service start?")
config.read(HFFS_CONF)
return int(config["DEFAULT"]["SERVICE_PORT"])

183
hffs/hffs.py Executable file

@@ -0,0 +1,183 @@
#!/usr/bin/python3
import argparse
import asyncio
import os
import logging.handlers
import logging
import sys
from .common.peer_store import PeerStore
from .client import http_client
from .client.model_manager import ModelManager
from .client.peer_manager import PeerManager
from .server import http_server
from .common.settings import HFFS_LOG_DIR
from .client.daemon_manager import daemon_start, daemon_stop
from .client.uninstall_manager import uninstall_hffs
logger = logging.getLogger(__name__)
async def peer_cmd(args):
with PeerStore() as store:
peer_manager = PeerManager(store)
if args.peer_command == "add":
peer_manager.add_peer(args.IP, args.port)
elif args.peer_command == "rm":
peer_manager.remove_peer(args.IP, args.port)
elif args.peer_command == "ls":
await peer_manager.list_peers()
else: # no matching subcmd
raise ValueError("Invalid subcommand")
if args.peer_command in ("add", "rm"):
await peer_manager.notify_peer_change()
async def model_cmd(args):
model_manager = ModelManager()
model_manager.init()
if args.model_command == "search":
await model_manager.search_model(args.repo_id, args.file, args.revision)
elif args.model_command == "add":
await model_manager.add(args.repo_id, args.file, args.revision)
elif args.model_command == "ls":
model_manager.ls(args.repo_id)
elif args.model_command == "rm":
model_manager.rm(args.repo_id, revision=args.revision,
file_name=args.file)
else:
raise ValueError("Invalid subcommand")
async def daemon_cmd(args):
if args.daemon_command == "start":
if args.daemon == "true":
await daemon_start(args)
else:
await http_server.start_server(args.port)
elif args.daemon_command == "stop":
await daemon_stop()
async def uninstall_cmd():
await uninstall_hffs()
async def exec_cmd(args, parser):
try:
if args.command == "peer":
await peer_cmd(args)
elif args.command == "model":
await model_cmd(args)
elif args.command == "daemon":
await daemon_cmd(args)
elif args.command == "uninstall":
await uninstall_cmd()
else:
raise ValueError("Invalid command")
except ValueError as e:
print("{}".format(e))
parser.print_usage()
except Exception as e:
print(f"{e}")
def arg_parser():
parser = argparse.ArgumentParser(prog='hffs')
subparsers = parser.add_subparsers(dest='command')
# hffs daemon {start,stop} [--port port]
daemon_parser = subparsers.add_parser('daemon')
daemon_subparsers = daemon_parser.add_subparsers(dest='daemon_command')
daemon_start_parser = daemon_subparsers.add_parser('start')
daemon_start_parser.add_argument('--port', type=int, default=9009)
daemon_start_parser.add_argument("--daemon", type=str, default="true")
daemon_subparsers.add_parser('stop')
# hffs peer {add,rm,ls} IP [--port port]
peer_parser = subparsers.add_parser('peer')
peer_subparsers = peer_parser.add_subparsers(dest='peer_command')
peer_add_parser = peer_subparsers.add_parser('add')
peer_add_parser.add_argument('IP')
peer_add_parser.add_argument('--port', type=int, default=9009)
peer_rm_parser = peer_subparsers.add_parser('rm')
peer_rm_parser.add_argument('IP')
peer_rm_parser.add_argument('--port', type=int, default=9009)
peer_subparsers.add_parser('ls')
# hffs model {ls,add,rm,search} [--repo_id REPO_ID] [--revision REVISION] [--file FILE]
model_parser = subparsers.add_parser('model')
model_subparsers = model_parser.add_subparsers(dest='model_command')
model_ls_parser = model_subparsers.add_parser('ls')
model_ls_parser.add_argument('--repo_id')
model_add_parser = model_subparsers.add_parser('add')
model_add_parser.add_argument('repo_id')
model_add_parser.add_argument('file')
model_add_parser.add_argument('--revision', type=str, default="main")
model_rm_parser = model_subparsers.add_parser('rm')
model_rm_parser.add_argument('repo_id')
model_rm_parser.add_argument('file')
model_rm_parser.add_argument('--revision', type=str, default="main")
model_search_parser = model_subparsers.add_parser('search')
model_search_parser.add_argument('repo_id')
model_search_parser.add_argument('file')
model_search_parser.add_argument('--revision', type=str, default="main")
# hffs uninstall
subparsers.add_parser('uninstall')
return parser.parse_args(), parser
def logging_level():
# Only use DEBUG or INFO level for logging
verbose = os.environ.get("HFFS_VERBOSE", None)
return logging.DEBUG if verbose else logging.INFO
def logging_handler(args):
# the daemon's logs go to log files; everything else goes to stderr
if args.command == "daemon" and args.daemon_command == "start":
os.makedirs(HFFS_LOG_DIR, exist_ok=True)
log_path = os.path.join(HFFS_LOG_DIR, "hffs.log")
handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=2*1024*1024, backupCount=5)
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handler.setFormatter(logging.Formatter(log_format))
else:
handler = logging.StreamHandler(stream=sys.stderr)
log_format = "%(message)s"
handler.setFormatter(logging.Formatter(log_format))
return handler
def setup_logging(args):
# configure root logger
handler = logging_handler(args)
level = logging_level()
handler.setLevel(level)
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(level)
# suppress lib's info log
logging.getLogger('asyncio').setLevel(logging.WARNING)
async def async_main():
args, parser = arg_parser()
setup_logging(args)
await exec_cmd(args, parser)
def main():
try:
asyncio.run(async_main())
except (KeyboardInterrupt, asyncio.exceptions.CancelledError):
# ignore; shutdown may interrupt pending coroutines and emit stray error logs
pass

0
hffs/server/__init__.py Normal file

218
hffs/server/http_server.py Normal file

@@ -0,0 +1,218 @@
import asyncio
import errno
import json
import os
import logging
import re
from aiohttp import web
from aiohttp import streamer
from aiohttp.web_runner import GracefulExit
from contextvars import ContextVar
import huggingface_hub as hf
from .peer_prober import PeerProber
from ..common.peer_store import PeerStore
from ..common.hf_adapter import file_in_cache
from ..common.settings import save_local_service_port, HFFS_API_PING, HFFS_API_PEER_CHANGE, HFFS_API_ALIVE_PEERS
from ..common.settings import HFFS_API_STATUS, HFFS_API_STOP
ctx_var_peer_prober = ContextVar("PeerProber")
def extract_model_info(request):
user = request.match_info['user']
model = request.match_info['model']
revision = request.match_info['revision']
file_name = request.match_info['file_name']
repo_id = f"{user}/{model}"
return repo_id, file_name, revision
@streamer
async def file_sender(writer, file_path=None, file_range=()):
"""
This function will read large file chunk by chunk and send it through HTTP
without reading them into memory
"""
file_start, file_end = file_range
with open(file_path, 'rb') as f:
if file_start is not None:
f.seek(file_start)
buf_size = 2 ** 18
while True:
to_read = min(buf_size, file_end + 1 - f.tell()
if file_end else buf_size)
buf = f.read(to_read)
if not buf:
break
await writer.write(buf)
async def download_file(request):
def parse_byte_range(byte_range):
"""Returns the two numbers in 'bytes=123-456' or throws ValueError.
The last number or both numbers may be None.
"""
byte_range_re = re.compile(r'bytes=(\d+)-(\d+)?$')
if not byte_range or byte_range.strip() == '':
return None, None
m = byte_range_re.match(byte_range)
if not m:
raise ValueError('Invalid byte range %s' % byte_range)
first, last = [x and int(x) for x in m.groups()]
if last and last < first:
raise ValueError('Invalid byte range %s' % byte_range)
return first, last
try:
file_start, file_end = parse_byte_range(request.headers.get("Range"))
except Exception as e:
err_msg = "Invalid file range! ERROR: {}".format(e)
logging.warning(err_msg)
return web.Response(body=err_msg, status=400)
repo_id, file_name, revision = extract_model_info(request)
cached = file_in_cache(repo_id, file_name, revision)
if not cached:
logging.error("download 404 not cached")
return web.Response(
body=f'File <{file_name}> is not cached',
status=404)
headers = {"Content-disposition": f"attachment; filename={file_name}"}
file_path = cached["file_path"]
if not os.path.exists(file_path):
logging.error("download 404 not exist")
return web.Response(
body=f'File <{file_path}> does not exist',
status=404
)
logging.debug("download 200")
return web.Response(
body=file_sender(file_path=file_path,
file_range=(file_start, file_end)),
headers=headers
)
async def pong(_):
# logging.debug(f"[SERVER] seq={_.query['seq']}")
return web.Response(text='pong')
async def alive_peers(_):
peer_prober = ctx_var_peer_prober.get()
peers = peer_prober.get_actives()
return web.json_response([peer.to_dict() for peer in peers])
async def search_model(request):
repo_id, file_name, revision = extract_model_info(request)
cached = file_in_cache(repo_id, file_name, revision)
if not cached:
return web.Response(status=404)
else:
headers = {
hf.constants.HUGGINGFACE_HEADER_X_REPO_COMMIT: cached["commit_hash"],
"ETag": cached["etag"] if cached["etag"] else "",
"Content-Length": str(cached["size"]),
"Location": str(request.url),
}
logging.debug(f"search_model: {headers}")
return web.Response(status=200, headers=headers)
def get_peers():
peers = set()
with PeerStore() as peer_store:
peers = peer_store.get_peers()
return peers
async def on_peer_change(_):
peers = get_peers()
peer_prober: PeerProber = ctx_var_peer_prober.get()
peer_prober.update_peers(peers)
return web.Response(status=200)
async def get_service_status(_):
return web.json_response(data={})
async def post_stop_service(request):
resp = web.Response()
await resp.prepare(request)
await resp.write_eof()
logging.warning("Received exit request, exit server!")
raise GracefulExit()
async def start_server_safe(port):
# set up context before starting the server
peers = get_peers()
peer_prober = PeerProber(peers)
ctx_var_peer_prober.set(peer_prober)
# start peer prober to run in the background
asyncio.create_task(peer_prober.start_probe())
# start aiohttp server
app = web.Application()
# HEAD requests
app.router.add_head(
'/{user}/{model}/resolve/{revision}/{file_name:.*}', search_model)
# GET requests
app.router.add_get(HFFS_API_PING, pong)
app.router.add_get(HFFS_API_ALIVE_PEERS, alive_peers)
app.router.add_get(HFFS_API_PEER_CHANGE, on_peer_change)
app.router.add_get(
'/{user}/{model}/resolve/{revision}/{file_name:.*}', download_file)
app.router.add_get(HFFS_API_STATUS, get_service_status)
app.router.add_post(HFFS_API_STOP, post_stop_service)
# start web server
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner=runner, host='0.0.0.0', port=port)
await site.start()
save_local_service_port(port)
logging.info(f"HFFS daemon started at port {port}!")
# keep the server running
while True:
await asyncio.sleep(3600)
async def start_server(port):
try:
await start_server_safe(port)
except OSError as e:
# use the symbolic errno instead of the raw macOS value 48 for portability
if e.errno == errno.EADDRINUSE:
print(f"Daemon is NOT started: port {port} is already in use")
except Exception as e:
logging.error("Failed to start HFFS daemon")
logging.error(e)

80
hffs/server/peer_prober.py Normal file

@@ -0,0 +1,80 @@
import asyncio
import heapq
import logging
from ..common.peer import Peer
from ..client.http_client import ping
class PeerProber:
def __init__(self, peers):
self._peers = peers
self._actives = set()
self._updates = None
self._probe_heap = []
self._probing = False
def get_actives(self):
return list(self._actives)
def update_peers(self, peers):
self._updates = set(peers)
def _reset_peer_heap(self):
self._probe_heap = []
for peer in self._peers:
heapq.heappush(self._probe_heap, (peer.get_epoch(), peer))
def _do_update_peers(self):
if self._updates:
self._peers = self._updates
self._updates = None
self._reset_peer_heap()
async def start_probe(self):
"""Start probing peers for liveness.
This function uses asyncio to probe peers for liveness. It will wake up every 1 seconds, and
pop a peer from the heap. It will then send a ping request to the peer. The peer is taken out
of the haep until we get a response from the peer or the ping request times out. After that,
the peer is put back into the heap.
"""
if self._probing:
return
self._probing = True
# Initialize the heap with the peers, sorted by their epoch
self._reset_peer_heap()
if len(self._probe_heap) == 0:
logging.info("No peers configured to probe")
def probe_cb(task):
try:
peer = task.result()
if isinstance(peer, Peer):
heapq.heappush(self._probe_heap, (peer.get_epoch(), peer))
if peer.is_alive():
self._actives.add(peer)
else:
self._actives.discard(peer)
except asyncio.exceptions.CancelledError:
logging.debug("probing is canceled")
while self._probing:
await asyncio.sleep(3)
self._do_update_peers()
if len(self._probe_heap) == 0:
continue
_, peer = heapq.heappop(self._probe_heap)
probe = asyncio.create_task(ping(peer))
probe.add_done_callback(probe_cb)
async def stop_probe(self):
self._probing = False
self._probe_heap = []
self._actives = set()

7
main.py Normal file

@@ -0,0 +1,7 @@
#!/usr/bin/python3
#-*- encoding: UTF-8 -*-
from hffs.hffs import main
if __name__ == "__main__":
main()

23
requirements.txt Normal file

@@ -0,0 +1,23 @@
aiohttp==3.9.5
aiosignal==1.3.1
attrs==23.2.0
certifi==2024.2.2
charset-normalizer==3.3.2
filelock==3.14.0
frozenlist==1.4.1
fsspec==2024.3.1
huggingface-hub==0.23.0
idna==3.7
ifaddr==0.2.0
multidict==6.0.5
packaging==24.0
prettytable==3.10.0
PyYAML==6.0.1
requests==2.31.0
tqdm==4.66.4
typing_extensions==4.11.0
urllib3==2.2.1
wcwidth==0.2.13
yarl==1.9.4
zeroconf==0.132.2
psutil==5.9.8

(binary image added, 238 KiB; preview not shown)

(binary image added, 78 KiB; preview not shown)

2
setup.cfg Normal file

@@ -0,0 +1,2 @@
[metadata]
description-file = README.md

46
setup.py Normal file

@@ -0,0 +1,46 @@
#!/usr/bin/python3
#-*- encoding: UTF-8 -*-
from setuptools import setup, find_packages
"""
打包的用的setup必须引入
"""
VERSION = '0.1.1'
setup(name='hffs',
version=VERSION,
description="a tiny cli and server use p2p accelerate hugging face model download!",
long_description=open("README.md", "r").read(),
long_description_content_type="text/markdown",
classifiers=["Topic :: Software Development", "Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3.11"],
# Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
keywords='hffs python hugging face download accelerate',
author='9#',
author_email='953175531@qq.com',
url='https://github.com/madstorage-dev/hffs',
license='',
packages=find_packages(),
include_package_data=True,
zip_safe=True,
install_requires=open('requirements.txt', 'r').read().splitlines(),
python_requires=">=3.11",
entry_points={
'console_scripts': [
'hffs = hffs.hffs:main'
]
},
setup_requires=['setuptools', 'wheel']
)
# usage:
# requires:
# pip3 install twine
# clean:
# rm -rf build/ dist/ hffs.egg-info/
# build:
# python3 setup.py sdist bdist_wheel
# upload:
# twine upload dist/hffs*

0
test.py Normal file