LinUCB algorithm project merge request #2

Open
p63824079 wants to merge 15 commits from p63824079/JYCache:master into master
38 changed files with 2882 additions and 39 deletions

LinUCBSchedule/EpsilonGreedy.py Normal file

@@ -0,0 +1,147 @@
import numpy as np
import time
import random
from multiprocessing import Process, Manager
from ScheduleFrame import *
def gen_all_config(num_apps, num_resources):
"""
    generate all resource configs given the number of apps and the total resources
Args:
num_apps (int): number of apps
num_resources (int): total units of resources
Returns:
        list<list<int>>: a list containing all possible configs, each a list<int>
"""
if num_apps == 1:
# Only one app, it gets all remaining resources
return [[num_resources]]
all_config = []
for i in range(num_resources + 1):
        # Recursively allocate the remaining resources among the remaining apps
for sub_allocation in gen_all_config(num_apps - 1, num_resources - i):
all_config.append([i] + sub_allocation)
return all_config
def find_vectors(x, d_max):
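    # Enumerate all non-negative integer vectors with the same sum as x whose
    # Manhattan distance from x is at most d_max (x's dmax-neighbourhood).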
n = len(x)
s = sum(x)
solutions = []
def backtrack(current_vector, index, current_distance):
if current_distance > d_max or any(val < 0 for val in current_vector):
return
if index == n:
if sum(current_vector) == s:
solutions.append(list(current_vector))
return
for delta in range(-d_max, d_max + 1):
current_vector[index] += delta
new_distance = current_distance + abs(delta)
backtrack(current_vector, index + 1, new_distance)
current_vector[index] -= delta
initial_vector = x.copy()
backtrack(initial_vector, 0, 0)
return solutions
class EpsilonGreedy(ScheduleFrame):
def __init__(self, epsilon, factor_alpha, num_apps, num_resources):
super().__init__()
self.init_factor = [epsilon, factor_alpha, 0.95, 0.98]
self.epsilon = self.init_factor[0]
self.factor_alpha = self.init_factor[1]
self.all_cache_config = gen_all_config(num_apps, num_resources)
self.n_arms = len(self.all_cache_config)
self.last_arm_index = -1
        # ratio of random exploration to neighbourhood exploration during the exploration phase
self.sub_epsilon = self.init_factor[2]
self.sub_alpha = self.init_factor[3]
        # Manhattan distance bound for neighbourhood exploration
self.dmax = 10
self.counts = np.zeros(self.n_arms)
self.values = np.zeros(self.n_arms)
self.neighbour = {}
self.last_reward = [-1] * self.n_arms
        # parallelize the construction of each arm's neighbouring configurations
        st_time = time.time()
        # serial fallback:
        # for i in range(self.n_arms):
        #     self.neighbour[i] = find_vectors(self.all_cache_config[i], self.dmax)
        all_threads = []
        parts_neighbour = []
        num_tasks = max(1, self.n_arms // 8)  # guard: n_arms < 8 would give a zero step
with Manager() as manager:
parts_neighbour.append(manager.dict())
for i in range(0, self.n_arms, num_tasks):
all_threads.append(Process(target=self.find_neighbours, args=(i, min(i + num_tasks, self.n_arms), parts_neighbour[-1])))
all_threads[-1].start()
for i in range(len(all_threads)):
all_threads[i].join()
for i in range(len(parts_neighbour)):
self.neighbour.update(parts_neighbour[i])
en_time = time.time()
print('construct neighbour :{}'.format(en_time - st_time))
def select_arm(self):
        if np.random.rand() < self.epsilon:
            # explore
            self.epsilon *= self.factor_alpha
            if np.random.rand() < self.sub_epsilon:
                # random exploration
                self.sub_epsilon *= self.sub_alpha
                chosen_arm = np.random.randint(self.n_arms)
            else:
                # explore near the current best arm
                curr_max = np.argmax(self.values)
                num_neighbours = len(self.neighbour[int(curr_max)])
                chosen = random.randint(0, num_neighbours - 1)
                chosen_arm = self.all_cache_config.index(self.neighbour[int(curr_max)][chosen])
        else:
            # exploit
            chosen_arm = np.argmax(self.values)
self.last_arm_index = chosen_arm
return self.all_cache_config[chosen_arm]
def update(self, reward, chosen_arm):
if self.last_arm_index == -1:
assert chosen_arm is not None, "Need Initial Arm Index"
chosen_arm_index = self.all_cache_config.index(list(chosen_arm.values()))
else:
chosen_arm_index = self.last_arm_index
last_reward = self.last_reward[chosen_arm_index]
if last_reward > 0 and float(abs(reward - last_reward)) / last_reward > 0.05:
# workload change
print('-----judge workload change, {}'.format(float(abs(reward - last_reward)) / last_reward))
self.last_reward = [-1] * self.n_arms
self.counts = np.zeros(self.n_arms)
self.values = np.zeros(self.n_arms)
self.epsilon = self.init_factor[0]
self.sub_epsilon = self.init_factor[2]
self.counts[chosen_arm_index] += 1
n = self.counts[chosen_arm_index]
value = self.values[chosen_arm_index]
new_value = ((n - 1) / n) * value + (1 / n) * reward
self.values[chosen_arm_index] = new_value
self.last_reward[chosen_arm_index] = reward
def find_neighbours(self, start, end, part_neighbour):
        # executed in parallel across worker processes
# start_time = time.time()
for i in range(start, end):
part_neighbour[i] = find_vectors(self.all_cache_config[i], self.dmax)
# end_time = time.time()
# print('----- find_neighbour, from {} to {}, Used Time: {}'.format(start, end, end_time - start_time))
def get_now_reward(self, performance, context_info=None):
th_reward = sum(float(x) for x in performance) / len(performance)
return th_reward

LinUCBSchedule/LayerUCB.py Normal file

@@ -0,0 +1,271 @@
import random
import numpy as np
from itertools import zip_longest
from ScheduleFrame import *
def gen_feasible_configs(num_of_cache, cache_top_k):
    # TODO: enumerate all feasible configs from each app's top_k arms
num_app = len(cache_top_k)
top_k = len(cache_top_k[0])
def gen_side(tmp, k, n=1):
"""
:param root: root node, first app node
:param n: n_th in top_k
:return:
"""
if n == num_app:
return [[]]
for k in range(top_k):
t1 = k * top_k ** (num_app - n - 1)
t2 = top_k ** (num_app - 1) - (top_k - k - 1) * top_k ** (num_app - n - 1)
delta2 = top_k ** (num_app - 1 - n)
delta1 = top_k ** (num_app - n)
for i in range(t1, t2, delta1):
for j in range(i, i + delta2):
app_core = cache_top_k[n][k]
feasible_core = num_of_cache - sum(tmp[j])
if app_core > feasible_core:
tmp[j].append(feasible_core)
else:
tmp[j].append(app_core)
gen_side(n=n + 1, tmp=tmp, k=k)
return tmp
all_feasible_config = []
for i in range(len(cache_top_k)):
cache_top_k.append(cache_top_k.pop(0))
for j in range(top_k):
tmp = [[cache_top_k[0][j]] for _ in range(top_k ** (num_app - 1))]
res = gen_side(tmp, k=0)
for k in range(i + 1):
res = [[row[-1]] + row[:-1] for row in res]
all_feasible_config.extend(res)
    # deduplicate first
unique_tuples = set(tuple(x) for x in all_feasible_config)
all_feasible_config = [list(x) for x in unique_tuples]
    # redistribute any unused resources
for config in all_feasible_config:
assert sum(config) <= num_of_cache, 'The allocated cache exceeds the limit'
if sum(config) < num_of_cache:
left_cache = num_of_cache - sum(config)
for i in range(left_cache):
min_value = min(config)
min_index = config.index(min_value)
config[min_index] += 1
return all_feasible_config
def get_top_k(arr, k, times):
if times < 5 or random.randint(1, 10) > 8:
arr_top_k_id = [random.randint(0, len(arr) - 1) for _ in range(k)]
else:
arr_top_k_id = np.argsort(arr)[-k:]
return list(arr_top_k_id)
def beam_search(num_of_cache, all_apps, p_c_t, times, end_condition=30):
    # TODO: pick the top_k configs from every sub-bandit and combine them into a global config
action = {}.fromkeys(all_apps)
num_app = len(all_apps)
top_k = int(10 ** (np.log10(end_condition) / num_app))
cache_top_k = [get_top_k(p_c_t[all_apps[i]], top_k, times) for i in range(num_app)]
feasible_configs = gen_feasible_configs(num_of_cache=num_of_cache, cache_top_k=cache_top_k)
sum_p_list = []
for config in feasible_configs:
try:
config_p = [p_c_t[all_apps[i]][config[i]] for i in range(num_app)]
sum_p = sum(config_p)
sum_p_list.append(sum_p)
except IndexError as e:
print(f"Caught an IndexError: {e}")
print(config)
print(cache_top_k)
cache_act = feasible_configs[np.argmax(sum_p_list)]
for i in range(num_app):
action[all_apps[i]] = cache_act[i]
return action
def latin_hypercube_sampling(n, d, m, M, ratio):
    # Rejection-sample n random allocations of M units across d pools,
    # keeping every element within [m, M]; `ratio` rescales units afterwards
    samples = np.zeros((n, d))
    for i in range(n):
        # Re-draw until the normalized sample satisfies all constraints
        while True:
            x = np.random.uniform(m, M, d)
            if np.sum(x) < M:
                continue
            x = x / np.sum(x) * M
            if np.all(x >= m) and np.all(x <= M):
                samples[i] = x
                break
sample_config = []
for i in range(len(samples)):
tmp = [int(ele) for ele in samples[i]]
curr_sum = sum(tmp)
tmp[-1] += M - curr_sum
sample_config.append([ele * ratio for ele in tmp])
return sample_config
class LayerUCB(ScheduleFrame):
def __init__(self, all_apps, n_cache, alpha, factor_alpha, n_features):
super().__init__()
self.all_apps = all_apps
self.num_apps = len(all_apps)
self.arm_dict = {}
for app in self.all_apps:
self.arm_dict[app] = list(range(0, n_cache, 5))
self.n_features = n_features
self.A_c = {}
self.b_c = {}
self.p_c_t = {}
for app in self.all_apps:
self.A_c[app] = np.zeros((len(self.arm_dict[app]), self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((len(self.arm_dict[app]), self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(len(self.arm_dict[app]))
for arm in range(len(self.arm_dict[app])):
self.A_c[app][arm] = np.eye(self.n_features * 2)
# contexts
self.context = {}
self.other_context = {}
sum = np.zeros(n_features)
for app in self.all_apps:
self.context[app] = [1.0 for _ in range(self.n_features)]
sum += np.array(self.context[app])
for app in self.all_apps:
self.other_context[app] = list((sum - np.array(self.context[app])) / (len(self.all_apps) - 1))
        # NOTE: assumed defaults (mirroring LinUCB) for state that
        # select_arm/update/reset read but the submitted __init__ never set
        self.init_factor = [alpha, factor_alpha]
        self.alpha = alpha
        self.factor_alpha = factor_alpha
        self.n_arms = len(self.arm_dict[self.all_apps[0]])  # arms per app
        self.times = 0
        self.ratio = 5  # matches arm_dict's step of 5
        self.threshold = 150
        self.sampling_model = True
        self.sample_config = latin_hypercube_sampling(20, self.num_apps, 0, (self.n_arms - 1) // self.ratio, self.ratio)
        self.sample_result = []
        self.curr_best_config = None
        self.curr_best_reward = None
        self.duration_period = 0
        self.history_reward = []
        return
def select_arm(self):
if self.sampling_model:
if self.times == len(self.sample_config):
# sample end
self.times += 1
self.sampling_model = False
# print(max(self.sample_result))
return self.sample_config[self.sample_result.index(max(self.sample_result))]
# initial phase
cache_allocation = self.sample_config[self.times]
self.times += 1
return cache_allocation
self.times += 1
if self.duration_period > self.threshold and random.randint(1, 10) < 8:
cache_allocation = []
for app in self.all_apps:
cache_allocation.append(self.curr_best_config[app])
return cache_allocation
else:
contexts = {}
for app in self.all_apps:
A = self.A_c[app]
b = self.b_c[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
for i in range(self.n_arms):
theta = np.linalg.inv(A[i]).dot(b[i])
cntx = np.array(contexts[app])
self.p_c_t[app][i] = theta.T.dot(cntx) + self.alpha * np.sqrt(
cntx.dot(np.linalg.inv(A[i]).dot(cntx)))
cache_action = beam_search(self.n_arms - 1, self.all_apps, self.p_c_t, self.times, end_condition=30)
# cache_action is a dict
cache_allocation = []
for app in self.all_apps:
cache_allocation.append(cache_action[app])
self.alpha *= self.factor_alpha
return cache_allocation
def update(self, reward, chosen_arm):
if self.sampling_model:
# initial phase
self.sample_result.append(float(reward))
else:
if self.curr_best_config is None:
self.curr_best_config = chosen_arm
self.curr_best_reward = reward
self.duration_period = 1
# self.history_reward = []
else:
if reward > self.curr_best_reward:
self.curr_best_reward = reward
self.curr_best_config = chosen_arm
self.duration_period = 1
# self.history_reward = []
else:
self.duration_period += 1
contexts = {}
for app in self.all_apps:
arm = chosen_arm[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
self.A_c[app][arm] += np.outer(np.array(contexts[app]), np.array(contexts[app]))
self.b_c[app][arm] = np.add(self.b_c[app][arm].T, np.array(contexts[app]) * reward).reshape(self.n_features * 2, 1)
if self.times > 200:
if len(self.history_reward) < 60:
self.history_reward.append(round(float(reward), 3))
else:
first_half = self.history_reward[:30]
second_half = self.history_reward[30:]
first_aver = sum(first_half) / len(first_half)
second_aver = sum(second_half) / len(second_half)
# print('{} --> {}, {}'.format(first_aver, second_aver, abs(second_aver - first_aver) / first_aver))
if abs(second_aver - first_aver) / first_aver > 0.20:
# workload change
print('----- test workload change -----')
self.reset()
else:
self.history_reward = second_half
def get_now_reward(self, performance, context_info=None):
# update the context
tmp = [list(row) for row in zip_longest(*context_info, fillvalue=None)]
sum_context = np.zeros(self.n_features)
for i, app in enumerate(self.all_apps):
self.context[app] = tmp[i]
sum_context += np.array(self.context[app])
for app in self.all_apps:
self.other_context[app] = list((sum_context - np.array(self.context[app])) / (len(self.all_apps) - 1))
# calculate the reward
th_reward = sum(float(x) for x in performance) / len(performance)
return th_reward
def reset(self):
self.alpha = self.init_factor[0]
self.times = 0
self.sampling_model = True
self.sample_config = latin_hypercube_sampling(20, self.num_apps, 0, (self.n_arms - 1)//self.ratio, self.ratio)
self.sample_result = []
self.curr_best_config = None
self.curr_best_reward = None
self.duration_period = 0
self.history_reward = []
for app in self.all_apps:
self.A_c[app] = np.zeros((self.n_arms, self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((self.n_arms, self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(self.n_arms)
for arm in range(self.n_arms):
self.A_c[app][arm] = np.eye(self.n_features * 2)
if __name__ == '__main__':
pass

LinUCBSchedule/LinUCB.py Normal file

@@ -0,0 +1,315 @@
import pickle
import random
import numpy as np
import config
from itertools import zip_longest
from ScheduleFrame import *
def gen_feasible_configs(num_of_cache, cache_top_k):
    # TODO: enumerate all feasible configs from each app's top_k arms
num_app = len(cache_top_k)
top_k = len(cache_top_k[0])
def gen_side(tmp, k, n=1):
"""
:param root: root node, first app node
:param n: n_th in top_k
:return:
"""
if n == num_app:
return [[]]
for k in range(top_k):
t1 = k * top_k ** (num_app - n - 1)
t2 = top_k ** (num_app - 1) - (top_k - k - 1) * top_k ** (num_app - n - 1)
delta2 = top_k ** (num_app - 1 - n)
delta1 = top_k ** (num_app - n)
for i in range(t1, t2, delta1):
for j in range(i, i + delta2):
app_core = cache_top_k[n][k]
feasible_core = num_of_cache - sum(tmp[j])
if app_core > feasible_core:
tmp[j].append(feasible_core)
else:
tmp[j].append(app_core)
gen_side(n=n + 1, tmp=tmp, k=k)
return tmp
all_feasible_config = []
for i in range(len(cache_top_k)):
cache_top_k.append(cache_top_k.pop(0))
for j in range(top_k):
tmp = [[cache_top_k[0][j]] for _ in range(top_k ** (num_app - 1))]
res = gen_side(tmp, k=0)
for k in range(i + 1):
res = [[row[-1]] + row[:-1] for row in res]
all_feasible_config.extend(res)
    # deduplicate first
unique_tuples = set(tuple(x) for x in all_feasible_config)
all_feasible_config = [list(x) for x in unique_tuples]
    # redistribute any unused resources
for config in all_feasible_config:
assert sum(config) <= num_of_cache, 'The allocated cache exceeds the limit'
if sum(config) < num_of_cache:
left_cache = num_of_cache - sum(config)
for i in range(left_cache):
min_value = min(config)
min_index = config.index(min_value)
config[min_index] += 1
return all_feasible_config
def get_top_k(arr, k, times):
if times < 5 or random.randint(1, 10) > 8:
arr_top_k_id = [random.randint(0, len(arr) - 1) for _ in range(k)]
else:
arr_top_k_id = np.argsort(arr)[-k:]
return list(arr_top_k_id)
def beam_search(num_of_cache, all_apps, p_c_t, times, end_condition=30):
    # TODO: pick the top_k configs from every sub-bandit and combine them into a global config
action = {}.fromkeys(all_apps)
num_app = len(all_apps)
top_k = int(10 ** (np.log10(end_condition) / num_app))
cache_top_k = [get_top_k(p_c_t[all_apps[i]], top_k, times) for i in range(num_app)]
feasible_configs = gen_feasible_configs(num_of_cache=num_of_cache, cache_top_k=cache_top_k)
sum_p_list = []
for config in feasible_configs:
try:
config_p = [p_c_t[all_apps[i]][config[i]] for i in range(num_app)]
sum_p = sum(config_p)
sum_p_list.append(sum_p)
except IndexError as e:
print(f"Caught an IndexError: {e}")
print(config)
print(cache_top_k)
cache_act = feasible_configs[np.argmax(sum_p_list)]
for i in range(num_app):
action[all_apps[i]] = cache_act[i]
return action
def latin_hypercube_sampling(n, d, m, M, ratio):
    # Rejection-sample n random allocations of M units across d pools,
    # keeping every element within [m, M]; `ratio` rescales units afterwards
    samples = np.zeros((n, d))
    for i in range(n):
        # Re-draw until the normalized sample satisfies all constraints
        while True:
            x = np.random.uniform(m, M, d)
            if np.sum(x) < M:
                continue
            x = x / np.sum(x) * M
            if np.all(x >= m) and np.all(x <= M):
                samples[i] = x
                break
sample_config = []
for i in range(len(samples)):
tmp = [int(ele) for ele in samples[i]]
curr_sum = sum(tmp)
tmp[-1] += M - curr_sum
sample_config.append([ele * ratio for ele in tmp])
return sample_config
class LinUCB(ScheduleFrame):
def __init__(self, all_apps, n_cache, alpha, factor_alpha, n_features, sample=False):
super().__init__()
self.all_apps = all_apps
self.num_apps = len(all_apps)
self.n_arms = n_cache + 1
self.n_features = n_features
self.init_factor = [alpha, factor_alpha]
self.alpha = self.init_factor[0]
self.factor_alpha = self.init_factor[1]
self.times = 0
self.sampling_model = sample
self.sample_result = {}
self.sample_config = None
self.sample_times = config.SAMPLE_TIMES
self.ratio = config.SAMPLE_RATIO
self.curr_best_config = None
self.curr_best_reward = None
self.duration_period = 0
self.threshold = config.APPROXIMATE_CONVERGENCE_THRESHOLD
self.probability_threshold = config.PROBABILITY_THRESHOLD
self.load_change_threshold = config.LOAD_CHANGE_THRESHOLD
self.history_reward_window = config.HISTORY_REWARD_WINDOW
self.history_reward = []
self.A_c = {}
self.b_c = {}
self.p_c_t = {}
for app in self.all_apps:
self.A_c[app] = np.zeros((self.n_arms, self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((self.n_arms, self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(self.n_arms)
for arm in range(self.n_arms):
self.A_c[app][arm] = np.eye(self.n_features * 2)
# contexts
self.context = {}
self.other_context = {}
sum = np.zeros(n_features)
for app in self.all_apps:
self.context[app] = [1.0 for _ in range(self.n_features)]
sum += np.array(self.context[app])
for app in self.all_apps:
self.other_context[app] = list((sum - np.array(self.context[app])) / (len(self.all_apps) - 1))
# Hypercube Sampling
if self.sampling_model:
self.sample_config = latin_hypercube_sampling(self.sample_times, self.num_apps, 0, (self.n_arms - 1)//self.ratio, self.ratio)
return
def select_arm(self):
if self.sampling_model:
if self.times == len(self.sample_config):
# sample end
self.times += 1
self.sampling_model = False
max_sample_config = list(max(self.sample_result, key=self.sample_result.get))
return max_sample_config
# initial phase
cache_allocation = self.sample_config[self.times]
self.times += 1
return cache_allocation
self.times += 1
if self.duration_period > self.threshold and random.randint(1, 100) < self.probability_threshold:
cache_allocation = []
for app in self.all_apps:
cache_allocation.append(self.curr_best_config[app])
return cache_allocation
else:
contexts = {}
for app in self.all_apps:
A = self.A_c[app]
b = self.b_c[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
for i in range(self.n_arms):
theta = np.linalg.inv(A[i]).dot(b[i])
cntx = np.array(contexts[app])
self.p_c_t[app][i] = theta.T.dot(cntx) + self.alpha * np.sqrt(
cntx.dot(np.linalg.inv(A[i]).dot(cntx)))
cache_action = beam_search(self.n_arms - 1, self.all_apps, self.p_c_t, self.times, end_condition=30)
# cache_action is a dict
cache_allocation = []
for app in self.all_apps:
cache_allocation.append(cache_action[app])
self.alpha *= self.factor_alpha
return cache_allocation
def update(self, reward, chosen_arm):
if self.sampling_model:
# initial phase
curr_config = tuple(chosen_arm.values())
self.sample_result[curr_config] = float(reward)
else:
if self.curr_best_config is None:
self.curr_best_config = chosen_arm
self.curr_best_reward = reward
self.duration_period = 1
# self.history_reward = []
else:
if reward > self.curr_best_reward:
self.curr_best_reward = reward
self.curr_best_config = chosen_arm
self.duration_period = 1
# self.history_reward = []
else:
self.duration_period += 1
contexts = {}
for app in self.all_apps:
arm = chosen_arm[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
self.A_c[app][arm] += np.outer(np.array(contexts[app]), np.array(contexts[app]))
self.b_c[app][arm] = np.add(self.b_c[app][arm].T, np.array(contexts[app]) * reward).reshape(self.n_features * 2, 1)
if self.times > self.load_change_threshold:
if len(self.history_reward) < self.history_reward_window:
self.history_reward.append(float(reward))
else:
half_window = self.history_reward_window // 2
first_half = self.history_reward[:half_window]
second_half = self.history_reward[half_window:]
first_aver = sum(first_half) / len(first_half)
second_aver = sum(second_half) / len(second_half)
# print('{} --> {}, {}'.format(first_aver, second_aver, abs(second_aver - first_aver) / first_aver))
if abs(second_aver - first_aver) / first_aver > config.WORKLOAD_CHANGE:
# workload change
print(f'----- test workload change -----')
self.reset()
else:
self.history_reward.pop(0)
def get_now_reward(self, performance, context_info=None):
# update the context
tmp = [list(row) for row in zip_longest(*context_info, fillvalue=None)]
sum_context = np.zeros(self.n_features)
for i, app in enumerate(self.all_apps):
self.context[app] = tmp[i]
sum_context += np.array(self.context[app])
for app in self.all_apps:
self.other_context[app] = list((sum_context - np.array(self.context[app])) / (len(self.all_apps) - 1))
        # calculate the reward; for metrics like hit rate, bigger is better
        th_reward = sum(float(x) for x in performance) / len(performance)
        return th_reward
        # for latency-like metrics, smaller is better:
        # aver_latency = sum(float(x) for x in performance) / len(performance)
        # th_reward = 100 / aver_latency
        # return th_reward, aver_latency
def reset(self):
self.alpha = self.init_factor[0]
self.times = 0
self.sampling_model = True
self.sample_config = latin_hypercube_sampling(self.sample_times, self.num_apps, 0, (self.n_arms - 1)//self.ratio, self.ratio)
self.sample_result = {}
self.curr_best_config = None
self.curr_best_reward = None
self.duration_period = 0
self.history_reward = []
for app in self.all_apps:
self.A_c[app] = np.zeros((self.n_arms, self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((self.n_arms, self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(self.n_arms)
for arm in range(self.n_arms):
self.A_c[app][arm] = np.eye(self.n_features * 2)
def save_to_pickle(self, filename):
with open(filename, 'wb') as f:
pickle.dump(self, f)
@staticmethod
def load_from_pickle(filename):
with open(filename, 'rb') as f:
return pickle.load(f)
if __name__ == '__main__':
mylist = [1, 2, 3, 4, 5, 6]
mylist2 = [1, 2, 3, 4, 5]
print(mylist2[mylist.index(max(mylist))])
pass

LinUCBSchedule/ScheduleFrame.py Normal file

@@ -0,0 +1,59 @@
from abc import ABC, abstractmethod
import numpy as np
# from scipy.special import gamma
class ScheduleFrame(ABC):
def __init__(self):
pass
@abstractmethod
def select_arm(self):
pass
@abstractmethod
def update(self, reward, chosen_arm):
pass
@abstractmethod
def get_now_reward(self, performance, context_info=None):
pass
def latin_hypercube_sampling(n, d, m, M):
    # Rejection-sample n random allocations of M units across d dimensions,
    # keeping every element within [m, M]
    samples = np.zeros((n, d))
    for i in range(n):
        # Re-draw until the normalized sample satisfies all constraints
        while True:
            x = np.random.uniform(m, M, d)
            if np.sum(x) < M:
                continue
            x = x / np.sum(x) * M
            if np.all(x >= m) and np.all(x <= M):
                samples[i] = x
                break
for i in range(len(samples)):
samples[i] = [int(ele) for ele in samples[i]]
curr_sum = sum(samples[i])
samples[i][-1] += M - curr_sum
return samples
if __name__ == '__main__':
# Parameters
num_samples = 20 # Number of samples
num_dimensions = 3 # Number of dimensions
min_value = 0 # Minimum value of each element
total_sum = 240 # Total sum of elements in each sample
# Generate samples
samples = latin_hypercube_sampling(num_samples, num_dimensions, min_value, total_sum)
print(samples)
pass

@@ -0,0 +1,38 @@
import numpy as np
import matplotlib.pyplot as plt
# generate input data
x = np.linspace(0, 20000, 1000)
# sigmoid function
def sigmoid(x, k=0.0003, x0=13000):
return 1 / (1 + np.exp(-k * (x - x0)))
# tanh function
def tanh(x, k=0.0005, x0=3000):
return (np.tanh(k * (x - x0)) + 1) / 2
# exponential normalization
def exponential_normalize(x, alpha=0.5):
return (x / 6000) ** alpha
# arctan function
def arctan(x, k=0.001, x0=3000):
return 0.5 + (1 / np.pi) * np.arctan(k * (x - x0))
# map the values into [0, 1]
y1 = sigmoid(x)
y2 = tanh(x)
y3 = exponential_normalize(x)
y4 = arctan(x)
# plot the curves
plt.plot(x, y1, label='sigmoid')
plt.plot(x, y2, label='tanh')
plt.plot(x, y3, label='exponential_normalize')
plt.plot(x, y4, label='arctan')
plt.legend()
plt.title("Function Mapping")
plt.xlabel("Original Values")
plt.ylabel("Mapped Values (0-1)")
plt.savefig('../figures/activate_fun.png')

@@ -0,0 +1,88 @@
import re
import matplotlib.pyplot as plt
with open('../Logs/linucb.log', 'r', encoding='utf-8') as file:
text = file.read()
result = [[float(x) for x in match] for match in re.findall(r'reward : ([\d\.]+) \[(\d+), (\d+)\]', text)]
rewards = [item[0] for item in result]
write_pool_sizes = [item[1] for item in result]
read_pool_sizes = [item[2] for item in result]
# flush_pool_sizes = [item[3] for item in result]
def putAll():
plt.figure(figsize=(15, 6))
plt.plot(write_pool_sizes, label='WritePool Size', marker='o', markersize=5, alpha=0.5)
plt.plot(read_pool_sizes, label='ReadPool Size', marker='s', markersize=5, alpha=0.8)
# plt.plot(flush_pool_sizes, label='FlushPool Size', marker='^', markersize=5, alpha=0.8)
plt.title('Pool Sizes Over Time')
plt.xlabel('Time (Epoch)')
plt.ylabel('Pool Size')
plt.legend()
plt.grid(True)
# plt.show()
plt.savefig('../figures/newPoolSize.png', dpi=300)
def separate():
# Plotting the data in a 2x2 grid
fig, axs = plt.subplots(2, 2, figsize=(18, 10))
# Write Pool size plot
axs[0, 0].plot(write_pool_sizes, marker='o', linestyle='-', color='blue')
axs[0, 0].set_title('Write Pool Size Over Time')
axs[0, 0].set_xlabel('Iteration')
axs[0, 0].set_ylabel('Write Pool Size')
axs[0, 0].grid(True)
# Read Pool size plot
axs[0, 1].plot(read_pool_sizes, marker='^', linestyle='-', color='green')
axs[0, 1].set_title('Read Pool Size Over Time')
axs[0, 1].set_xlabel('Iteration')
axs[0, 1].set_ylabel('Read Pool Size')
axs[0, 1].grid(True)
# Flush Pool size plot
# axs[1, 0].plot(flush_pool_sizes, marker='+', linestyle='-', color='red')
# axs[1, 0].set_title('Flush Pool Size Over Time')
# axs[1, 0].set_xlabel('Iteration')
# axs[1, 0].set_ylabel('Flush Pool Size')
# axs[1, 0].grid(True)
# Reward plot
axs[1, 1].plot(rewards, marker='*', linestyle='-', color='purple')
axs[1, 1].set_title('Reward Over Time')
axs[1, 1].set_xlabel('Iteration')
axs[1, 1].set_ylabel('Reward')
axs[1, 1].grid(True)
plt.tight_layout()
plt.savefig('../figures/pool_sizes_separate.png', dpi=300)
def calculate_greater_than_ratio(pool_sizes, average_size=80):
count_greater= sum(1 for size in pool_sizes if size > average_size)
total_count = len(pool_sizes)
ratio = count_greater / total_count
return ratio
def calculate_all_ratio():
    average_size = 120  # specific to single-node unet3d
    train_point = 20    # round where training starts
write_pool_ratio = calculate_greater_than_ratio(write_pool_sizes[train_point:], average_size)
read_pool_ratio = calculate_greater_than_ratio(read_pool_sizes[train_point:], average_size)
# flush_pool_ratio = calculate_greater_than_ratio(flush_pool_sizes[train_point:], average_size)
print(f'WritePool > 120 Ratio: {write_pool_ratio:.2%}')
print(f'ReadPool > 120 Ratio: {read_pool_ratio:.2%}')
# print(f'FlushPool > 120 Ratio: {flush_pool_ratio:.2%}')
if __name__ == '__main__':
separate()
for i in result:
print(i)

@@ -0,0 +1,39 @@
import matplotlib.pyplot as plt
if __name__ == '__main__':
log_name = '../Logs/linucb.log'
index = []
reward = []
count = 1
# with open(log_name, 'r') as log_file:
# for line in log_file.readlines():
# # idx, rew = line.split(' ')
# rew = line
# idx = count
# index.append(idx)
# reward.append(rew)
# count += 1
with open(log_name, 'r') as log_file:
for line in log_file.readlines():
if 'reward' in line:
idx = count
                reward_str = line.split(':')[1].split('[')[0].strip()  # extract the numeric reward part
reward.append(float(reward_str))
index.append(idx)
count += 1
reward = [float(i) for i in reward]
# best_score = 0.32
best_score = reward[0] * 1.2
target = [best_score] * len(index)
init_score = [reward[0]] * len(index)
plt.plot(index, reward)
plt.plot(index, target)
plt.plot(index, init_score)
    # configure the axis ticks
plt.xticks(list(range(0, int(index[-1]), int(int(index[-1])/20))), rotation = 45)
plt.yticks([i * 0.2 for i in range(0, int(best_score // 0.2) + 2, 1)])
plt.title('OLUCB')
# plt.show()
plt.savefig('../figures/OLUCB_JYCache.png')

LinUCBSchedule/config.py Normal file

@@ -0,0 +1,23 @@
# Number of hypercube samples in the initial stage
SAMPLE_TIMES = 20
# Sampling granularity: with a ratio of 10 and num_cache of 100,
# the valid sampled solutions are 0, 10, 20, 30, ..., 100
SAMPLE_RATIO = 10
# If no better config is found within this many rounds,
# the model is judged to have reached approximate convergence
APPROXIMATE_CONVERGENCE_THRESHOLD = 150
# In the approximate-convergence state there is a fixed probability of directly
# choosing the best solution explored so far; otherwise exploration continues
PROBABILITY_THRESHOLD = 80
# Rewards take time to level off; workload-change detection
# only starts after this many rounds
LOAD_CHANGE_THRESHOLD = 50
# Size of the sliding window of historical rewards
HISTORY_REWARD_WINDOW = 50
# Relative reward change treated as a workload change
WORKLOAD_CHANGE = 0.30

View File

LinUCBSchedule/main.py Normal file

@@ -0,0 +1,221 @@
import csv
import os
import sys
import time
import numpy as np
from LinUCB import *
from EpsilonGreedy import *
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
from util import *
NUM_RESOURCES_CACHE = 60  # total units of cache size
def for_epsilon_greedy():
cm = simulation_config_management()
curr_config = cm.receive_config()
all_app = curr_config[0]
num_resources = int(np.sum(curr_config[1]))
chosen_arm = {}
curr_allocate = curr_config[1]
for j in range(len(all_app)):
chosen_arm[all_app[j]] = curr_allocate[j]
# initialize parameters
epsilon = 0.5
factor_alpha = 0.98
epochs = 3000
ucb_cache = EpsilonGreedy(epsilon, factor_alpha, len(all_app), num_resources)
file_ = open('linucb.log', 'w', newline='')
start_time = time.time()
for i in range(epochs):
performance = curr_config[2]
th_reward = ucb_cache.get_now_reward(performance)
ucb_cache.update(th_reward, chosen_arm)
# choose an arm
chosen_arm = ucb_cache.select_arm()
# prepare new config
new_config = [curr_config[0], chosen_arm]
cm.send_config(new_config)
# waiting for result
curr_config = cm.receive_config()
# write to log
log_info = str(i) + ' ' + str(th_reward) + '\n'
file_.write(log_info)
if (i + 1) % 100 == 0:
print('epoch [{} / {}]'.format(i + 1, epochs))
end_time = time.time()
print('used time :{}'.format(end_time - start_time))
file_.close()
def for_linucb():
cm = simulation_config_management()
curr_config = cm.receive_config()
all_app = curr_config[0]
num_resources = int(np.sum(curr_config[1]))
alpha = 0.90
factor_alpha = 0.98
n_features = 10
epochs = 1000
ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, n_features)
file_ = open('linucb.log', 'w', newline='')
start_time = time.time()
for i in range(epochs):
curr_allocate = curr_config[1]
performance = curr_config[2]
context_info = curr_config[3:]
chosen_arm = {}
for j in range(len(all_app)):
chosen_arm[all_app[j]] = curr_allocate[j]
th_reward = ucb_cache.get_now_reward(performance, context_info)
ucb_cache.update(th_reward, chosen_arm)
new_arm = ucb_cache.select_arm()
# prepare new config
new_config = [curr_config[0], new_arm]
cm.send_config(new_config)
# waiting for result
curr_config = cm.receive_config()
# write to log
log_info = str(i) + ' ' + str(th_reward) + '\n'
file_.write(log_info)
if (i + 1) % 100 == 0:
print('epoch [{} / {}]'.format(i + 1, epochs))
end_time = time.time()
print('used time :{}'.format(end_time - start_time))
file_.close()
def for_linucb_sample():
cm = simulation_config_management()
curr_config = cm.receive_config()
all_app = curr_config[0]
num_resources = int(np.sum(curr_config[1]))
alpha = 0.95
factor_alpha = 0.98
n_features = 10
epochs = 3000
ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, n_features, True)
file_ = open('linucb.log', 'w', newline='')
start_time = time.time()
for i in range(epochs):
new_arm = ucb_cache.select_arm()
# prepare new config
new_config = [curr_config[0], new_arm]
cm.send_config(new_config)
# waiting for result
curr_config = cm.receive_config()
curr_allocate = curr_config[1]
performance = curr_config[2]
context_info = curr_config[3:]
chosen_arm = {}
for j in range(len(all_app)):
chosen_arm[all_app[j]] = curr_allocate[j]
th_reward = ucb_cache.get_now_reward(performance, context_info)
ucb_cache.update(th_reward, chosen_arm)
# write to log
log_info = str(i) + ' ' + str(th_reward) + '\n'
file_.write(log_info)
if (i + 1) % 100 == 0:
print('epoch [{} / {}]'.format(i + 1, epochs))
end_time = time.time()
print('used time :{}'.format(end_time - start_time))
file_.close()
if __name__ == '__main__':
# for_epsilon_greedy()
for_linucb_sample()
# cm = simulation_config_management()
# curr_config = cm.receive_config()
#
# num_apps = len(curr_config[0])
# num_resources = [NUM_RESOURCES_CACHE]
# all_cache_config = gen_all_config(num_apps, num_resources[0])
# num_config = len(all_cache_config)
#
# epochs = 1000
# file_ = open('linucb.log', 'w', newline='')
#
# app_name = curr_config[0]
# cache_config = curr_config[1]
# cache_hit = curr_config[2]
# cache_reward = curr_config[3]
# init_cache_index = all_cache_config.index(cache_config)
#
# # context infomation and reward
# context, th_reward = get_now_reward(cache_hit)
#
# # initialize parameters
# alpha = 0.999
# factor_alpha = 0.997
#
# n_features_cache = len(context)
# context_cache = np.array(context)
#
# num_arm = num_config
# lin_ucb_cache = LinUCB(alpha, num_arm, n_features_cache)
# # lin_ucb_cache = EpsilonGreedy(alpha, num_arm, n_features_cache)
#
# lin_ucb_cache.update(init_cache_index, th_reward, context_cache)
# start_time = time.time()
# for i in range(epochs):
# context, th_reward = get_now_reward(cache_hit)
# # print('----- reward is: ' + str(th_reward))
# context_cache = np.array(context)
# # choose a arm
# chosen_arm_cache = lin_ucb_cache.select_arm(context_cache, factor_alpha)
# cache_index = chosen_arm_cache
# # prepare new config
# new_config = []
# new_config.append(curr_config[0])
# new_partition = all_cache_config[cache_index]
# new_config.append(new_partition)
#
# # print('new config:' + str(new_config))
# cm.send_config(new_config)
#
# # waiting for result
# curr_config = cm.receive_config()
# # print('recv_config is: ' + str(curr_config))
# app_name = curr_config[0]
# cache_config = curr_config[1]
# cache_hit = curr_config[2]
# cache_reward = curr_config[3]
#
# context, th_reward = get_now_reward(cache_hit)
# context_cache = np.array(context)
# lin_ucb_cache.update(chosen_arm_cache, th_reward, context_cache)
# # write to log
# log_info = str(i) + ' ' + str(th_reward) + '\n'
# file_.write(log_info)
# if (i + 1) % 100 == 0:
# print('epoch [{} / {}]'.format(i + 1, epochs))
# end_time = time.time()
# print('used time :{}'.format(end_time-start_time))
# file_.close()

LinUCBSchedule/server.py Normal file

@@ -0,0 +1,101 @@
import csv
import os
import sys
import threading
import socket
import time
import numpy as np
from LinUCB import *
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
from server_util import *
NUM_RESOURCES_CACHE = 352  # total units of cache size (176 * 2)
# initialize LinUCB parameters
alpha = 0.95
factor_alpha = 0.98
num_features = 1  # feature dimension per buffer pool
# all_app = ['WritePool', 'ReadPool', 'FlushPool']
all_app = ['WritePool', 'ReadPool']
# after receiving the configuration from the client, run LinUCB to generate
# the newest configuration and send it back to the client
def handle_client(client_socket, ucb_cache, file_writer):
try:
        # receive pool_name, pool_size, pool_throughput and the write/read invocation counts from curve
request = client_socket.recv(1024).decode("utf-8")
print(f"Received: {request}")
pool_and_size, pool_and_throughput, pool_and_invoke = process_request(request)
pool_name = list(pool_and_size.keys())
pool_size = list(pool_and_size.values())
pool_throughput = list(pool_and_throughput.values())
context_info = [list(pool_and_invoke.values())]
if(np.sum(pool_throughput) > 0):
start_time = time.time()
chosen_arm = {}
for j in range(len(all_app)):
chosen_arm[all_app[j]] = pool_size[j]
# reward
th_reward = ucb_cache.get_now_reward(pool_throughput, context_info)
ucb_cache.update(th_reward, chosen_arm)
new_size = ucb_cache.select_arm()
end_time = time.time()
# write to log
start_time_log = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
end_time_log = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
file_writer.write('start time : ' + start_time_log + '\n')
run_time = end_time - start_time
file_writer.write(f'run time : {run_time:.2f} s\n')
file_writer.write('end time : ' + end_time_log + '\n')
log_info = 'reward : ' + str(th_reward) + ' ' + str(new_size) + '\n'
file_writer.write(log_info)
else:
new_size = pool_size
# prepare new config
new_config = [pool_name, new_size]
print('new config:' + str(new_config))
        # the server sends the latest configuration back to curve
serialized_data = '\n'.join([' '.join(map(str, row)) for row in new_config])
client_socket.send(serialized_data.encode())
print("server发送最新配置给client:")
print(serialized_data.encode())
# except Exception as e:
# print(f"An error occurred: {e}")
finally:
client_socket.close()
# start the server and listen for configuration info sent by curve
def start_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 2333))
    # allow at most 5 queued client connections
server.listen(5)
print("Server listening on port 2333")
    # initialize LinUCB
file_ = open('Logs/linucb.log', 'w', newline='')
num_resources = NUM_RESOURCES_CACHE
    # enable the initial sampling phase (last constructor argument)
ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, num_features, True)
while True:
client_socket, addr = server.accept()
print(f"Accepted connection from {addr}")
client_handler = threading.Thread(
target=handle_client, args=(client_socket, ucb_cache, file_)
)
client_handler.start()
file_.close()
if __name__ == '__main__':
start_server()

LinUCBSchedule/server_util.py Normal file

@@ -0,0 +1,25 @@
import numpy as np
# for unet3d set x0 = 3000; for resnet50 set x0 = 30000
# for s3fs set x0 = 13000, k = 0.0003
def sigmoid(x, k=0.0003, x0=13000):
return 1 / (1 + np.exp(-k * (x - x0)))
# parse the request sent by curve into pool_and_size, pool_and_throughput and pool_and_invoke
def process_request(request):
pairs = request.split(';')[:-1]
pool_and_size = {}
pool_and_throughput = {}
pool_and_invoke = {}
pair1 = pairs[:2]
pair2 = pairs[2:4]
pair3 = pairs[4:]
for pair in pair1:
key,value = pair.split(':')
pool_and_size[key] = int(value)
for pair in pair2:
key,value = pair.split(':')
pool_and_throughput[key] = float(value)
for pair in pair3:
key,value = pair.split(':')
pool_and_invoke[key] = sigmoid(float(value))
return pool_and_size, pool_and_throughput, pool_and_invoke

LinUCBSchedule/util.py Normal file

@@ -0,0 +1,248 @@
import socket
import pickle
import time
host = '127.0.0.1'
port = 54000
def get_last_line(file_name):
"""
Open file and read the last line
Args:
file_name (str): file name
Returns:
str: last line of the file, '' if the file not found
"""
try:
with open(file_name, 'rb') as file:
file.seek(-2,2)
while file.read(1)!=b'\n':
file.seek(-2, 1)
return file.readline().decode().strip()
except FileNotFoundError:
return ''
def get_pool_stats():
"""
Communicating with cache server through socket, get current cache pool name and cache allocation
Args:
Returns:
map<poolname->poolsize>: all pool in the cache and their pool size
"""
# link the target program
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
message = "G:"
# print(message)
sock.sendall(message.encode())
# wait the response
response = sock.recv(1024).decode()
# print(response)
sock.close()
deserialized_map = {}
pairs = response.split(';')[:-1]
for pair in pairs:
key,value = pair.split(':')
deserialized_map[key] = int(value)
return deserialized_map
def set_cache_size(workloads, cache_size):
"""
Communicating with cache server through socket, adjust pool size to the new size
Args:
workloads (list<str>): pool name of all workloads
cache_size (list<int>): new size of each pool
Returns:
"""
# link the server program
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
curr_config = [workloads, cache_size]
serialized_data = '\n'.join([' '.join(map(str, row)) for row in curr_config])
serialized_data = 'S:' + serialized_data
# print(serialized_data)
# send to server
sock.sendall(serialized_data.encode())
sock.close()
def receive_config():
"""
    Old version. Wait to receive the current resource config
Args:
Returns:
        list: [
            [name of pool],
            [allocation of resource A of every pool],  # e.g. cache size, [16, 16]
            [context of resource A of every pool],     # for cache size the context can be hit_rate, [0.8254, 0.7563]
            [latency of every workload]
        ]
"""
# create the server socket
# print("receive config")
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('127.0.0.1', 1412))
server_socket.listen(1)
# listening the old_config from the executor
client_socket, _ = server_socket.accept()
received_data = client_socket.recv(1024)
config = pickle.loads(received_data)
client_socket.close()
server_socket.close()
return config
def send_config(new_config):
"""
    Old version. Send the new config
Args:
list: [
[name of pool],
[new allocation of resource A of every pool]
]
Returns:
"""
serialized_config = pickle.dumps(new_config)
# connect to the bench
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('127.0.0.1', 1413))
client_socket.send(serialized_config)
client_socket.close()
print("send_config success!")
return
class config_management:
def __init__(self) -> None:
pass
def receive_config(self):
curr_config = []
# pool name and cache allocation
pool_and_size = get_pool_stats()
pool_name = list(pool_and_size.keys())
pool_size = list(pool_and_size.values())
curr_config.append(pool_name)
curr_config.append(pool_size)
#TODO: cache hit rate
hitrate_log = ['/home/md/SHMCachelib/bin/' + name + '_hitrate.log' for name in pool_name]
hitrates = []
for log in hitrate_log:
hitrates.append(get_last_line(log))
curr_config.append(hitrates)
#TODO: tail latency
latency_log = ['/home/md/SHMCachelib/bin/' + name + '_tailLatency.log' for name in pool_name]
latencies = []
for log in latency_log:
latencies.append(get_last_line(log))
curr_config.append(latencies)
return curr_config
def send_config(self, new_config):
set_cache_size(new_config[0], new_config[1])
def userA_func(resources, threshold):
if threshold < 0:
return 0
if resources < threshold:
return resources * 0.095
else:
return threshold * 0.095 + (resources - threshold) * 0.010
def userB_func(resources, threshold):
if threshold < 0:
return 0
if resources < threshold:
return resources * 0.040
else:
return threshold * 0.040 + (resources - threshold) * 0.005
def uniform(resources):
return min(resources, 240) * 0.003
def sequential(resources):
return 0
def hotspot(resources):
return min(resources, 60) * 0.007 + min(max(0, resources - 60), 180) * 0.002
class userModel:
def __init__(self, name, resources=0, user_func=None):
self.name = name
self.resources = resources
self.user_func = user_func
class simulation_config_management:
def __init__(self):
self.total_resource = 240
self.all_user = [
userModel('A', 0, sequential),
userModel('B', 0, uniform),
userModel('C', 0, hotspot),
]
for u in self.all_user:
u.resources = self.total_resource // len(self.all_user)
self.n_features = 10
self.workload_change = 1000
self.counter = 0
def receive_config(self):
curr_config = []
curr_config.append([u.name for u in self.all_user])
curr_config.append([u.resources for u in self.all_user])
curr_config.append([u.user_func(u.resources) for u in self.all_user])
# context info
for i in range(self.n_features):
curr_config.append([1.0] * len(self.all_user))
self.counter = self.counter + 1
if self.workload_change > 0 and self.counter % self.workload_change == 0:
print('---------------------------workload change -----------------------------')
self.all_user[0].user_func, self.all_user[1].user_func = self.all_user[1].user_func, self.all_user[0].user_func
self.all_user[1].user_func, self.all_user[2].user_func = self.all_user[2].user_func, self.all_user[1].user_func
return curr_config
def send_config(self, new_config):
for i in range(len(self.all_user)):
self.all_user[i].resources = new_config[1][i]
if __name__ == '__main__':
time.sleep(10)
cs = config_management()
curr = cs.receive_config()
for item in curr:
print(item)

README.md

@@ -1,4 +1,4 @@
# JYCache
# JYCache
**JYCache (Jiuyuan cache storage system)** is a high-performance, easily extensible distributed cache storage system for scenarios from personal use to large-model training and inference, adaptable to large-capacity object storage and other underlying storage forms. Through a layered architecture, access-layer optimization, I/O optimization and other combined optimizations, JYCache not only supports sequential/random file reads and writes, but its read/write performance also leads the international mainstream product Alluxio. JYCache currently runs on X86 (Intel, AMD, Hygon, etc.) and ARM (Kunpeng, Phytium, etc.) platforms.

build.sh Executable file → Normal file

@@ -19,6 +19,7 @@ WriteCacheConfig.CacheConfig.PageBodySize # write cache page size
WriteCacheConfig.CacheConfig.PageMetaSize # write cache page metadata size
WriteCacheConfig.CacheConfig.EnableCAS # whether the write cache enables CAS
WriteCacheConfig.CacheSafeRatio # write cache safe-capacity threshold (percent); when reached, writes block until async flush frees space
WriteCacheConfig.EnableThrottle # whether local write-cache throttling is enabled (1 = on, 0 = off)
# GlobalCache
UseGlobalCache # global cache switch
@@ -36,3 +37,5 @@ LogLevel # log level: INFO=0, WARNING=1, ERROR=2, FATAL=3
EnableLog # whether log printing is enabled
FlushToRead # whether a file is written into the read cache after its flush completes
CleanCacheByOpen # whether the read cache is cleared when a file is opened
EnableResize # whether WriteCache and ReadCache share a single Cachelib; enabled alone, behavior differs little from the original framework
EnableLinUCB # whether LinUCB-driven adjustment is enabled (can be enabled on its own)

New binary images (contents not shown): doc/image/OLUCB_CS.png (66 KiB), doc/image/OLUCB_frame.png (75 KiB), doc/image/result_LinUCB.png (356 KiB), doc/image/serial_read.png (133 KiB), plus two images whose names are not captured here (77 KiB and 331 KiB).

doc/技术报告.md Normal file

@@ -0,0 +1,350 @@
# 1. Introduction
JYCache is a high-performance, easily extensible distributed cache storage system for scenarios ranging from personal use to large-model training and inference, adaptable to large-capacity object storage and other underlying storage forms. Each inference machine maintains an internal "client cache module" containing independent read/write cache components and a data-source access component. Through a layered architecture, access-layer optimization, I/O optimization and other combined optimizations, JYCache not only supports sequential/random file reads and writes, but its read/write performance also leads the international mainstream product Alluxio. However, because the read and write caches are designed to be independent of each other and non-adjustable, the existing work has the following shortcomings:
1. Unbalanced resource utilization: with fixed read/write cache sizes, the system may fail to make full use of the cache under some workloads. For example, under a read-intensive workload the read cache may be full while the write cache has plenty of free space.
2. Cache thrashing and performance bottlenecks: when one class of operations (reads or writes) runs short of cache space, data is frequently evicted, causing cache thrashing. This thrashing increases I/O operations and degrades cache performance.
3. Inability to handle dynamic scenarios: in real deployments, each application's memory-access pattern changes across execution phases. A fixed-size cache allocation cannot adapt to these changes, so performance drops.
To address these problems we propose a new reinforcement-learning scheme, OLUCB. It monitors the read/write cache configuration and access performance in the real application environment and adjusts the cache configuration in real time according to workload characteristics, effectively improving the resource utilization of the original cache system and its read/write performance.
# 2. System Architecture Overview
This chapter presents the architecture of the proposed scheme and its module decomposition. We model the resource-partitioning problem for co-located black-box tasks as a CMAB (contextual multi-armed bandit) and design the model around LinUCB, a CMAB algorithm widely used across many scenarios. The algorithm was chosen for the following reasons:
1. The JYCache runtime is a black-box environment: we can neither collect large amounts of data in advance to build a performance model nor obtain any user-provided information. LinUCB needs no prior knowledge of the workload to make decisions; context information and rewards can be obtained by interacting with the environment.
2. The resource-partitioning problem maps naturally onto the MAB formulation: each partitioning plan corresponds to an arm, and an arm's expected payoff is the performance of the corresponding plan.
3. LinUCB is computationally cheap, meeting the fast-response requirement implicit in JYCache.
## Section 1: System architecture
The JYCache architecture with the OLUCB scheme is shown below. Compared with the original JYCache, the modifications are concentrated on the local cache side: the previously independent read cache and write cache in the client cache module are merged and re-partitioned, with the concrete allocation policy driven by OLUCB's decisions based on workload characteristics observed at runtime.
![](image/OLUCB_frame.png)
## Section 2: Module decomposition
As the architecture diagram shows, the OLUCB scheme uses a client/server architecture. The client runs inside JYCache and has two main functions:
1. Periodically collect the current size configuration of the read/write buffer pools and the cache-access statistics, and send the summary to the server.
2. After receiving the server's configuration decision, adjust the ratio between its read and write buffer pools.
The server continuously listens on its port and maintains the reinforcement-learning model OLUCB. Whenever it receives information from the client, it keeps training the model and makes a new round of buffer-pool configuration decisions, which it sends back to the client, as illustrated below:
![](image/OLUCB_CS.png)
# 3. Detailed Design
This chapter details the proposed scheme.
## Section 1: Module description
This section describes the client and server modules.
### 3.1.1 Server module
The server module runs standalone on local port ```2333```. Its main jobs are communicating with the client and maintaining a reinforcement-learning model; the algorithm is detailed in Section 2.
### 3.1.2 Client module
The client module runs inside JYCache on a dedicated thread responsible for statistics, communication, and applying configurations. It first gathers access statistics for the write cache and the read cache over a time window, concretely the number of bytes accessed by read/write operations, the time they took, and the number of operations. At each interval it computes the read-cache and write-cache throughput over that window, takes the processed throughput as the performance metric and the operation counts as the context information, and sends both to the server.
After a very short wait, the client fetches from the server the read/write cache configuration for the next period. From the received configuration it computes how many bytes each cache should gain or lose relative to the current configuration. It shrinks a pool first so that the buffer pools never overflow, always keeping some reserved space for both caches to prevent a system crash; it then grows the other pool, and finally records the updated configuration of both.
## Section 2: Algorithm and logic
This section describes the running logic of the proposed reinforcement-learning algorithm, OLUCB.
### 3.2.1 Task-oriented multi-level arm structure
In the original LinUCB algorithm, each bandit maintains a parameter matrix A, a parameter vector b, and a probability matrix p; whenever the bandit selects an arm a, it updates that arm's A and b accordingly. The total number of resource-partitioning plans to explore is enormous, so the original LinUCB approach of building one arm per plan is impractical: in a multi-tenant tuning scenario the number of global allocation plans grows on the order of O(n!), and with three tasks and 240 resource units there are already 29,000+ plans. To keep the number of arms reasonable, OLUCB maintains one bandit per resource dimension per task, reducing each bandit's complexity to O(n); the per-bandit results (each bandit's top_k allocations) are then combined by beam search into a complete global configuration.
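As a rough sanity check on those counts (a sketch; the 29,000+ figure is the standard stars-and-bars count):
```python
from math import comb

def num_global_configs(num_apps, num_resources):
    # Ways to split `num_resources` indivisible units among `num_apps`
    # tasks (stars and bars): C(n + k - 1, k - 1).
    return comb(num_resources + num_apps - 1, num_apps - 1)

print(num_global_configs(3, 240))  # 29161 arms for one flat bandit
print(3 * (240 + 1))               # 723 arms with one bandit per task
```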
### 3.2.2 Reward design
Our reward scheme uses the throughputs of the read cache and the write cache as the reward (performance metric), aiming to maximize both. To keep the reward from fluctuating wildly, the client divides the measured write throughput by 1024 and then by 1024 again and maps it into [0, 1] with ```std::tanh```. Because reading data of the same length takes about 4x as long as writing it when no remote fetch is needed, the read throughput is divided by 1024 and then by 256 before the same ```std::tanh``` mapping. The final reward is the average of the two.
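A minimal Python sketch of this mapping (the client side is actually C++; byte and time units here are assumptions):
```python
import math

def reward(write_bytes, write_time, read_bytes, read_time):
    # write throughput, scaled by 1024*1024 so tanh lands in a useful range
    write_score = math.tanh(write_bytes / write_time / 1024 / 1024)
    # reading the same data takes ~4x as long, so the divisor is 4x smaller
    read_score = math.tanh(read_bytes / read_time / 1024 / 256)
    return (write_score + read_score) / 2
```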
### 3.2.3 Context feature selection
OLUCB uses the invocation counts of the read and write functions as context information, which the client collects and forwards to the server. Context features guide the search over the solution space. For each arm we maintain two context vectors: the task's own features, and the difference between them and the other tasks' features. Including the other tasks' features keeps arm choices across resources from being independent of each other.
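In the submitted Python code this pairing takes roughly the following form (a self-contained sketch of the context update in LinUCB.get_now_reward):
```python
import numpy as np

def build_context(own, all_contexts):
    # own: this task's feature vector; all_contexts: every task's vector
    own = np.asarray(own, dtype=float)
    total = np.sum(np.asarray(all_contexts, dtype=float), axis=0)
    others = (total - own) / (len(all_contexts) - 1)  # mean of the other tasks
    return np.hstack((own, others))  # 2 * n_features context vector

print(build_context([1.0, 2.0], [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]))
```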
### 3.2.4 Arm-selection policy
We treat the read/write cache resources as strictly partitioned: the same cache space can never be used by the read cache and the write cache at the same time. Arm selection uses beam search, whose main idea is to choose each task's arm in turn, always keeping the L partial selections most likely to yield a larger reward so far, and finally picking the best of these candidates.
In short, beam search builds the final decision sequentially: it first keeps the L arms most likely to yield a larger reward for the first task; under each of those it keeps the top-L arms for the second task; and so on through the last task; finally, the feasible plan whose selected arms have the highest summed probability becomes the decision. Although beam search decides sequentially, it is effectively independent of task order. Earlier tasks appear to have more allocation choices, but that does not give them a higher probability of receiving more resources: all tasks share the same reward, which is determined by the performance of all tasks together, so the arm each task should choose is decided jointly. An unsuitable allocation (e.g., giving too many resources to earlier tasks) produces a negative reward signal, which makes every bandit adjust its choice immediately.
### 3.2.5 Hypercube sampling
OLUCB balances exploration and exploitation, and exploration in the early phase is critical, so in its initial running phase we generate solutions randomly with a hypercube sampling strategy. Uniform random sampling ensures that OLUCB explores the whole input space more thoroughly from the start, which accelerates convergence, reduces unnecessary exploration overhead, and improves overall efficiency.
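Usage of the sampler shipped in LinUCBSchedule (signature as in LinUCB.py; the numbers are illustrative):
```python
from LinUCB import latin_hypercube_sampling

# 20 initial allocations over 2 pools; per-pool units drawn from [0, 35]
# and rescaled by ratio=10, so every sample sums to 350 cache units
for cfg in latin_hypercube_sampling(20, 2, 0, 35, 10):
    print(cfg)
```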
### 3.2.6 Dynamic workload detection
In real environments each task's memory-access characteristics differ across execution phases, so the best cache configuration differs too. A change in workload characteristics shows up numerically as a change in reward, so OLUCB maintains a sliding window of historical rewards to let the model recognize an environment change and restart its iterative learning. When the mean of the first half of the window differs from the mean of the second half by more than a set amount, OLUCB concludes the workload may have changed and returns to the sampling phase to search and learn afresh for the new optimum.
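The detection rule itself is tiny (a sketch mirroring the window logic in LinUCB.update; WORKLOAD_CHANGE is the relative threshold from config.py):
```python
def workload_changed(history, threshold=0.30):
    # compare the mean reward of the two halves of the sliding window
    half = len(history) // 2
    first = sum(history[:half]) / half
    second = sum(history[half:]) / (len(history) - half)
    return abs(second - first) / first > threshold

print(workload_changed([0.8] * 25 + [0.4] * 25))  # True
```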
### 3.2.7 OLUCB phases
The OLUCB iteration falls roughly into three phases:
1. Initial sampling: OLUCB samples the feasible solution space uniformly to obtain a set of sample points. For a fixed number of startup rounds it emits the sampled solutions in turn without computing expected rewards; based on the environment's feedback, it then starts the next phase of iterative training from the best sampled point.
2. Iterative training: in each round the expected reward of every configuration is computed from the context information, the historical rewards, and the upper confidence bound, and the configuration with the highest expected reward is chosen. As rounds accumulate, the upper confidence bound's weight in the expected reward decreases, so the model gradually prefers configurations with higher historical average rewards.
3. Approximate convergence: OLUCB tracks the best configuration found so far; if no better configuration is found within a set number of rounds, it declares approximate convergence. In this state each selection picks the best explored solution directly with high probability (e.g., 90%), and otherwise (10%) keeps exploring by computing expected rewards, maintaining further exploration of the environment. If that exploration finds a solution better than the current one, the model returns to the iterative-training state.
## Section 3: Interface specification
### 3.3.1 Communication design
The server and client communicate over a network socket; the server listens on port ```2333``` of ```127.0.0.1```.
The message formats are as follows:
#### Client message format
The client collects and aggregates the current cache state: the current read/write cache size configuration, the throughput, and how frequently reads and writes occurred over the interval. The format is:
```bash
"WritePool:"writeCacheSize_/fixSize; // current write cache size
"ReadPool:"readCacheSize_/fixSize;   // current read cache size
"WritePool:"writeThroughput;         // write throughput
"ReadPool:"readThroughput;           // read throughput
"WritePool:"writeCount_;             // number of write-function calls
"ReadPool:"readCount_;               // number of read-function calls
```
- Current read/write cache sizes (lines 1-2):
    - fixSize is the allocation unit of the read/write caches, 256 MB. Read/write cache sizes are adjusted at fixSize granularity, which makes the reinforcement-learning model's decisions easier.
    - CacheSize_/fixSize is the current size of the read/write cache in units.
- Read/write throughput (lines 3-4):
    - The throughput is measured over the interval from one Resize operation to the next; the client computes it, and it serves as the reward for each decision step of the model.
    - The raw data are the read/write lengths and times recorded by the Put and Get functions inside s3fs. To keep the reward from being too large or unstable, writeThroughput is divided by 1024 and then by 1024 and mapped into [0, 1] with ```std::tanh```. Because reading data of the same length takes about 4x as long as writing it when no remote fetch is needed, readThroughput is divided by 1024 and then by 256 before the same ```std::tanh``` mapping.
- Context information (lines 5-6):
    - The Count_ values are the invocation counts of s3fs's Put() and Get() functions; the client sends them to the server as additional context to help tuning.
    - After receiving the counts, the server smooths them with the ```sigmoid``` function $f(x) = \frac{1}{1 + e^{-k(x - x_0)}}$ (for s3fs, k=0.0003, x0=13000). An assembled example message is shown below.
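One client message following this format might look as follows (values illustrative; the server splits on ';' and ':' in server_util.process_request):
```bash
WritePool:176;ReadPool:176;WritePool:0.83;ReadPool:0.41;WritePool:5321;ReadPool:40213;
```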
#### Server message format
After receiving the cache-configuration information from the client, the server analyzes it with its local reinforcement-learning model OLUCB, decides the read/write cache sizes for the next round, and sends them to the client in the following format:
```bash
"WritePool" "ReadPool"
WriteSize ReadSize
```
The first line holds the names of the write and read caches; the second line is the server's final decision, made by the OLUCB model from the collected information. WriteSize/ReadSize are the numbers of resource units granted.
### 3.3.2 OLUCB parameters
The hyper-parameters of the OLUCB model and their meanings are listed below; they are implemented in LinUCBSchedule/config.py:

| Parameter | Meaning | Type |
|-----------|----------|-------|
| SAMPLE_TIMES | number of rounds in the initial sampling phase | int |
| SAMPLE_RATIO | sampling scale factor: with a total of 100 and a factor of 10, only 10, 20, 30, ... appear | int |
| APPROXIMATE_CONVERGENCE_THRESHOLD | rounds without a better result before entering the approximate-convergence state | int |
| PROBABILITY_THRESHOLD | exploit with probability X%, explore with probability (100-X)% | int |
| WORKLOAD_CHANGE | relative performance change over a period that is judged a workload change | float |
| LOAD_CHANGE_THRESHOLD | rewards stabilize only after this many rounds, after which workload-change detection begins | int |
| HISTORY_REWARD_WINDOW | size of the sliding window used to check for workload changes | int |
### 3.3.3 OLUCB model methods and their parameters
1. Constructor __init__
```json
"__init__":{
    "description":"OLUCB model constructor",
    "parameters":{
        "all_apps":{
            "type":"list<str>",
            "description":"each task is identified by a string; all_apps is the list of all task identifiers",
            "example":["writePool", "readPool"]
        },
        "n_cache":{
            "type":"int",
            "description":"total number of resource units; e.g. 84 GB of cache with 256 MB per unit gives 336 units",
            "example":336
        },
        "alpha":{
            "type":"float",
            "description":"hyper-parameter alpha, the weight of the upper confidence bound in the expected reward",
            "minimum":0,
            "maximum":1.0,
            "example":0.95
        },
        "factor_alpha":{
            "type":"float",
            "description":"decay factor; alpha shrinks gradually as rounds increase",
            "minimum":0,
            "maximum":1.0,
            "example":0.98
        },
        "n_features":{
            "type":"int",
            "description":"number of context-feature dimensions",
            "example":1
        },
        "sample":{
            "type":"bool",
            "description":"optional: whether to run initial sampling, or start directly from the environment's current configuration",
            "default":true
        }
    }
}
```
2. Arm-selection function select_arm
```json
"select_arm":{
    "description":"choose a new configuration",
    "parameters":null,
    "return_value":{
        "type":"list<int>",
        "description":"number of resource units allocated to each task, in all_apps order",
        "example":[176,176]
    }
}
```
3. Reward function get_now_reward
```json
"get_now_reward":{
    "description":"update the context information and compute the reward from the performance metrics",
    "parameters":{
        "performance":{
            "type":"list<float>",
            "description":"list of each task's performance metric",
            "example":[0.85, 0.50]
        },
        "context_info":{
            "type":"list<list<float>>",
            "description":"current environment context; a 2-D list with one row per context dimension and one column per task",
            "example":[
                [feature1_writePool, feature1_readPool],
                [feature2_writePool, feature2_readPool],
                ......
                [featureN_writePool, featureN_readPool]
            ]
        }
    },
    "return_value":{
        "type":"float",
        "description":"reward computed from the performance metrics, in [0,1]; the closer to 1, the better the performance",
        "minimum":0,
        "maximum":1.0,
        "example":0.55
    },
    "remark":"get_now_reward must map performance to reward to fit the deployment; a reward closer to 1 must mean better performance"
}
```
4. Update function update
```json
"update":{
    "description":"update the model's arm parameters from the reward",
    "parameters":{
        "reward":{
            "type":"float",
            "description":"the reward produced by get_now_reward",
            "minimum":0,
            "maximum":1.0,
            "example":0.55
        },
        "chosen_arm":{
            "type":"dict<str->int>",
            "description":"the actual current resource allocation in the environment",
            "example":{"writePool":176, "readPool":176}
        }
    }
}
```
# 4. Usage Guide
This chapter describes how to deploy the OLUCB policy in JYCache:
1. In the JYCache directory:
```bash
# 1. Build from source
sh build.sh
# 2. Install
sh install.sh
# 3. Start the LinUCB server (skip this if LinUCB tuning is not enabled)
cd LinUCBSchedule && python3 server.py
```
2. In the JYCache/JYCache_Env directory:
```bash
# 1. Start minio
cd ./minio && sh start.sh && cd ..
# 2. Edit the configuration file
vim conf/newcache.conf
# new options:
WriteCacheConfig.EnableThrottle=0
EnableResize = 1
EnableLinUCB = 1
# changed option:
UseGlobalCache=0 # disable the global cache
# 3. Start s3fs
sh start_s3fs.sh
```
# 5. Experimental Evaluation
This chapter reports how much the OLUCB scheme improves the cache system's read/write performance over the original, covering the experimental environment, test parameters, and results.
## Section 1: Environment
- Platform: Linux 5.4.0-80-generic (Optane07 server)
- Resources: local read/write cache
- Tool: FIO
## Section 2: Parameters
Method: we use FIO to sequentially write 20 files of 8 GB each; once all files are written, the 20 files are read back randomly.
FIO configuration:

| Parameter | Value |
|-----------|----------|
| iodepth | 5 |
| bs | 128k |
| number of files | 20 |
| size | 8G |

OLUCB ```server```-side parameters for the test:

| Parameter | Meaning | Value |
|-----------|----------|-------|
| SAMPLE_TIMES | sampling rounds | 20 |
| SAMPLE_RATIO | sampling granularity: every X units form one candidate solution | 10 |
| WORKLOAD_CHANGE | a reward fluctuation above X is judged a workload change | 0.30 |
| LOAD_CHANGE_THRESHOLD | workload-change detection starts from round X | 50 |
| HISTORY_REWARD_WINDOW | size of the historical-reward window | 50 |
| PROBABILITY_THRESHOLD | exploit with probability X%, explore with probability (100-X)% | 80 |

OLUCB ```client```-side parameters for the test:

| Parameter | Meaning | Value |
|-----------|----------|-------|
| fixSize | allocation unit, the granularity of read/write cache adjustment | 256M |
| reserveSize | reserved space, preventing a crash when an OLUCB decision would give all cache to one pool | read cache 1024M, write cache 512M |
| resizeInterval_ | how long each round's configuration stays in effect | 5s |
## Section 3: Results

| OP | JYCache (baseline) | JYCache (OLUCB) |
| ------------ | ------------ | ------------ |
| sequential write | 680.6MiB/s | 678.9MiB/s |
| random read | 810MiB/s | 1344MiB/s |

The evolution of the read/write cache sizes is shown below:
![](/doc/image/result_LinUCB.png)
Analyzing the figure together with the test characteristics and the JYCache read/write flow:
- In the first 20 rounds the OLUCB model is in its sampling phase, so the ReadPool/WritePool sizes change sharply and the reward fluctuates strongly.
- Rounds 21-80 are the sequential-write phase, which consists purely of writes; a Flush of the whole file is triggered only when the WritePool reaches 60% occupancy or a file finishes writing. Due to an s3fs characteristic, a Flush must Read all of a file's data to assemble a complete file before writing it out, so the read throughput is no longer 0 and the reward changes accordingly. The chart therefore shows a stable read/write cache configuration with a strongly fluctuating reward.
- The random-read phase starts at round 81. Before round 101 the cache configuration stays relatively stable while the reward keeps falling: OLUCB needs some time to recognize the workload change (the history sliding window holds 50 entries), so it keeps the previous best configuration for a while.
- Rounds 101-150: at round 101 OLUCB decides from the sliding window that the workload has changed, reinitializes the model, and returns to the sampling phase for a new round of learning. Since at least 160 GB is read, and randomly, much data must initially be downloaded from remote storage, so the computed read-cache throughput is low and the mapped reward small; the model therefore learns slowly and its decisions are not yet stable.
- After round 150, as reading continues, the model learns that this is a read-heavy phase, keeps trying to grow the read cache, and from the environment's reward feedback gradually learns the best allocation.
## Section 4: Summary
The algorithm design and the experimental results show that the OLUCB scheme has the following properties:
1. Efficiency: the task-oriented multi-level arm structure keeps the online configuration-computation overhead small and convergence fast.
2. Autonomy: OLUCB samples online with the LinUCB algorithm and needs no prior knowledge.
3. Intelligence: OLUCB uses runtime information as context features to guide the search of the solution space, cutting useless exploration and converging quickly to good solutions.
4. Adaptivity: OLUCB learns quickly online and adapts well to dynamic system changes.

doc/项目PR说明.md Normal file

@@ -0,0 +1,124 @@
# 简略说明
### 修改:
1. 将Local WriteCache的写入限流机制结合到当前项目中
2. 接入基于LinUCB算法的ReadCache & WriteCache动态调整策略
### 变更内容:
1. 修改JYCache/local_cache文件夹下的config.h & config.cpp & write_cache.h &write_cache.cpp & read_cache.h & read_cache.cpp & page_cache.h & page_cache.cpp新增throttle.h & throttle.cpp
2. 修改JYCache/s3fs文件夹下的hybridcache_accessor_4_s3fs.h & hybridcache_accessor_4_s3fs.cpp
### 测试结果:
1. 对于写入限流机制,本地测试未发现问题。
2. 接入LinUCB算法后在小规模数据集上测试效果较优。
### 注意事项:
1. sh build.sh && sh install.sh后需要更新JYCache_Env/conf/newcache.conf根据自身需要开启/关闭限流机制以及LinUCB调整策略具体选项如/conf_spec/newcache.conf_spce所示
### 使用方法
1. JYCache路径下
```bash
# 1.从源码构建
sh build.sh
# 2.系统安装
sh install.sh
# 3.开启LinUCB服务器端不开启LinUCB调整可跳过此项
cd LinUCBSchedule && python3 server.py
```
2. In the JYCache/JYCache_Env directory:
```bash
# 1. Start minio
cd ./minio && sh start.sh && cd ..
# 2. Edit the configuration file
vim conf/newcache.conf
# New options:
WriteCacheConfig.EnableThrottle=0
EnableResize = 1
EnableLinUCB = 1
# Changed option:
UseGlobalCache=0 # disable the global cache
# 3. Start s3fs
sh start_s3fs.sh
```
# Details
## 1. Local WriteCache write throttling
### 1.1 Design
JYCache is a high-performance, easily extensible distributed cache storage system aimed at scenarios ranging from personal use to large-model training and inference, adapting to large-capacity object storage and other backing stores. JYCache allows multiple processes to write different files concurrently, but memory bandwidth is limited, and we also want to prevent malicious use, abuse, or excessive resource consumption, so a write-throttling mechanism was added at the Local WriteCache layer. It runs as follows:
![](image/LocalWriteCache_throttle.png)
Write throttling keys on the file name: the number of bytes a given file may write within a time interval is bounded, with tokens standing in for bandwidth. A write task (a client Put request) carries the target file name and the length of the write. On receiving them, WriteCache proceeds as follows:
1. Is this the file's first write?
   1. Yes -> create a mapping from the file to a token bucket and grant it some bandwidth.
   2. No -> look up the file's existing token bucket.
2. Consume tokens equal to the write length from the bucket, recording the time spent waiting for tokens as BlockTime.
3. Perform the actual write.
When WriteCache is initialized, a new thread throttling_thread_ adjusts each file's write bandwidth, waking every 0.1s. The idea: the longer a file's BlockTime, the more write requests it issued in that interval (the more starved it is), so it deserves more bandwidth, and vice versa. Each file's next-round bandwidth is therefore BlockTime/TotalBlockTime * TotalBandWidth (as implemented in Throttle::Cal_New4Test); for fairness, every file's bandwidth is also clamped to at most 50% and at least 10% of the total. See [Throttling tool implementation and usage notes](https://epr023ri66.feishu.cn/docx/MfbOdxoLboII2Ex7MfyccKjXnEg) for details.
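A minimal Python sketch of this reallocation rule (the real implementation is Throttle::Cal_New4Test in local_cache/throttle.cpp; the function and file names here are illustrative):
```python
def reallocate(block_times: dict[str, float], total_bw: float) -> dict[str, float]:
    """Redistribute bandwidth in proportion to each file's BlockTime,
    clamped to [10%, 50%] of the total for fairness."""
    active = {name: t for name, t in block_times.items() if t > 0}
    if not active:
        return {}
    total_wait = sum(active.values())
    rates = {}
    for name, wait in active.items():
        rate = wait / total_wait * total_bw   # more starved -> more bandwidth
        rates[name] = min(max(rate, total_bw * 0.10), total_bw * 0.50)
    return rates

# e.g. a file that accounted for 75% of the blocking gets capped at 50% of the total
print(reallocate({"a.bin": 30.0, "b.bin": 10.0}, 649651540.0))
```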
### 1.2 Parameters
With only a single task, the token bucket's rate() and burst() are adjusted up to 649651540 by default; a run log shows [WriteCache]Put, len:131072, res:0, writePagecnt:2, time:0.201659ms, i.e. roughly 649651540 Bytes/s of write throughput.
On a file's first write, its bucket is therefore initialized with rate()=32768000, burst()=65536000, capacity=32768000.
### 1.3 Usage
Set WriteCacheConfig.EnableThrottle=1 in JYCache_Env/conf/newcache.conf.
The implementation lives in local_cache/throttle.h & throttle.cpp; the write and delete paths in local_cache/write_cache.h & write_cache.cpp were modified accordingly.
### 1.4 Effect
With write throttling enabled, a file that issues excessive write requests within a short time is blocked for a while before its next write may proceed.
![](image/result_throttle.png)
## 2. Reinforcement-learning-based Pool Resize
### 2.1 Design
An application's access pattern changes across its execution phases, and with it the read/write performance it needs. Although the sizes of the read and write buffer pools directly affect that performance, existing cache systems fix the pool sizes and cannot follow the application's demand. The Pool Resize design addresses this: it collects the current pool configuration and performance metrics, sends them over a socket to the reinforcement-learning model OLUCB, obtains a new pool configuration, and applies it. By continuously monitoring and resizing the pools, the design tracks application demand more closely, improves storage performance, and reduces resource idling.
![](image/OLUCB_frame.png)
The framework is a client-server architecture: the Server runs the OLUCB model, receiving the current configuration and computing a new one, while the Client is a thread inside the cache system that receives the new configuration and performs the adjustment.
In s3fs the resizing thread is LinUCBThread, whose lifetime matches the process. Every resizeInterval_ the thread performs one exchange with the Server, sending:
```bash
"WritePool:"writeCacheSize_/fixSize; // 当前WritePool大小
"ReadPool:"readCacheSize_/fixSize;// 当前ReadPool大小
"WritePool:"writeThroughput; // 写入吞吐量
"ReadPool:"readThroughput;// 读取吞吐量
"WritePool:"writeCount_; // Put函数调用次数
"ReadPool:"readCount_;// Get函数调用次数
```
- Lines 1-2: the number of resource units currently held by WritePool and ReadPool, with 256M as one unit.
- Lines 3-4: the read/write throughput over this interval, computed as follows (the accumulators are updated inside Put/Get):
    - writeThroughput = (double)writeByteAcc_/(double)writeTimeAcc_/1024 / 1024;
    - readThroughput = (double)readByteAcc_/(double)readTimeAcc_/ 1024 / 1024;
- Lines 5-6: the Put() and Get() call counts, sent to the Server as context (a sketch of parsing this message server-side follows this list).
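A minimal Python sketch of parsing this message on the Server side (it assumes exactly the `Name:value;` fields above, in order; the helper name is illustrative):
```python
def parse_pool_stats(msg: str):
    """Split 'WritePool:10;ReadPool:54;...' into the six documented fields."""
    values = [field.split(':', 1)[1] for field in msg.split(';') if field]
    write_units, read_units = int(values[0]), int(values[1])   # units of fixSize
    write_tp, read_tp = float(values[2]), float(values[3])     # throughput
    write_cnt, read_cnt = int(values[4]), int(values[5])       # Put/Get counts (context)
    return (write_units, read_units), (write_tp, read_tp), (write_cnt, read_cnt)

sizes, throughput, context = parse_pool_stats(
    "WritePool:10;ReadPool:54;WritePool:0.76;ReadPool:0.12;WritePool:421;ReadPool:87;")
```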
The Server runs the OLUCB reinforcement-learning model for single-objective tuning, computes the number of resource units WritePool and ReadPool should receive in the next round, and sends it to the Client. On receiving the new sizes, the Client adjusts the pools in two phases (see the sketch after this list):
1. Shrink first. For a pool that must shrink, compute its current free space freeSize. If freeSize minus the reserved space reserveSize is smaller than the requested reduction, the pool can release at most freeSize - reserveSize, aligned to 256M so that rounding never reduces the total resource count; then perform the shrink.
2. Then grow, using the capacity actually released in the previous step as the available budget, so the total is never exceeded.
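A sketch of this two-phase adjustment in Python (the real logic lives in HybridCacheAccessor4S3fs::LinUCBClient; the Pool type and the example values here are illustrative):
```python
from dataclasses import dataclass

FIX_SIZE = 256 * 1024 * 1024   # one allocation unit (256M)
RESERVE = 512 * 1024 * 1024    # space kept back inside a shrinking pool

@dataclass
class Pool:
    size: int   # current pool capacity in bytes
    used: int   # bytes currently occupied

def shrink(pool: Pool, want: int) -> int:
    """Release up to `want` bytes, bounded by (free - RESERVE) and aligned
    down to FIX_SIZE so rounding never loses capacity; returns bytes freed."""
    free = pool.size - pool.used
    limit = max(0, (free - RESERVE) // FIX_SIZE * FIX_SIZE)
    delta = min(want, limit)
    pool.size -= delta
    return delta

def grow(pool: Pool, want: int, budget: int) -> int:
    """Grow by up to `want` bytes, never beyond the budget released by
    earlier shrinks; returns bytes consumed."""
    delta = min(want, budget)
    pool.size += delta
    return delta

write_pool = Pool(size=8 * FIX_SIZE, used=3 * FIX_SIZE)
read_pool = Pool(size=4 * FIX_SIZE, used=1 * FIX_SIZE)
budget = shrink(write_pool, 2 * FIX_SIZE)   # phase 1: shrink first
grow(read_pool, 2 * FIX_SIZE, budget)       # phase 2: grow within the released budget
```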
### 2.2 Parameters
- Server: IP address 127.0.0.1, port 2333
- Model: [OLUCB single-objective scheduling algorithm](https://epr023ri66.feishu.cn/docx/KfsddCGbLoZjf0xgSOqcw0V8nZb)
- fixSize (resource allocation unit) = 1024 * 1024 * 256, i.e. 256M
- reserveSize (reserve that keeps a pool from hitting 0 during a resize) = 1024 * 1024 * 512, i.e. 512M
- resizeInterval_ (resize interval) = 5s
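Putting the protocol together, a hedged Python sketch of one Client-side exchange (IP/port from the list above; the reply format, two whitespace-separated lines of pool names then sizes, is inferred from the parsing code in LinUCBClient):
```python
import socket

def exchange(pool_stats: str) -> dict[str, int]:
    """Send one poolStats message and return {pool_name: units_of_fixSize}."""
    with socket.create_connection(("127.0.0.1", 2333)) as s:
        s.sendall(pool_stats.encode())
        reply = s.recv(1024).decode()
    names_line, sizes_line = reply.splitlines()[:2]
    names = names_line.split()                      # e.g. ["WritePool", "ReadPool"]
    sizes = [int(x) for x in sizes_line.split()]    # e.g. [8, 56]
    return dict(zip(names, sizes))
```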
### 2.3 Usage
1. Set EnableLinUCB=1 in JYCache_Env/conf/newcache.conf and start s3fs.
2. Run server.py.
3. Start the workload.
### 2.4 Effect
![](image/result_LinUCB.png)

0
install.sh Executable file → Normal file
View File

View File

@ -66,6 +66,8 @@ bool GetHybridCacheConfig(const std::string& file, HybridCacheConfig& cfg) {
cfg.WriteCacheCfg.CacheCfg.EnableCAS);
conf.GetValueFatalIfFail("WriteCacheConfig.CacheSafeRatio",
cfg.WriteCacheCfg.CacheSafeRatio);
conf.GetValueFatalIfFail("WriteCacheConfig.EnableThrottle",
cfg.WriteCacheCfg.EnableThrottle);
// GlobalCache
conf.GetValueFatalIfFail("UseGlobalCache", cfg.UseGlobalCache);
@ -91,6 +93,10 @@ bool GetHybridCacheConfig(const std::string& file, HybridCacheConfig& cfg) {
conf.GetValueFatalIfFail("EnableLog", cfg.EnableLog);
conf.GetValueFatalIfFail("FlushToRead", cfg.FlushToRead);
conf.GetValueFatalIfFail("CleanCacheByOpen", cfg.CleanCacheByOpen);
// added by tqy
conf.GetValueFatalIfFail("EnableResize",cfg.EnableResize);
conf.GetValueFatalIfFail("EnableLinUCB",cfg.EnableLinUCB);
conf.PrintConfig();
return CheckConfig(cfg);

View File

@ -34,9 +34,11 @@ struct ReadCacheConfig {
uint64_t DownloadBurstFlowLimit;
};
struct WriteCacheConfig {
CacheConfig CacheCfg;
uint32_t CacheSafeRatio; // cache safety concern threshold (percent)
//added by tqy
bool EnableThrottle;
};
struct GlobalCacheConfig {
@ -60,6 +62,9 @@ struct HybridCacheConfig {
bool UseGlobalCache = false;
bool FlushToRead = false; // write to read cache after flush
bool CleanCacheByOpen = false; // clean read cache when open file
//added by tqy
bool EnableResize; // enable the plain Resize policy
bool EnableLinUCB; // enable the LinUCB policy
};
bool GetHybridCacheConfig(const std::string& file, HybridCacheConfig& cfg);

View File

@ -407,7 +407,8 @@ int PageCacheImpl::Delete(const std::string &key) {
return res;
}
Cache::WriteHandle PageCacheImpl::FindOrCreateWriteHandle(const std::string &key)
{
auto writeHandle = cache_->findToWrite(key);
if (!writeHandle) {
writeHandle = cache_->allocate(pool_, key, GetRealPageSize());
@ -424,7 +425,7 @@ Cache::WriteHandle PageCacheImpl::FindOrCreateWriteHandle(const std::string &key
pageNum_.fetch_add(1);
pagesList_.insert(key);
}
} else { // WriteCache
if (cache_->insert(writeHandle)) {
pageNum_.fetch_add(1);
pagesList_.insert(key);

View File

@ -96,7 +96,18 @@ class PageCacheImpl : public PageCache {
public:
PageCacheImpl(const CacheConfig& cfg): PageCache(cfg) {
bitmapSize_ = cfg_.PageBodySize / BYTE_LEN;
LOG(WARNING) << "[TestOutPut] PageCache Init with size : "<<GetCacheSize();
}
//added by tqy
PageCacheImpl(const CacheConfig& cfg, PoolId curr_pool_id_, std::shared_ptr<Cache> curr_cache_):PageCache(cfg)
{
bitmapSize_ = cfg_.PageBodySize / BYTE_LEN;
cache_ = curr_cache_;
pool_ = curr_pool_id_;
}
//added end
~PageCacheImpl() {}
int Init();
@ -148,6 +159,7 @@ class PageCacheImpl : public PageCache {
}
Cache::WriteHandle FindOrCreateWriteHandle(const std::string &key);
private:
std::shared_ptr<Cache> cache_;

View File

@ -5,9 +5,19 @@ namespace HybridCache {
ReadCache::ReadCache(const ReadCacheConfig& cfg,
std::shared_ptr<DataAdaptor> dataAdaptor,
std::shared_ptr<ThreadPool> executor, // parameters below added by tqy
PoolId curr_id_,
std::shared_ptr<Cache> curr_cache_) :
cfg_(cfg), dataAdaptor_(dataAdaptor), executor_(executor) {
Init();
// Init();
if(curr_cache_ == nullptr)
{
Init();
}
else
{
CombinedInit(curr_id_, curr_cache_);
}
}
folly::Future<int> ReadCache::Get(const std::string &key, size_t start,
@ -60,7 +70,8 @@ folly::Future<int> ReadCache::Get(const std::string &key, size_t start,
size_t fileStartOff = 0;
std::vector<folly::Future<int>> fs;
auto it = dataBoundary.begin();
while (remainLen > 0 && SUCCESS == res)
{
ByteBuffer stepBuffer(buffer.data + stepStart);
fileStartOff = start + stepStart;
if (it != dataBoundary.end()) {
@ -83,7 +94,6 @@ folly::Future<int> ReadCache::Get(const std::string &key, size_t start,
while(!this->tokenBucket_->consume(readLen));
return SUCCESS;
}).thenValue([this, key, fileStartOff, readLen, stepBuffer](int i) {
// LOG(INFO) << "Extra download: " << key << " " << readLen;
ByteBuffer tmpBuffer(stepBuffer.data, readLen);
return this->dataAdaptor_->DownLoad(key, fileStartOff, readLen, tmpBuffer).get();
}).thenValue([this, key, fileStartOff, readLen, stepBuffer](int downRes) {
@ -248,9 +258,23 @@ int ReadCache::Init() {
return res;
}
//added by tqy
int ReadCache::CombinedInit(PoolId curr_id_, std::shared_ptr<Cache> curr_cache_)
{
pageCache_ = std::make_shared<PageCacheImpl>(cfg_.CacheCfg, curr_id_, curr_cache_);
tokenBucket_ = std::make_shared<folly::TokenBucket>(
cfg_.DownloadNormalFlowLimit, cfg_.DownloadBurstFlowLimit);
LOG(WARNING) << "[ReadCache]Init, curr_id : "<< static_cast<int>(curr_id_) <<" curr_cache : "<< curr_cache_;
return SUCCESS;
}
//add end
std::string ReadCache::GetPageKey(const std::string &key, size_t pageIndex) {
std::string pageKey(key);
// pageKey.append(std::string(1, PAGE_SEPARATOR)).append(std::to_string(pageIndex));
pageKey.append(std::string(1, PAGE_SEPARATOR)).append("Read").append(std::to_string(pageIndex));
return pageKey;
}

View File

@ -17,7 +17,9 @@ class ReadCache {
public:
ReadCache(const ReadCacheConfig& cfg,
std::shared_ptr<DataAdaptor> dataAdaptor,
std::shared_ptr<ThreadPool> executor, // parameters below added by tqy
PoolId curr_id_ = NULL,
std::shared_ptr<Cache> curr_cache_ = nullptr);
ReadCache() = default;
~ReadCache() { Close(); }
@ -41,6 +43,9 @@ class ReadCache {
private:
int Init();
//added by tqy
int CombinedInit(PoolId curr_id_, std::shared_ptr<Cache> curr_cache_);
//added end
std::string GetPageKey(const std::string &key, size_t pageIndex);

195
local_cache/throttle.cpp Normal file
View File

@ -0,0 +1,195 @@
#include "throttle.h"
#include "errorcode.h"
#include "glog/logging.h"
#include "common.h"
#include "config.h"
namespace HybridCache {
void Throttle::CleanBlockTime()
{
for(const auto& curr_ : this->job_waiting_)
{
job_waiting_.assign(curr_.first, 0.0);
}
job_bandwidth_.clear();
}
std::vector<std::string> ToSplitString(const std::string &input)
{
std::vector<std::string> result;
std::stringstream ss(input);
std::string item;
while (std::getline(ss, item, ',')) {
result.push_back(item);
}
return result;
}
int Throttle::SetNewLimits(const std::string &input)
{
//no change
if(input == "")
{
return 0;
}
// message format: filename,rate,burst
std::vector<std::string> result = ToSplitString(input);
std::string file_name;
double rate;
double burst;
std::ostringstream oss;
std::string printres="";
if(result.size()%3 != 0)
{
LOG(ERROR) << "[Throttle] The format of new limits is Wrong";
return -1;
}
auto it = job_tokenlimit_.begin();
for(size_t i = 0; i < result.size(); i +=3)
{
file_name = result[i];
rate = std::stod(result[i + 1]);
burst = std::stod(result[i + 2]);
auto temp_it = job_tokenlimit_.find(file_name);//last tokenbucket
if(temp_it != job_tokenlimit_.end())
{
//temp_it->second->reset(rate,burst);
double curr_available = temp_it->second->available();
auto token_bucket = std::make_shared<folly::TokenBucket>(rate, burst);
token_bucket->setCapacity(curr_available,std::chrono::duration<double>(std::chrono::steady_clock::now().time_since_epoch()).count() );
job_tokenlimit_.assign(file_name, token_bucket);
}
}
for(auto printtemp : job_tokenlimit_)
{
oss.str(""); // 清空流内容
oss.clear(); // 清除错误状态
oss<<printtemp.first<<"\trate:"<<printtemp.second->rate()
<<"\tburst:"<<printtemp.second->burst()
<<"\tavailable:"<<printtemp.second->available()<<"\t\t";
printres += oss.str();
}
if(EnableLogging)
LOG(INFO) << "[Throttle] Current Resize Each File's Flow : \t"<<printres;
return 0;
}
// Files whose BlockTime is above the average get proportionally more bandwidth;
// files below the average get proportionally less.
std::string Throttle::Cal_New4Test()
{
std::string stringres = "";
double total_waiting_time = 0.0;
int job_num = 0;
double rate;
double TotalBw = 649651540; // bandwidth budget adjustable in this round
std::ostringstream oss;
folly::ConcurrentHashMap<std::string, double> curr_for_cal;
if(job_waiting_.size() == 0) // no active jobs this round
{
return stringres;
}
if(job_waiting_.size() == 1) // a single job: raise its limit to the maximum
{
oss.str(""); // 清空流内容
oss.clear(); // 清除错误状态
//传送格式filename,rate,burst
oss<<job_waiting_.cbegin()->first<<",";
oss<<649651540<<","<<649651540<<",";
stringres += oss.str();
return stringres;
}
// Writers run concurrently, so work on a snapshot to avoid inconsistent sums.
for(auto& curr_ : job_waiting_)
{
if(curr_.second == 0) // no write ran or completed in this interval; leave it unchanged until next round
continue;
curr_for_cal.insert(curr_.first, curr_.second);
++job_num;
total_waiting_time += curr_.second;
if(job_bandwidth_[curr_.first] > TotalBw) // keep the budget from being zero
TotalBw = job_bandwidth_[curr_.first];
}
if(job_num == 0)
return stringres;
// every key present in job_waiting_ also exists in job_tokenlimit_
for(auto& curr_ : curr_for_cal)
{
rate = curr_.second / total_waiting_time * TotalBw;
rate = std::max(rate, TotalBw / 10);
rate = std::min(rate, TotalBw / 2);
oss.str(""); // 清空流内容
oss.clear(); // 清除错误状态
//传送格式filename,rate,burst
oss << curr_.first << "," << rate << "," << TotalBw <<",";
stringres += oss.str();
}
// if(EnableLogging)
// LOG(INFO) << "[Throttle]"<<__func__<<" Resize Each File's Flow : "<<stringres;
return stringres;
}
int Throttle::Put_Consume(const std::string& file_name, size_t sizes)
{
int temp = false;
auto result = this->job_tokenlimit_.find(file_name);
if(result == job_tokenlimit_.end())
{
// from the log: len:131072, res:0, writePagecnt:2, time:0.201659ms,
// i.e. roughly 619.38 MB/s (649651540 bytes/s)
auto token_bucket = std::make_shared<folly::TokenBucket>(32768000, 65536000);
token_bucket->setCapacity(32768000,std::chrono::duration<double>(std::chrono::steady_clock::now().time_since_epoch()).count() );
job_tokenlimit_.insert_or_assign(file_name, token_bucket);
job_waiting_.insert_or_assign(file_name, 0.0);
temp = true;
}
auto insert_result = job_tokenlimit_.find(file_name);
std::chrono::steady_clock::time_point BlockTimeStart = std::chrono::steady_clock::now();
while( !(insert_result->second->consume(sizes)) );//consume
double BlockTime = std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - BlockTimeStart).count();
job_waiting_.assign(file_name, job_waiting_[file_name] + BlockTime);
// if(EnableLogging)
// {
// LOG(INFO)<<"[Throttle] "<<file_name <<" BlockTime : "<<BlockTime;
// }
// if (EnableLogging)
// {
// LOG(INFO)<<"[LocalWriteCache]"<<file_name<<" burst : "<<job_tokenlimit_[file_name]->burst()<<" rate : "
// <<job_tokenlimit_[file_name]->rate()<<" BlockTime : "<<BlockTime<<"ms "
// <<"TotalBlockTime : "<<job_waiting_[file_name]<<"ms "
// <<"newly insert : "<<temp
// <<" size : "<<sizes
// <<" avalible : "<<job_tokenlimit_[file_name]->available();
// }
return SUCCESS;
}
// called when a file is deleted or flushed
void Throttle::Del_File(const std::string& file_name)
{
job_waiting_.erase(file_name);
job_tokenlimit_.erase(file_name);
job_bandwidth_.erase(file_name);
}
void Throttle::Close()
{
job_tokenlimit_.clear();
job_waiting_.clear();
job_bandwidth_.clear();
}
}

27
local_cache/throttle.h Normal file
View File

@ -0,0 +1,27 @@
#ifndef HYBRIDCACHE_THROTTLE_H_
#define HYBRIDCACHE_THROTTLE_H_
#include "folly/TokenBucket.h"
#include "folly/concurrency/ConcurrentHashMap.h"
namespace HybridCache {
class Throttle
{
public :
folly::ConcurrentHashMap<std::string, std::shared_ptr<folly::TokenBucket>> job_tokenlimit_; // per-file write limit (token bucket)
folly::ConcurrentHashMap<std::string, double> job_waiting_; // per-file accumulated block time
folly::ConcurrentHashMap<std::string, double> job_bandwidth_;
//std::atomic<bool> is_resetting_{false}; // a bandwidth reallocation is in progress
Throttle() { ; }
int SetNewLimits(const std::string &input); // bandwidth reallocation
int Put_Consume(const std::string&, size_t);
void Del_File(const std::string&);
void Close();
std::string Cal_New4Test();
void CleanBlockTime();
};
}
#endif // HYBRIDCACHE_THROTTLE_H_

View File

@ -22,6 +22,10 @@ int WriteCache::Put(const std::string &key, size_t start, size_t len,
while (remainLen > 0) {
writeLen = pagePos + remainLen > pageSize ? pageSize - pagePos : remainLen;
std::string pageKey = std::move(GetPageKey(key, index));
//----------tqy--------
if(cfg_.EnableThrottle)
this->throttling.Put_Consume(key, writeLen);
//---------------------
res = pageCache_->Write(pageKey, pagePos, writeLen,
(buffer.data + writeOffset));
if (SUCCESS != res) break;
@ -148,7 +152,10 @@ int WriteCache::Delete(const std::string &key, LockType type) {
if (LockType::ALREADY_LOCKED != type) {
Lock(key);
}
//-----tqy------
if(cfg_.EnableThrottle)
throttling.Del_File(key);
//--------------
keys_.erase(key);
size_t delPageNum = 0;
std::string firstPage = std::move(GetPageKey(key, 0));
@ -242,6 +249,9 @@ int WriteCache::GetAllKeys(std::map<std::string, time_t>& keys) {
for (auto& it : keys_) {
keys[it.first] = it.second;
//---------tqy-------
if(cfg_.EnableThrottle)
throttling.Del_File(it.first);
}
if (EnableLogging) {
double totalTime = std::chrono::duration<double, std::milli>(
@ -255,6 +265,16 @@ int WriteCache::GetAllKeys(std::map<std::string, time_t>& keys) {
void WriteCache::Close() {
pageCache_->Close();
keys_.clear();
//------------------tqy------------
if(cfg_.EnableThrottle)
{
throttling.Close();
throttling_thread_running_.store(false, std::memory_order_release); // signal the thread to stop
if (throttling_thread_.joinable()) {
throttling_thread_.join();
}
}
LOG(WARNING) << "[WriteCache]Close";
}
@ -269,6 +289,7 @@ size_t WriteCache::GetCacheMaxSize() {
int WriteCache::Init() {
pageCache_ = std::make_shared<PageCacheImpl>(cfg_.CacheCfg);
int res = pageCache_->Init();
LOG(WARNING) << "[WriteCache]Init, res:" << res;
return res;
}
@ -279,8 +300,47 @@ void WriteCache::Lock(const std::string &key) {
std::string WriteCache::GetPageKey(const std::string &key, size_t pageIndex) {
std::string pageKey(key);
pageKey.append(std::string(1, PAGE_SEPARATOR)).append(std::to_string(pageIndex));
// pageKey.append(std::string(1, PAGE_SEPARATOR)).append(std::to_string(pageIndex));
pageKey.append(std::string(1, PAGE_SEPARATOR)).append("Write").append(std::to_string(pageIndex));// tqy add for cachelib combination
return pageKey;
}
//---------------------tqy----------------------
int WriteCache::CombinedInit (PoolId curr_id_, std::shared_ptr<Cache> curr_cache_)
{
this->pageCache_ = std::make_shared<PageCacheImpl>(cfg_.CacheCfg, curr_id_, curr_cache_);
LOG(WARNING) << "[WriteCache]Init, curr_id : "<< static_cast<int>(curr_id_) <<" curr_cache : "<< curr_cache_;
return SUCCESS;
}
// A dedicated thread that tracks per-file traffic and later exchanges it with the scheduler.
void WriteCache::Dealing_throttling()
{
if(!cfg_.EnableThrottle)
{
return;
}
LOG(WARNING) << "[WriteCache] Throttling Thread start";
// memory_order_release: all writes before this store become visible to other threads.
throttling_thread_running_.store(true, std::memory_order_release);
while(throttling_thread_running_.load(std::memory_order_acquire))
{
// receive the new bandwidth limits computed from the recorded traffic
// memory_order_acquire: pairs with the release store above, so this thread sees up-to-date values.
std::string new_limit = throttling.Cal_New4Test();
//new_bw = 0;
throttling.SetNewLimits(new_limit);
throttling.CleanBlockTime();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // reallocate every 0.1s
// for(auto temp : throttling.job_bandwidth_)
// {
// new_bw += temp.second;
// }
// curr_bw = std::max(new_bw, 649651540.0);
}
LOG(WARNING) << "[WriteCache] Throttling Thread end";
}
} // namespace HybridCache

View File

@ -9,12 +9,37 @@
#include "folly/concurrency/ConcurrentHashMap.h"
#include "page_cache.h"
//-----------tqy--------------
#include "throttle.h"
namespace HybridCache {
class WriteCache {
public:
// WriteCache(const WriteCacheConfig& cfg) : cfg_(cfg) { Init(); }
//added by tqy
WriteCache(const WriteCacheConfig& cfg, PoolId curr_id_ = NULL, std::shared_ptr<Cache> curr_cache_ = nullptr) : cfg_(cfg)
{
if(curr_cache_ == nullptr)
{
Init();
}
else
{
CombinedInit(curr_id_, curr_cache_);
}
//Throttle
if(cfg_.EnableThrottle)
{
throttling_thread_ = std::thread(&WriteCache::Dealing_throttling, this);
LOG(WARNING) <<"[WriteCache] USE_THROTTLING";
}
else
{
LOG(WARNING) <<"[WriteCache] NO USE_THROTTLING";
}
}
//---------------
WriteCache() = default;
~WriteCache() { Close(); }
@ -58,6 +83,11 @@ class WriteCache {
private:
int Init();
//added by tqy
int CombinedInit(PoolId curr_id_, std::shared_ptr<Cache> curr_cache_);
void Dealing_throttling();
//added end
void Lock(const std::string &key);
std::string GetPageKey(const std::string &key, size_t pageIndex);
@ -67,6 +97,10 @@ class WriteCache {
std::shared_ptr<PageCache> pageCache_;
folly::ConcurrentHashMap<std::string, time_t> keys_; // <key, create_time>
StringSkipList::Accessor keyLocks_ = StringSkipList::create(SKIP_LIST_HEIGHT); // presence key indicates lock
//------------------tqy---------------
HybridCache::Throttle throttling;
std::thread throttling_thread_; // runs as a dedicated thread
std::atomic<bool> throttling_thread_running_{false}; // whether the throttling thread is running
};
} // namespace HybridCache

View File

@ -48,16 +48,113 @@ void HybridCacheAccessor4S3fs::Init() {
executor_ = std::make_shared<HybridCache::ThreadPool>(cfg_.ThreadNum);
dataAdaptor_->SetExecutor(executor_);
// ---deleted by tqy
// writeCache_ = std::make_shared<WriteCache>(cfg_.WriteCacheCfg);
// readCache_ = std::make_shared<ReadCache>(cfg_.ReadCacheCfg, dataAdaptor_,
// executor_);
InitCache();
tokenBucket_ = std::make_shared<folly::TokenBucket>(
cfg_.UploadNormalFlowLimit, cfg_.UploadBurstFlowLimit);
toStop_.store(false, std::memory_order_release);
bgFlushThread_ = std::thread(&HybridCacheAccessor4S3fs::BackGroundFlush, this);
//added by tqy referring to xyq
if(cfg_.EnableLinUCB)
{
stopLinUCBThread = false;
LinUCBThread = std::thread(&HybridCacheAccessor4S3fs::LinUCBClient, this);
}
LOG(WARNING) << "[Accessor]Init, useGlobalCache:" << cfg_.UseGlobalCache;
}
//added by tqy referring to xyq
int HybridCacheAccessor4S3fs::InitCache()
{
// LOG(WARNING) << "ENABLERESIZE : " << cfg_.EnableResize;
if(cfg_.EnableResize || cfg_.EnableLinUCB) // put both pools into a single cachelib instance
{
//in page implement
const uint64_t REDUNDANT_SIZE = 1024 * 1024 * 1024;
const unsigned bucketsPower = 25;
const unsigned locksPower = 15;
const std::string COM_CACHE_NAME = "ComCache"; // WriteCache and ReadCache both become pools inside it
const std::string WRITE_POOL_NAME = "WritePool";
const std::string READ_POOL_NAME = "ReadPool";
//Resize Total Cache
Cache::Config config;
// TOFIXUP: does REDUNDANT_SIZE need the *2?
config
.setCacheSize(cfg_.WriteCacheCfg.CacheCfg.MaxCacheSize + cfg_.ReadCacheCfg.CacheCfg.MaxCacheSize + 2 * REDUNDANT_SIZE)
.setCacheName(COM_CACHE_NAME)
.setAccessConfig({bucketsPower, locksPower})
.validate();
std::shared_ptr<Cache> comCache_ = std::make_unique<Cache>(config);
ResizeWriteCache_ = comCache_;
if (comCache_->getPoolId(WRITE_POOL_NAME) != -1)
{
LOG(WARNING) << "Pool with the same name "<<WRITE_POOL_NAME<<" already exists.";
writePoolId_ = comCache_->getPoolId(WRITE_POOL_NAME);
}
else
{
writePoolId_ = comCache_->addPool(WRITE_POOL_NAME, cfg_.WriteCacheCfg.CacheCfg.MaxCacheSize);
}
writeCache_ = std::make_shared<WriteCache>(cfg_.WriteCacheCfg, writePoolId_, comCache_);
// in practice only ReadCache may need an NVM tier
if(cfg_.ReadCacheCfg.CacheCfg.CacheLibCfg.EnableNvmCache)
{
Cache::NvmCacheConfig nvmConfig;
std::vector<std::string> raidPaths;
//Read Path
for (int i=0; i<cfg_.ReadCacheCfg.CacheCfg.CacheLibCfg.RaidFileNum; ++i)
{
raidPaths.push_back(cfg_.ReadCacheCfg.CacheCfg.CacheLibCfg.RaidPath + std::to_string(i));
}
nvmConfig.navyConfig.setRaidFiles(raidPaths,
cfg_.ReadCacheCfg.CacheCfg.CacheLibCfg.RaidFileSize, false);
nvmConfig.navyConfig.blockCache()
.setDataChecksum(cfg_.ReadCacheCfg.CacheCfg.CacheLibCfg.DataChecksum);
nvmConfig.navyConfig.setReaderAndWriterThreads(1, 1, 0, 0);
config.enableNvmCache(nvmConfig).validate();
}
ResizeReadCache_ = comCache_;
if (comCache_->getPoolId(READ_POOL_NAME) != -1)
{
LOG(WARNING) << "Pool with the same name "<<READ_POOL_NAME<<" already exists.";
readPoolId_ = comCache_->getPoolId(READ_POOL_NAME);
}
else
{
readPoolId_ = comCache_->addPool(READ_POOL_NAME, cfg_.ReadCacheCfg.CacheCfg.MaxCacheSize);
}
readCache_ = std::make_shared<ReadCache>(cfg_.ReadCacheCfg, dataAdaptor_,
executor_, readPoolId_, comCache_);
LOG(WARNING) << "[Accessor]Init Cache in Combined Way.";
}
else // fall back to the original initialization
{
writeCache_ = std::make_shared<WriteCache>(cfg_.WriteCacheCfg);
readCache_ = std::make_shared<ReadCache>(cfg_.ReadCacheCfg, dataAdaptor_,
executor_);
ResizeWriteCache_ = nullptr;
ResizeReadCache_ = nullptr;
LOG(WARNING) << "[Accessor]Init Cache in Initial Way.";
}
writeCacheSize_ = cfg_.WriteCacheCfg.CacheCfg.MaxCacheSize;
readCacheSize_ = cfg_.ReadCacheCfg.CacheCfg.MaxCacheSize;
return SUCCESS;
}
void HybridCacheAccessor4S3fs::Stop() {
toStop_.store(true, std::memory_order_release);
if (bgFlushThread_.joinable()) {
@ -66,6 +163,11 @@ void HybridCacheAccessor4S3fs::Stop() {
executor_->stop();
writeCache_.reset();
readCache_.reset();
// added by tqy referring to xyq
stopLinUCBThread.store(true, std::memory_order_release);
if(cfg_.EnableLinUCB && LinUCBThread.joinable()){
LinUCBThread.join();
}
LOG(WARNING) << "[Accessor]Stop";
}
@ -76,9 +178,22 @@ int HybridCacheAccessor4S3fs::Put(const std::string &key, size_t start,
// When the write cache is full,
// block waiting for asynchronous flush to release the write cache space.
// while(IsWriteCacheFull(len)) {
//     std::this_thread::sleep_for(std::chrono::milliseconds(1));
// }
if(cfg_.EnableLinUCB || cfg_.EnableResize)
{
while(IsWritePoolFull(len)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
else
{
while(IsWriteCacheFull(len)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
++writeCount_;
// shared lock
auto fileLock = fileLock_.find(key);
@ -97,6 +212,7 @@ int HybridCacheAccessor4S3fs::Put(const std::string &key, size_t start,
break;
}
// LOG(WARNING) << "[TestOutPut]Accessor Put, Get File "<<key<<"'s Lock";
int res = writeCache_->Put(key, start, len, ByteBuffer(const_cast<char *>(buf), len));
int fd = -1;
@ -112,13 +228,17 @@ int HybridCacheAccessor4S3fs::Put(const std::string &key, size_t start,
fileLock->second->fetch_sub(1); // release shared lock
// LOG(WARNING) << "[TestOutPut]Accessor Put, Release File "<<key<<"'s Lock";
double totalTime = std::chrono::duration<double, std::milli>(
    std::chrono::steady_clock::now() - startTime).count();
if (EnableLogging) {
LOG(INFO) << "[Accessor]Put, key:" << key << ", start:" << start
<< ", len:" << len << ", res:" << res
<< ", time:" << totalTime << "ms";
}
// added by tqy referring to xyq
writeByteAcc_ += len;
writeTimeAcc_ += totalTime;
return res;
}
@ -126,6 +246,8 @@ int HybridCacheAccessor4S3fs::Get(const std::string &key, size_t start,
size_t len, char* buf) {
std::chrono::steady_clock::time_point startTime;
if (EnableLogging) startTime = std::chrono::steady_clock::now();
++readCount_;
// LOG(INFO) << "[TestOutPut] Get start, key:" << key;
int res = SUCCESS;
ByteBuffer buffer(buf, len);
@ -171,14 +293,16 @@ int HybridCacheAccessor4S3fs::Get(const std::string &key, size_t start,
res = tmpRes;
}
}
double totalTime = std::chrono::duration<double, std::milli>(
    std::chrono::steady_clock::now() - startTime).count();
if (EnableLogging) {
LOG(INFO) << "[Accessor]Get, key:" << key << ", start:" << start
<< ", len:" << len << ", res:" << res
<< ", time:" << totalTime << "ms";
}
// added by tqy referring to xyq
readByteAcc_ += len;
readTimeAcc_ += totalTime;
return res;
}
@ -188,6 +312,7 @@ int HybridCacheAccessor4S3fs::Flush(const std::string &key) {
startTime = std::chrono::steady_clock::now();
LOG(INFO) << "[Accessor]Flush start, key:" << key;
}
// LOG(WARNING) << "[TestOutPut] Flush start, key:" << key;
// exclusive lock
auto fileLock = fileLock_.find(key);
@ -212,12 +337,16 @@ int HybridCacheAccessor4S3fs::Flush(const std::string &key) {
if (nullptr == (ent = FdManager::get()->GetFdEntity(
key.c_str(), fd, false, AutoLock::ALREADY_LOCKED))) {
res = -EIO;
LOG(ERROR) << "[Accessor]Flush, can't find opened path, file:" << key;
LOG(ERROR) << "[Accessor]Flush, can't find opened path, file:" << key;//error here
// LOG(ERROR) << "[TestOutPut]Accessor Flush, fileLock is "<<fileLock->second->load();
}
size_t realSize = 0;
std::map<std::string, std::string> realHeaders;
if (SUCCESS == res) {
realSize = ent->GetRealsize();
// file size >= 10G stop LinUCB
if(realSize >= 10737418240)
stopLinUCBThread.store(true, std::memory_order_release);
for (auto &it : ent->GetOriginalHeaders()) {
realHeaders[it.first] = it.second;
}
@ -244,7 +373,7 @@ int HybridCacheAccessor4S3fs::Flush(const std::string &key) {
char *buf = nullptr;
while(0 != posix_memalign((void **) &buf, 4096, realSize));
ByteBuffer buffer(buf, realSize);
if (SUCCESS == res) { // try to get
const size_t chunkSize = GetGlobalConfig().write_chunk_size * 2;
const uint64_t chunkNum = realSize / chunkSize + (realSize % chunkSize == 0 ? 0 : 1);
std::vector<Json::Value> jsonRoots(chunkNum);
@ -273,6 +402,8 @@ int HybridCacheAccessor4S3fs::Flush(const std::string &key) {
}
}
// LOG(WARNING) << "[TestOutPut]Flush, Get File "<<key<<" From WriteCache/ReadCache res : "<< res;
if (SUCCESS == res && !cfg_.UseGlobalCache) { // Get success
while(!tokenBucket_->consume(realSize)); // upload flow control
res = dataAdaptor_->UpLoad(key, realSize, buffer, realHeaders).get();
@ -285,12 +416,13 @@ int HybridCacheAccessor4S3fs::Flush(const std::string &key) {
// folly via is not executed immediately, so use separate thread
std::thread t([this, key, res]() {
if (SUCCESS == res) // upload success
writeCache_->Delete(key); // delete here
auto fileLock = fileLock_.find(key);
if (fileLock_.end() != fileLock) {
fileLock->second->store(0);
fileLock_.erase(fileLock); // release exclusive lock
}
// LOG(WARNING) << "[TestOutPut]Flush, Delete File "<<key<<" From WriteCache success";
});
t.detach();
@ -314,12 +446,14 @@ int HybridCacheAccessor4S3fs::Flush(const std::string &key) {
if (buf) free(buf);
});
}
// LOG(WARNING) << "[TestOutPut]Flush, Flush File "<<key<<" To ReadCache success";
if (EnableLogging) {
double totalTime = std::chrono::duration<double, std::milli>(
std::chrono::steady_clock::now() - startTime).count();
LOG(INFO) << "[Accessor]Flush end, key:" << key << ", size:" << realSize
<< ", res:" << res << ", time:" << totalTime << "ms";
LOG(WARNING) << "[Accessor]Flush end, key:" << key << ", size:" << realSize
<< ", res:" << res << ", time:" << totalTime << "ms";
}
return res;
}
@ -482,6 +616,7 @@ int HybridCacheAccessor4S3fs::FsSync() {
if (backFlushRunning_.compare_exchange_weak(expected, true))
break;
}
// LOG(WARNING) << "[TestOutPut] expected locked";
std::map<std::string, time_t> files;
writeCache_->GetAllKeys(files);
@ -490,16 +625,20 @@ int HybridCacheAccessor4S3fs::FsSync() {
[](std::pair<std::string, time_t> lhs, std::pair<std::string, time_t> rhs) {
return lhs.second < rhs.second;
});
// LOG(WARNING) << "[TestOutPut] Sorting finished";
std::vector<folly::Future<int>> fs;
for (auto& file : filesVec) {
std::string key = file.first;
// LOG(WARNING) << "[TestOutPut] FsSync, start flush file:" << key;
fs.emplace_back(folly::via(executor_.get(), [this, key]() {
int res = this->Flush(key);
if (res) {
LOG(ERROR) << "[Accessor]FsSync, flush error in FsSync, file:" << key
<< ", res:" << res;
}
// else
// LOG(WARNING) << "[TestOutPut] FsSync, flush success in FsSync, file:" << key;
return res;
}));
}
@ -520,19 +659,60 @@ bool HybridCacheAccessor4S3fs::UseGlobalCache() {
return cfg_.UseGlobalCache;
}
void HybridCacheAccessor4S3fs::BackGroundFlush()
{
LOG(WARNING) << "[Accessor]BackGroundFlush start";
// while (!toStop_.load(std::memory_order_acquire)) {
// if (WriteCacheRatio() < cfg_.BackFlushCacheRatio) {
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
// continue;
// }
// LOG(WARNING) << "[Accessor]BackGroundFlush radically, write cache ratio:"
// << WriteCacheRatio();
// FsSync();
// }
// if (0 < writeCache_->GetCacheSize()) {
// FsSync();
// }
// LOG(WARNING) << "[Accessor]BackGroundFlush end";
if(cfg_.EnableResize || cfg_.EnableLinUCB)
{
while (!toStop_.load(std::memory_order_acquire))
{
if (WritePoolRatio() < cfg_.BackFlushCacheRatio)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
LOG(WARNING) << "[Accessor]BackGroundFlush radically, write pool ratio:"<< WritePoolRatio();
// LOG(WARNING) << "[TestOutPut] Before Flush, Write Pool size : " <<writeCache_->GetCacheSize();
FsSync();
// LOG(WARNING) << "[TestOutPut] After Flush, Write Pool size : "<<writeCache_->GetCacheSize()<<" , Ratio : "<<WritePoolRatio();
}
if (0 < writeCache_->GetCacheSize())
{
    FsSync();
}
}
else
{
while (!toStop_.load(std::memory_order_acquire))
{
if (WriteCacheRatio() < cfg_.BackFlushCacheRatio)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
LOG(WARNING) << "[Accessor]BackGroundFlush radically, write cache ratio:"<< WriteCacheRatio();
// LOG(WARNING) << "[TestOutPut] Before Flush, Write cache size : "<<writeCache_->GetCacheSize();
FsSync();
// LOG(WARNING) << "[TestOutPut] After Flush, Write cache size : "<<writeCache_->GetCacheSize()<<" , Ratio : "<<WriteCacheRatio();
}
if (0 < writeCache_->GetCacheSize()) // some files are still unflushed
{
// LOG(WARNING) << "[TestOutPut] Final BackGroundFlush ";
FsSync();
}
}
LOG(WARNING) << "[Accessor]BackGroundFlush end";
}
@ -544,11 +724,219 @@ void HybridCacheAccessor4S3fs::InitLog() {
google::InitGoogleLogging("hybridcache");
}
uint32_t HybridCacheAccessor4S3fs::WriteCacheRatio()
{
return writeCache_->GetCacheSize() * 100 / writeCache_->GetCacheMaxSize();
}
// used when cfg_.EnableResize || cfg_.EnableLinUCB
uint32_t HybridCacheAccessor4S3fs::WritePoolRatio()
{
    return writeCache_->GetCacheSize() * 100 / writeCacheSize_;
}
bool HybridCacheAccessor4S3fs::IsWriteCacheFull(size_t len)
{
return writeCache_->GetCacheSize() + len >=
(writeCache_->GetCacheMaxSize() * cfg_.WriteCacheCfg.CacheSafeRatio / 100);
}
//if(cfg_.EnableResize || cfg_.EnableLinUCB)
bool HybridCacheAccessor4S3fs::IsWritePoolFull(size_t len)
{
return writeCache_->GetCacheSize() + len >=
(writeCacheSize_ * cfg_.WriteCacheCfg.CacheSafeRatio / 100);
}
//--------------------added by tqy referring to xyq-----------
void HybridCacheAccessor4S3fs::LinUCBClient()
{
LOG(WARNING) << "[LinUCB] LinUCBThread start";
uint32_t resizeInterval_ = 5; // tentatively 5s (local value shadowing the member of the same name)
while(!stopLinUCBThread)
{
int client_socket;
struct sockaddr_in server_address;
// nothing to tune if no traffic was recorded
if(writeByteAcc_ == 0 && readByteAcc_ == 0)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
if ((client_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
LOG(WARNING) << "[LinUCB] Socket creation error";
break;
}
memset(&server_address, 0, sizeof(server_address));
server_address.sin_family = AF_INET;
server_address.sin_port = htons(IP_PORT);
if (inet_pton(AF_INET, IP_ADDR, &server_address.sin_addr) <= 0) {
LOG(WARNING) << "[LinUCB] Invalid address/ Address not supported";
break;
}
// connect to the server
if (connect(client_socket, (struct sockaddr *)&server_address, sizeof(server_address)) < 0) {
LOG(WARNING) << "[LinUCB] Connection Failed";
break;
}
LOG(INFO) << "[LinUCB] poolStats begin sent";
// send each pool's throughput
LOG(INFO) << "[LinUCB] writeByteAcc_ : " << writeByteAcc_ << ", writeTimeAcc_ :" << writeTimeAcc_;
LOG(INFO) << "[LinUCB] readByteAcc_ : " << readByteAcc_ << ", readTimeAcc_ :" << readTimeAcc_;
if(writeTimeAcc_ == 0) writeTimeAcc_ = 1;
if(readTimeAcc_ == 0) readTimeAcc_ = 1;
// double writeThroughput = (double)writeByteAcc_/(double)writeTimeAcc_/1024/1024;
// double readThroughput = (double)readByteAcc_/(double)readTimeAcc_/ 1024/1024;
double writeThroughput = (double)writeByteAcc_/(double)writeTimeAcc_/1024 / 1024 ;// 20643184640
double readThroughput = (double)readByteAcc_/(double)readTimeAcc_/ 1024 / 1024;
writeThroughput = std::tanh(writeThroughput);
readThroughput = std::tanh(readThroughput);
LOG(INFO) << "[LinUCB] writeThroughput : " << writeThroughput
<< " readThroughput : " << readThroughput;
// reset the accumulators
writeByteAcc_ = 0;
writeTimeAcc_ = 0;
readByteAcc_ = 0;
readTimeAcc_ = 0;
// send the poolStats message
std::ostringstream oss;
oss << "WritePool" << ":" << writeCacheSize_/fixSize << ";" // 当前配置
<< "ReadPool" << ":" << readCacheSize_/fixSize << ";"
<< "WritePool" << ":" << writeThroughput << ";" // 吞吐量
<< "ReadPool" << ":" << readThroughput << ";"
<< "WritePool" << ":" << writeCount_ << ";" // context
<< "ReadPool" << ":" << readCount_ << ";";
writeCount_ = 0;
readCount_ = 0;
std::string poolStats = oss.str();
LOG(INFO) << "[LinUCB] " << oss.str(); // log
if(send(client_socket, poolStats.c_str(), poolStats.size(), 0) < 0){
LOG(INFO) << "[LinUCB] Error sending data.";
break;
}
LOG(INFO) << "[LinUCB] poolStats message sent successfully";
// receive the new pool sizes
char newSize[1024] = {0};
auto readLen = read(client_socket, newSize, 1024);
std::string serialized_data(newSize);
LOG(INFO) << "[LinUCB] Received data: " << serialized_data << std::endl; // log
std::istringstream iss(serialized_data);
// parse the pool names
std::vector<std::string> workloads;
std::string line;
std::getline(iss, line); // read the first line
std::istringstream lineStream(line);
std::string poolName;
while (lineStream >> poolName) {
workloads.push_back(poolName);
}
// parse the cache sizes
std::vector<int64_t> cache_sizes;
std::getline(iss, line); // read the second line
std::istringstream cacheSizeStream(line);
int64_t cacheSize;
while (cacheSizeStream >> cacheSize) {
cache_sizes.push_back(cacheSize);
}
// for(int i = 0; i < workloads.size(); ++i){
// LOG(INFO) << "[LinUCB] " << workloads[i] << ":" << cache_sizes[i];
// }
// LOG(WARNING) << "[LinUCB] newConfig end recv";
close(client_socket);
LOG(INFO) << "[LinUCB] Before Resize, Write Pool Size is "<<writeCacheSize_
<<" , Read Pool Size is "<<readCacheSize_;
// adjust the pool sizes
int64_t deltaWrite = (int64_t)cache_sizes[0]*fixSize - ResizeWriteCache_->getPoolStats(writePoolId_).poolSize;
int64_t deltaRead = (int64_t)cache_sizes[1]*fixSize - ResizeReadCache_->getPoolStats(readPoolId_).poolSize;
bool writeRes = false;
bool readRes = false;
int64_t shrinkSize = 0;
// shrink first, then grow
if (deltaWrite < 0) {
// if the write cache must shrink by more than its free capacity, an FsSync should be issued first
// LOG(WARNING) << "[LinUCB]Request FsSync first";
// FsSync();
int64_t freeSize = ResizeWriteCache_->getPoolStats(writePoolId_).poolSize - writeCache_->GetCacheSize();
LOG(INFO) << "[LinUCB] WritePool Free Size : " << freeSize;
if(freeSize - reserveSize < -deltaWrite){
deltaWrite = -(freeSize - reserveSize)/fixSize * fixSize ;
}
LOG(INFO) << "[LinUCB] WriteCache shrinkPool size:" << deltaWrite;
writeRes = ResizeWriteCache_->shrinkPool(writePoolId_, -deltaWrite);
if(writeRes){
LOG(INFO) << "[LinUCB] WriteCache shrinkPool succ";
shrinkSize = shrinkSize +(-deltaWrite);
}
else
{
LOG(ERROR) << "[LinUCB] WriteCache shrinkPool failed";
}
}
if (deltaRead < 0) {
if(ResizeReadCache_->getPoolStats(readPoolId_).poolSize - 2*reserveSize < -deltaRead)
{
deltaRead = -(ResizeReadCache_->getPoolStats(readPoolId_).poolSize - 2*reserveSize)/fixSize*fixSize;
}
LOG(INFO) << "[LinUCB] ReadCache shrinkPool size:" << deltaRead;
readRes = ResizeReadCache_->shrinkPool(readPoolId_, -deltaRead);
if(readRes){
LOG(INFO) << "[LinUCB] ReadCache shrinkPool succ";
shrinkSize = shrinkSize + (-deltaRead);
}
else
{
LOG(ERROR) << "[LinUCB] ReadCache shrinkPool failed";
}
}
//grow
if (deltaWrite > 0)
{
if(deltaWrite > shrinkSize){
deltaWrite = shrinkSize;
}
LOG(INFO) << "[LinUCB] WriteCache growPool size:" << deltaWrite;
writeRes = ResizeWriteCache_->growPool(writePoolId_, deltaWrite);
if(writeRes){
shrinkSize = shrinkSize - deltaWrite;
}
}
if (deltaRead > 0) {
if(deltaRead > shrinkSize){
deltaRead = shrinkSize;
}
LOG(INFO) << "[LinUCB] ReadCache growPool size:" << deltaRead;
readRes = ResizeReadCache_->growPool(readPoolId_, deltaRead);
if(readRes){
shrinkSize = shrinkSize - deltaRead;
}
}
writeCacheSize_ = ResizeWriteCache_->getPoolStats(writePoolId_).poolSize;
readCacheSize_ = ResizeReadCache_->getPoolStats(readPoolId_).poolSize;
LOG(INFO) << "[LinUCB] After Resize, Write Pool Size is "<<writeCacheSize_
<<" , Read Pool Size is "<<readCacheSize_;
// resize once every resizeInterval_ seconds
std::this_thread::sleep_for(std::chrono::seconds(resizeInterval_));
// LOG(INFO) << "[LinUCB] " << butil::cpuwide_time_us();
}
LOG(WARNING) << "[LinUCB] LinUCBThread stop";
}

View File

@ -12,6 +12,13 @@
#include "accessor.h"
using atomic_ptr_t = std::shared_ptr<std::atomic<int>>;
//added by tqy referring to xyq
using Cache = facebook::cachelib::LruAllocator;
using facebook::cachelib::PoolId;
#define MAXSIZE 1024
#define IP_ADDR "127.0.0.1"
#define IP_PORT 2333 // server port
//added end---------
class HybridCacheAccessor4S3fs : public HybridCache::HybridCacheAccessor {
public:
@ -19,6 +26,11 @@ class HybridCacheAccessor4S3fs : public HybridCache::HybridCacheAccessor {
~HybridCacheAccessor4S3fs();
void Init();
//added by tqy referring to xyq
int InitCache();
void LinUCBClient();
//end-------------
void Stop();
int Put(const std::string &key, size_t start, size_t len, const char* buf);
@ -60,6 +72,28 @@ class HybridCacheAccessor4S3fs : public HybridCache::HybridCacheAccessor {
std::atomic<bool> toStop_{false};
std::atomic<bool> backFlushRunning_{false};
std::thread bgFlushThread_;
//----------added by tqy referring to xyq for Resizing--------
std::shared_ptr<Cache> ResizeWriteCache_;
std::shared_ptr<Cache> ResizeReadCache_;
PoolId writePoolId_;
PoolId readPoolId_;
uint64_t writeCount_ = 0;
uint64_t readCount_ = 0;
uint64_t writeCacheSize_;
uint64_t readCacheSize_;
int64_t fixSize = 1024 * 1024 * 256; // allocation unit: 256M
int64_t reserveSize = 1024 * 1024 * 512; // reserved space: 512M
//----------added by tqy referring to xyq for LinUCB--------
std::thread LinUCBThread;
std::atomic<bool> stopLinUCBThread{false};
uint64_t writeByteAcc_ = 0;
uint64_t readByteAcc_ = 0;
uint64_t writeTimeAcc_ = 0;
uint64_t readTimeAcc_ = 0;
uint32_t resizeInterval_;
uint32_t WritePoolRatio();
bool IsWritePoolFull(size_t len);
};
#endif // HYBRIDCACHE_ACCESSOR_4_S3FS_H_