style: change to flake8 style

This commit is contained in:
zweien 2021-07-12 16:03:42 +08:00
parent 897293b698
commit e4c1a46501
10 changed files with 1056 additions and 617 deletions

View File

@ -1,2 +1,2 @@
[flake8] [flake8]
ignore = E203, W503, E501, E231, F401, F403 ignore = E203, W503, E501, E231, F401, F403, E722, E731

View File

@ -64,7 +64,7 @@ html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static'] html_static_path = ['_static']
# for MarkdownParser # for MarkdownParser
from sphinx_markdown_parser.parser import MarkdownParser from sphinx_markdown_parser.parser import MarkdownParser # noqa
# def setup(app): # def setup(app):

View File

@ -8,16 +8,16 @@ import os
import torch import torch
# parameter phase # parameter phase
L = 1. L = 1.0
# define geometry # define geometry
geo = sc.Line1D(-1.0, 1.0) geo = sc.Line1D(-1.0, 1.0)
# define sympy varaibles to parametize domain curves # define sympy varaibles to parametize domain curves
t_symbol = Symbol('t') t_symbol = Symbol("t")
x = Symbol('x') x = Symbol("x")
u = sp.Function('u')(x, t_symbol) u = sp.Function("u")(x, t_symbol)
up = sp.Function('up')(x, t_symbol) up = sp.Function("up")(x, t_symbol)
time_range = {t_symbol: (0, L)} time_range = {t_symbol: (0, L)}
@ -25,52 +25,62 @@ time_range = {t_symbol: (0, L)}
@sc.datanode @sc.datanode
class AllenInit(sc.SampleDomain): class AllenInit(sc.SampleDomain):
def sampling(self, *args, **kwargs): def sampling(self, *args, **kwargs):
return geo.sample_interior(density=300, param_ranges={t_symbol: 0.0}), \ return geo.sample_interior(density=300, param_ranges={t_symbol: 0.0}), {
{'u': x ** 2 * sp.cos(sp.pi * x), 'lambda_u': 100} "u": x ** 2 * sp.cos(sp.pi * x),
"lambda_u": 100,
}
@sc.datanode @sc.datanode
class AllenBc(sc.SampleDomain): class AllenBc(sc.SampleDomain):
def sampling(self, *args, **kwargs): def sampling(self, *args, **kwargs):
return geo.sample_boundary(density=200, sieve=sp.Eq(x, -1), param_ranges=time_range), \ return geo.sample_boundary(
{'difference_u_up': 0, density=200, sieve=sp.Eq(x, -1), param_ranges=time_range
'difference_diff_u_diff_up': 0, ), {
"difference_u_up": 0,
"difference_diff_u_diff_up": 0,
} }
@sc.datanode(name='allen_domain') @sc.datanode(name="allen_domain")
class AllenEq(sc.SampleDomain): class AllenEq(sc.SampleDomain):
def __init__(self): def __init__(self):
self.points = geo.sample_interior(density=2000, param_ranges=time_range, low_discrepancy=True) self.points = geo.sample_interior(
density=2000, param_ranges=time_range, low_discrepancy=True
)
def sampling(self, *args, **kwargs): def sampling(self, *args, **kwargs):
constraints = {'AllenCahn_u': 0} constraints = {"AllenCahn_u": 0}
return self.points, constraints return self.points, constraints
@sc.datanode(name='data_evaluate') @sc.datanode(name="data_evaluate")
class AllenPointsInference(sc.SampleDomain): class AllenPointsInference(sc.SampleDomain):
def __init__(self): def __init__(self):
self.points = geo.sample_interior(density=5000, param_ranges=time_range, low_discrepancy=True) self.points = geo.sample_interior(
density=5000, param_ranges=time_range, low_discrepancy=True
)
self.points = sc.Variables(self.points).to_torch_tensor_() self.points = sc.Variables(self.points).to_torch_tensor_()
self.constraints = {'AllenCahn_u': torch.zeros_like(self.points['x'])} self.constraints = {"AllenCahn_u": torch.zeros_like(self.points["x"])}
def sampling(self, *args, **kwargs): def sampling(self, *args, **kwargs):
return self.points, self.constraints return self.points, self.constraints
@sc.datanode(name='re_sampling_domain') @sc.datanode(name="re_sampling_domain")
class SpaceAdaptiveSampling(sc.SampleDomain): class SpaceAdaptiveSampling(sc.SampleDomain):
def __init__(self): def __init__(self):
self.points = geo.sample_interior(density=100, param_ranges=time_range, low_discrepancy=True) self.points = geo.sample_interior(
density=100, param_ranges=time_range, low_discrepancy=True
)
self.points = sc.Variables(self.points).to_torch_tensor_() self.points = sc.Variables(self.points).to_torch_tensor_()
self.constraints = {'AllenCahn_u': torch.zeros_like(self.points['x'])} self.constraints = {"AllenCahn_u": torch.zeros_like(self.points["x"])}
def sampling(self, *args, **kwargs): def sampling(self, *args, **kwargs):
return self.points, self.constraints return self.points, self.constraints
@sc.datanode(name='allen_test') @sc.datanode(name="allen_test")
def generate_plot_data(): def generate_plot_data():
x = np.linspace(-1.0, 1.0, 100) x = np.linspace(-1.0, 1.0, 100)
t = np.linspace(0, 1.0, 100) t = np.linspace(0, 1.0, 100)
@ -82,76 +92,122 @@ def generate_plot_data():
# computational node phase # computational node phase
net_u = sc.MLP([2, 128, 128, 128, 128, 2], activation=sc.Activation.tanh) net_u = sc.MLP([2, 128, 128, 128, 128, 2], activation=sc.Activation.tanh)
net_u = sc.NetNode(inputs=('x', 't',), outputs=('u',), name='net1', net=net_u) net_u = sc.NetNode(
xp = sc.ExpressionNode(name='xp', expression=x + 2) inputs=(
get_tilde_u = sc.get_shared_net_node(net_u, inputs=('xp', 't',), outputs=('up',), name='net2', arch='mlp') "x",
"t",
),
outputs=("u",),
name="net1",
net=net_u,
)
xp = sc.ExpressionNode(name="xp", expression=x + 2)
get_tilde_u = sc.get_shared_net_node(
net_u,
inputs=(
"xp",
"t",
),
outputs=("up",),
name="net2",
arch="mlp",
)
diff_u = sc.ExpressionNode(expression=u.diff(x), name='diff_u') diff_u = sc.ExpressionNode(expression=u.diff(x), name="diff_u")
diff_up = sc.ExpressionNode(expression=up.diff(x), name='diff_up') diff_up = sc.ExpressionNode(expression=up.diff(x), name="diff_up")
pde = sc.AllenCahnNode(u='u', gamma_1=0.0001, gamma_2=5) pde = sc.AllenCahnNode(u="u", gamma_1=0.0001, gamma_2=5)
boundary_up = sc.Difference(T='diff_u', S='diff_up') boundary_up = sc.Difference(T="diff_u", S="diff_up")
boundary_u = sc.Difference(T='u', S='up') boundary_u = sc.Difference(T="u", S="up")
# Receiver hook phase # Receiver hook phase
class SpaceAdaptiveReceiver(sc.Receiver): class SpaceAdaptiveReceiver(sc.Receiver):
def receive_notify(self, solver, message): def receive_notify(self, solver, message):
if sc.Signal.TRAIN_PIPE_END in message.keys() and solver.global_step % 1000 == 0: if (
sc.logger.info('space adaptive sampling...') sc.Signal.TRAIN_PIPE_END in message.keys()
results = solver.infer_step({'data_evaluate': ['x', 't', 'sdf', 'AllenCahn_u']}) and solver.global_step % 1000 == 0
residual_data = results['data_evaluate']['AllenCahn_u'].detach().cpu().numpy().ravel() ):
sc.logger.info("space adaptive sampling...")
results = solver.infer_step(
{"data_evaluate": ["x", "t", "sdf", "AllenCahn_u"]}
)
residual_data = (
results["data_evaluate"]["AllenCahn_u"].detach().cpu().numpy().ravel()
)
# sort the points by residual loss # sort the points by residual loss
index = np.argsort(-1. * np.abs(residual_data))[:200] index = np.argsort(-1.0 * np.abs(residual_data))[:200]
_points = {key: values[index].detach().cpu().numpy() for key, values in results['data_evaluate'].items()} _points = {
_points.pop('AllenCahn_u') key: values[index].detach().cpu().numpy()
_points['area'] = np.zeros_like(_points['sdf']) + (1.0 / 200) for key, values in results["data_evaluate"].items()
solver.set_domain_parameter('re_sampling_domain', {'points': _points}) }
_points.pop("AllenCahn_u")
_points["area"] = np.zeros_like(_points["sdf"]) + (1.0 / 200)
solver.set_domain_parameter("re_sampling_domain", {"points": _points})
class PostProcessReceiver(sc.Receiver): class PostProcessReceiver(sc.Receiver):
def __init__(self): def __init__(self):
if not os.path.exists('image'): if not os.path.exists("image"):
os.mkdir('image') os.mkdir("image")
def receive_notify(self, solver, message): def receive_notify(self, solver, message):
if sc.Signal.TRAIN_PIPE_END in message.keys() and solver.global_step % 1000 == 1: if (
sc.logger.info('Post Processing...') sc.Signal.TRAIN_PIPE_END in message.keys()
points = s.infer_step({'allen_test': ['x', 't', 'u']}) and solver.global_step % 1000 == 1
triang_total = tri.Triangulation(points['allen_test']['t'].detach().cpu().numpy().ravel(), ):
points['allen_test']['x'].detach().cpu().numpy().ravel(), ) sc.logger.info("Post Processing...")
plt.tricontourf(triang_total, points['allen_test']['u'].detach().cpu().numpy().ravel(), 100, vmin=-1, points = s.infer_step({"allen_test": ["x", "t", "u"]})
vmax=1) triang_total = tri.Triangulation(
points["allen_test"]["t"].detach().cpu().numpy().ravel(),
points["allen_test"]["x"].detach().cpu().numpy().ravel(),
)
plt.tricontourf(
triang_total,
points["allen_test"]["u"].detach().cpu().numpy().ravel(),
100,
vmin=-1,
vmax=1,
)
tc_bar = plt.colorbar() tc_bar = plt.colorbar()
tc_bar.ax.tick_params(labelsize=12) tc_bar.ax.tick_params(labelsize=12)
_points = solver.get_domain_parameter('re_sampling_domain', 'points') _points = solver.get_domain_parameter("re_sampling_domain", "points")
if not isinstance(_points['t'], torch.Tensor): if not isinstance(_points["t"], torch.Tensor):
plt.scatter(_points['t'].ravel(), _points['x'].ravel(), marker='x', s=8) plt.scatter(_points["t"].ravel(), _points["x"].ravel(), marker="x", s=8)
else: else:
plt.scatter(_points['t'].detach().cpu().numpy().ravel(), plt.scatter(
_points['x'].detach().cpu().numpy().ravel(), marker='x', s=8) _points["t"].detach().cpu().numpy().ravel(),
_points["x"].detach().cpu().numpy().ravel(),
marker="x",
s=8,
)
plt.xlabel('$t$') plt.xlabel("$t$")
plt.ylabel('$x$') plt.ylabel("$x$")
plt.title('$u(x,t)$') plt.title("$u(x,t)$")
plt.savefig(f'image/result_{solver.global_step}.png') plt.savefig(f"image/result_{solver.global_step}.png")
plt.show() plt.show()
# Solver phase # Solver phase
s = sc.Solver(sample_domains=(AllenInit(), s = sc.Solver(
sample_domains=(
AllenInit(),
AllenBc(), AllenBc(),
AllenEq(), AllenEq(),
AllenPointsInference(), AllenPointsInference(),
SpaceAdaptiveSampling(), SpaceAdaptiveSampling(),
generate_plot_data()), generate_plot_data(),
),
netnodes=[net_u, get_tilde_u], netnodes=[net_u, get_tilde_u],
pdes=[pde, xp, diff_up, diff_u, boundary_up, boundary_u], pdes=[pde, xp, diff_up, diff_u, boundary_up, boundary_u],
max_iter=60000, max_iter=60000,
loading=True) loading=True,
)
s.register_receiver(SpaceAdaptiveReceiver()) s.register_receiver(SpaceAdaptiveReceiver())
s.register_receiver(PostProcessReceiver()) s.register_receiver(PostProcessReceiver())

View File

@ -13,7 +13,7 @@ __all__ = ['GradientReceiver', 'SummaryReceiver', 'HandleResultReceiver']
class GradientReceiver(Receiver): class GradientReceiver(Receiver):
"""Register the receiver to monitor gradient norm on the Tensorboard.""" """Register the receiver to monitor gradient norm on the Tensorboard."""
def receive_notify(self, solver: 'Solver', message): def receive_notify(self, solver: 'Solver', message): # noqa
if not (Signal.TRAIN_PIPE_END in message): if not (Signal.TRAIN_PIPE_END in message):
return return
for netnode in solver.netnodes: for netnode in solver.netnodes:
@ -34,7 +34,7 @@ class SummaryReceiver(SummaryWriter, Receiver):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
SummaryWriter.__init__(self, *args, **kwargs) SummaryWriter.__init__(self, *args, **kwargs)
def receive_notify(self, solver: 'Solver', message: Dict): def receive_notify(self, solver: 'Solver', message: Dict): # noqa
if Signal.AFTER_COMPUTE_LOSS in message.keys(): if Signal.AFTER_COMPUTE_LOSS in message.keys():
loss_component = message[Signal.AFTER_COMPUTE_LOSS] loss_component = message[Signal.AFTER_COMPUTE_LOSS]
self.add_scalars('loss_overview', loss_component, solver.global_step) self.add_scalars('loss_overview', loss_component, solver.global_step)
@ -51,7 +51,7 @@ class HandleResultReceiver(Receiver):
def __init__(self, result_dir): def __init__(self, result_dir):
self.result_dir = result_dir self.result_dir = result_dir
def receive_notify(self, solver: 'Solver', message: Dict): def receive_notify(self, solver: 'Solver', message: Dict): # noqa
if Signal.SOLVE_END in message.keys(): if Signal.SOLVE_END in message.keys():
samples = solver.sample_variables_from_domains() samples = solver.sample_variables_from_domains()
in_var, _, lambda_out = solver.generate_in_out_dict(samples) in_var, _, lambda_out = solver.generate_in_out_dict(samples)

View File

@ -23,7 +23,7 @@ class CheckMeta(type):
class AbsGeoObj(metaclass=abc.ABCMeta): class AbsGeoObj(metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
def rotation(self, angle: float, axis: str = 'z'): def rotation(self, angle: float, axis: str = "z"):
pass pass
@abc.abstractmethod @abc.abstractmethod
@ -43,16 +43,24 @@ class Edge(AbsGeoObj):
@property @property
def axes(self) -> List[str]: def axes(self) -> List[str]:
return [key for key in self.functions if not key.startswith('normal')] return [key for key in self.functions if not key.startswith("normal")]
def rotation(self, angle: float, axis: str = 'z'): def rotation(self, angle: float, axis: str = "z"):
assert len(self.axes) > 1, 'Cannot rotate a object with dim<2' assert len(self.axes) > 1, "Cannot rotate a object with dim<2"
rotated_dims = [key for key in self.axes if key != axis] rotated_dims = [key for key in self.axes if key != axis]
rd1, rd2, n = rotated_dims[0], rotated_dims[1], 'normal_' rd1, rd2, n = rotated_dims[0], rotated_dims[1], "normal_"
self.functions[rd1] = (cos(angle) * self.functions[rd1] - sin(angle) * self.functions[rd2]) self.functions[rd1] = (
self.functions[n + rd1] = cos(angle) * self.functions[n + rd1] - sin(angle) * self.functions[n + rd2] cos(angle) * self.functions[rd1] - sin(angle) * self.functions[rd2]
self.functions[rd2] = (sin(angle) * self.functions[rd1] + cos(angle) * self.functions[rd2]) )
self.functions[n + rd2] = sin(angle) * self.functions[n + rd1] + cos(angle) * self.functions[n + rd2] self.functions[n + rd1] = (
cos(angle) * self.functions[n + rd1] - sin(angle) * self.functions[n + rd2]
)
self.functions[rd2] = (
sin(angle) * self.functions[rd1] + cos(angle) * self.functions[rd2]
)
self.functions[n + rd2] = (
sin(angle) * self.functions[n + rd1] + cos(angle) * self.functions[n + rd2]
)
return self return self
def scaling(self, scale: float): def scaling(self, scale: float):
@ -62,20 +70,28 @@ class Edge(AbsGeoObj):
return self return self
def translation(self, direction): def translation(self, direction):
assert len(direction) == len(self.axes), 'Moving direction must have the save dimension with the object' assert len(direction) == len(
self.axes
), "Moving direction must have the save dimension with the object"
for key, x in zip(self.axes, direction): for key, x in zip(self.axes, direction):
self.functions[key] += x self.functions[key] += x
return self return self
def sample(self, density: int, param_ranges=None, low_discrepancy=False) -> Dict[str, np.ndarray]: def sample(
self, density: int, param_ranges=None, low_discrepancy=False
) -> Dict[str, np.ndarray]:
param_ranges = {} if param_ranges is None else param_ranges param_ranges = {} if param_ranges is None else param_ranges
inputs = {**self.ranges, **param_ranges}.keys() inputs = {**self.ranges, **param_ranges}.keys()
area_fn = lambdify_np(self.area, inputs) area_fn = lambdify_np(self.area, inputs)
param_points = _ranged_sample(100, ranges={**self.ranges, **param_ranges}) param_points = _ranged_sample(100, ranges={**self.ranges, **param_ranges})
nr_points = int(density * (np.mean(area_fn(**param_points)))) nr_points = int(density * (np.mean(area_fn(**param_points))))
lambdify_functions = {'area': lambda **x: area_fn(**x) / next(iter(x.values())).shape[0]} lambdify_functions = {
param_points = _ranged_sample(nr_points, {**self.ranges, **param_ranges}, low_discrepancy) "area": lambda **x: area_fn(**x) / next(iter(x.values())).shape[0]
}
param_points = _ranged_sample(
nr_points, {**self.ranges, **param_ranges}, low_discrepancy
)
data_var = {} data_var = {}
for key, function in self.functions.items(): for key, function in self.functions.items():
@ -105,104 +121,138 @@ class Geometry(AbsGeoObj, metaclass=AbsCheckMix):
if type(self) in [Geometry, Geometry1D, Geometry2D, Geometry3D]: if type(self) in [Geometry, Geometry1D, Geometry2D, Geometry3D]:
return return
if self.edges is None: if self.edges is None:
raise NotImplementedError('Geometry must define edges') raise NotImplementedError("Geometry must define edges")
if self.bounds is None: if self.bounds is None:
raise NotImplementedError('Geometry must define bounds') raise NotImplementedError("Geometry must define bounds")
if self.sdf is None: if self.sdf is None:
raise NotImplementedError('Geometry must define sdf') raise NotImplementedError("Geometry must define sdf")
@property @property
def axes(self) -> List[str]: def axes(self) -> List[str]:
return self.edges[0].axes return self.edges[0].axes
def translation(self, direction: Union[List, Tuple]) -> 'Geometry': def translation(self, direction: Union[List, Tuple]) -> "Geometry":
assert len(direction) == len(self.axes) assert len(direction) == len(self.axes)
[edge.translation(direction) for edge in self.edges] [edge.translation(direction) for edge in self.edges]
self.sdf = self.sdf.subs([(Symbol(dim), Symbol(dim) - x) for dim, x in zip(self.axes, direction)]) self.sdf = self.sdf.subs(
self.bounds = {dim: (self.bounds[dim][0] + x, self.bounds[dim][1] + x) for dim, x in zip(self.axes, direction)} [(Symbol(dim), Symbol(dim) - x) for dim, x in zip(self.axes, direction)]
)
self.bounds = {
dim: (self.bounds[dim][0] + x, self.bounds[dim][1] + x)
for dim, x in zip(self.axes, direction)
}
return self return self
def rotation(self, angle: float, axis: str = 'z', center=None) -> 'Geometry': def rotation(self, angle: float, axis: str = "z", center=None) -> "Geometry":
if center is not None: if center is not None:
self.translation([-x for x in center]) self.translation([-x for x in center])
[edge.rotation(angle, axis) for edge in self.edges] [edge.rotation(angle, axis) for edge in self.edges]
rotated_dims = [key for key in self.axes if key != axis] rotated_dims = [key for key in self.axes if key != axis]
sp_0 = Symbol(rotated_dims[0]) sp_0 = Symbol(rotated_dims[0])
_sp_0 = Symbol('tmp_0') _sp_0 = Symbol("tmp_0")
sp_1 = Symbol(rotated_dims[1]) sp_1 = Symbol(rotated_dims[1])
_sp_1 = Symbol('tmp_1') _sp_1 = Symbol("tmp_1")
self.sdf = self.sdf.subs({sp_0: cos(angle) * _sp_0 + sin(angle) * _sp_1, self.sdf = self.sdf.subs(
sp_1: - sin(angle) * _sp_0 + cos(angle) * _sp_1}) {
sp_0: cos(angle) * _sp_0 + sin(angle) * _sp_1,
sp_1: -sin(angle) * _sp_0 + cos(angle) * _sp_1,
}
)
self.sdf = self.sdf.subs({_sp_0: sp_0, _sp_1: sp_1}) self.sdf = self.sdf.subs({_sp_0: sp_0, _sp_1: sp_1})
self.bounds[rotated_dims[0]], self.bounds[rotated_dims[1]] = _rotate_rec(self.bounds[rotated_dims[0]], self.bounds[rotated_dims[0]], self.bounds[rotated_dims[1]] = _rotate_rec(
self.bounds[rotated_dims[1]], self.bounds[rotated_dims[0]], self.bounds[rotated_dims[1]], angle=angle
angle=angle) )
if center is not None: if center is not None:
self.translation(center) self.translation(center)
return self return self
def scaling(self, scale: float, center: Tuple = None) -> 'Geometry': def scaling(self, scale: float, center: Tuple = None) -> "Geometry":
assert scale > 0, 'scaling must be positive' assert scale > 0, "scaling must be positive"
if center is not None: if center is not None:
self.translation(tuple([-x for x in center])) self.translation(tuple([-x for x in center]))
[edge.scaling(scale) for edge in self.edges] [edge.scaling(scale) for edge in self.edges]
self.sdf = self.sdf.subs({Symbol(dim): Symbol(dim) / scale for dim in self.axes}) self.sdf = self.sdf.subs(
{Symbol(dim): Symbol(dim) / scale for dim in self.axes}
)
self.sdf = scale * self.sdf self.sdf = scale * self.sdf
for dim in self.axes: for dim in self.axes:
self.bounds[dim] = (self.bounds[dim][0] * scale, self.bounds[dim][1] * scale) self.bounds[dim] = (
self.bounds[dim][0] * scale,
self.bounds[dim][1] * scale,
)
if center is not None: if center is not None:
self.translation(center) self.translation(center)
return self return self
def duplicate(self) -> 'Geometry': def duplicate(self) -> "Geometry":
return copy.deepcopy(self) return copy.deepcopy(self)
def sample_boundary(self, density: int, sieve=None, param_ranges: Dict = None, low_discrepancy=False) -> Dict[ def sample_boundary(
str, np.ndarray]: self, density: int, sieve=None, param_ranges: Dict = None, low_discrepancy=False
) -> Dict[str, np.ndarray]:
param_ranges = dict() if param_ranges is None else param_ranges param_ranges = dict() if param_ranges is None else param_ranges
points_list = [edge.sample(density, param_ranges, low_discrepancy) for edge in points_list = [
self.edges] edge.sample(density, param_ranges, low_discrepancy) for edge in self.edges
points = reduce(lambda e1, e2: {_k: np.concatenate([e1[_k], e2[_k]], axis=0) for _k in e1}, points_list) ]
points = reduce(
lambda e1, e2: {_k: np.concatenate([e1[_k], e2[_k]], axis=0) for _k in e1},
points_list,
)
points = self._sieve_points(points, sieve, sign=-1, tol=1e-4) points = self._sieve_points(points, sieve, sign=-1, tol=1e-4)
return points return points
def _sieve_points(self, points, sieve, tol=1e-4, sign=1.): def _sieve_points(self, points, sieve, tol=1e-4, sign=1.0):
sdf_fn = lambdify_np(self.sdf, points.keys()) sdf_fn = lambdify_np(self.sdf, points.keys())
points['sdf'] = sdf_fn(**points) points["sdf"] = sdf_fn(**points)
criteria_fn = lambdify_np(True if sieve is None else sieve, points.keys()) criteria_fn = lambdify_np(True if sieve is None else sieve, points.keys())
criteria_index = np.logical_and(np.greater(points['sdf'], -tol), criteria_fn(**points)) criteria_index = np.logical_and(
np.greater(points["sdf"], -tol), criteria_fn(**points)
)
if sign == -1: if sign == -1:
criteria_index = np.logical_and(np.less(points['sdf'], tol), criteria_index) criteria_index = np.logical_and(np.less(points["sdf"], tol), criteria_index)
points = {k: v[criteria_index[:, 0], :] for k, v in points.items()} points = {k: v[criteria_index[:, 0], :] for k, v in points.items()}
return points return points
def sample_interior(self, density: int, bounds: Dict = None, sieve=None, param_ranges: Dict = None, def sample_interior(
low_discrepancy=False) -> Dict[str, np.ndarray]: self,
density: int,
bounds: Dict = None,
sieve=None,
param_ranges: Dict = None,
low_discrepancy=False,
) -> Dict[str, np.ndarray]:
bounds = self.bounds if bounds is None else bounds bounds = self.bounds if bounds is None else bounds
bounds = {Symbol(key) if isinstance(key, str) else key: value for key, value in bounds.items()} bounds = {
Symbol(key) if isinstance(key, str) else key: value
for key, value in bounds.items()
}
param_ranges = {} if param_ranges is None else param_ranges param_ranges = {} if param_ranges is None else param_ranges
measure = np.prod([value[1] - value[0] for value in bounds.values()]) measure = np.prod([value[1] - value[0] for value in bounds.values()])
nr_points = int(measure * density) nr_points = int(measure * density)
points = _ranged_sample(nr_points, {**bounds, **param_ranges}, low_discrepancy=low_discrepancy) points = _ranged_sample(
nr_points, {**bounds, **param_ranges}, low_discrepancy=low_discrepancy
)
assert len(points.keys()) >= 0, "No points have been sampled!" assert len(points.keys()) >= 0, "No points have been sampled!"
points = self._sieve_points(points, sieve, tol=0.) points = self._sieve_points(points, sieve, tol=0.0)
points['area'] = np.zeros_like(points['sdf']) + (1.0 / density) points["area"] = np.zeros_like(points["sdf"]) + (1.0 / density)
return points return points
def __add__(self, other: 'Geometry') -> 'Geometry': def __add__(self, other: "Geometry") -> "Geometry":
geo = self.generate_geo_obj(other) geo = self.generate_geo_obj(other)
geo.edges = self.edges + other.edges geo.edges = self.edges + other.edges
geo.sdf = WrapMax(self.sdf, other.sdf) geo.sdf = WrapMax(self.sdf, other.sdf)
geo.bounds = dict() geo.bounds = dict()
for key, value in self.bounds.items(): for key, value in self.bounds.items():
geo.bounds[key] = ( geo.bounds[key] = (
min(other.bounds[key][0], self.bounds[key][0]), max(other.bounds[key][1], self.bounds[key][1])) min(other.bounds[key][0], self.bounds[key][0]),
max(other.bounds[key][1], self.bounds[key][1]),
)
return geo return geo
def generate_geo_obj(self, other=None): def generate_geo_obj(self, other=None):
@ -219,7 +269,7 @@ class Geometry(AbsGeoObj, metaclass=AbsCheckMix):
raise TypeError raise TypeError
return geo return geo
def __sub__(self, other: 'Geometry') -> 'Geometry': def __sub__(self, other: "Geometry") -> "Geometry":
geo = self.generate_geo_obj(other) geo = self.generate_geo_obj(other)
geo.edges = self.edges + [_inverse_edge(edge) for edge in other.edges] geo.edges = self.edges + [_inverse_edge(edge) for edge in other.edges]
@ -229,22 +279,24 @@ class Geometry(AbsGeoObj, metaclass=AbsCheckMix):
geo.bounds[key] = (self.bounds[key][0], self.bounds[key][1]) geo.bounds[key] = (self.bounds[key][0], self.bounds[key][1])
return geo return geo
def __invert__(self) -> 'Geometry': def __invert__(self) -> "Geometry":
geo = self.generate_geo_obj() geo = self.generate_geo_obj()
geo.edges = [_inverse_edge(edge) for edge in self.edges] geo.edges = [_inverse_edge(edge) for edge in self.edges]
geo.sdf = WrapMul(-1, self.sdf) geo.sdf = WrapMul(-1, self.sdf)
for key, value in self.bounds.items(): for key, value in self.bounds.items():
geo.bounds[key] = (-float('inf'), float('inf')) geo.bounds[key] = (-float("inf"), float("inf"))
return geo return geo
def __and__(self, other: 'Geometry') -> 'Geometry': def __and__(self, other: "Geometry") -> "Geometry":
geo = self.generate_geo_obj(other) geo = self.generate_geo_obj(other)
geo.edges = self.edges + other.edges geo.edges = self.edges + other.edges
geo.sdf = WrapMin(self.sdf, other.sdf) geo.sdf = WrapMin(self.sdf, other.sdf)
geo.bounds = dict() geo.bounds = dict()
for key, value in self.bounds.items(): for key, value in self.bounds.items():
geo.bounds[key] = ( geo.bounds[key] = (
max(other.bounds[key][0], self.bounds[key][0]), min(other.bounds[key][1], self.bounds[key][1])) max(other.bounds[key][0], self.bounds[key][0]),
min(other.bounds[key][1], self.bounds[key][1]),
)
return geo return geo
@ -261,14 +313,16 @@ class Geometry3D(Geometry):
# todo: sample in cuda device # todo: sample in cuda device
def _ranged_sample(batch_size: int, ranges: Dict, low_discrepancy: bool = False) -> Dict[str, np.ndarray]: def _ranged_sample(
batch_size: int, ranges: Dict, low_discrepancy: bool = False
) -> Dict[str, np.ndarray]:
points = dict() points = dict()
low_discrepancy_stack = [] low_discrepancy_stack = []
for key, value in ranges.items(): for key, value in ranges.items():
if isinstance(value, (float, int)): if isinstance(value, (float, int)):
samples = np.ones((batch_size, 1)) * value samples = np.ones((batch_size, 1)) * value
elif isinstance(value, tuple): elif isinstance(value, tuple):
assert len(value) == 2, 'Tuple: length of range should be 2!' assert len(value) == 2, "Tuple: length of range should be 2!"
if low_discrepancy: if low_discrepancy:
low_discrepancy_stack.append((key.name, value)) low_discrepancy_stack.append((key.name, value))
continue continue
@ -277,10 +331,12 @@ def _ranged_sample(batch_size: int, ranges: Dict, low_discrepancy: bool = False)
elif isinstance(value, collections.Callable): elif isinstance(value, collections.Callable):
samples = value(batch_size) samples = value(batch_size)
else: else:
raise TypeError(f'range type {type(value)} not supported!') raise TypeError(f"range type {type(value)} not supported!")
points[key.name] = samples points[key.name] = samples
if low_discrepancy: if low_discrepancy:
low_discrepancy_points_dict = _low_discrepancy_sampling(batch_size, low_discrepancy_stack) low_discrepancy_points_dict = _low_discrepancy_sampling(
batch_size, low_discrepancy_stack
)
points.update(low_discrepancy_points_dict) points.update(low_discrepancy_points_dict)
for key, v in points.items(): for key, v in points.items():
points[key] = v.astype(np.float64) points[key] = v.astype(np.float64)
@ -289,8 +345,8 @@ def _ranged_sample(batch_size: int, ranges: Dict, low_discrepancy: bool = False)
def _rotate_rec(x: Tuple, y: Tuple, angle: float): def _rotate_rec(x: Tuple, y: Tuple, angle: float):
points = itertools.product(x, y) points = itertools.product(x, y)
min_x, min_y = float('inf'), float('inf') min_x, min_y = float("inf"), float("inf")
max_x, max_y = -float('inf'), -float('inf') max_x, max_y = -float("inf"), -float("inf")
try: try:
for x, y in points: for x, y in points:
new_x = cos(angle) * x - sin(angle) * y new_x = cos(angle) * x - sin(angle) * y
@ -327,7 +383,11 @@ def _low_discrepancy_sampling(n_points, low_discrepancy_stack: List[Tuple]):
for i in range(len(q) - 1): for i in range(len(q) - 1):
for j in range(dims): for j in range(dims):
x[q[i]:q[i + 1], j] = (x[q[i]:q[i + 1], j] - rmin[j]) / 2 + rmin[j] + ((i >> j) & 1) * bi_range x[q[i] : q[i + 1], j] = (
(x[q[i] : q[i + 1], j] - rmin[j]) / 2
+ rmin[j]
+ ((i >> j) & 1) * bi_range
)
rmin_sub = [v + bi_range * ((i >> j) & 1) for j, v in enumerate(rmin)] rmin_sub = [v + bi_range * ((i >> j) & 1) for j, v in enumerate(rmin)]
uniform(x, q[i], q[i + 1], rmin_sub, bi_range=bi_range / 2) uniform(x, q[i], q[i + 1], rmin_sub, bi_range=bi_range / 2)
return x return x
@ -337,11 +397,15 @@ def _low_discrepancy_sampling(n_points, low_discrepancy_stack: List[Tuple]):
uniform(points, start=0, end=n, rmin=[0] * dim) uniform(points, start=0, end=n, rmin=[0] * dim)
points_dict = {} points_dict = {}
for i, (key, bi_range) in enumerate(low_discrepancy_stack): for i, (key, bi_range) in enumerate(low_discrepancy_stack):
points_dict[key] = points[:, i:i + 1] * (bi_range[1] - bi_range[0]) + bi_range[0] points_dict[key] = (
points[:, i : i + 1] * (bi_range[1] - bi_range[0]) + bi_range[0]
)
return points_dict return points_dict
def _inverse_edge(edge: Edge): def _inverse_edge(edge: Edge):
new_functions = {k: -v if k.startswith('normal_') else v for k, v in edge.functions.items()} new_functions = {
k: -v if k.startswith("normal_") else v for k, v in edge.functions.items()
}
edge = Edge(functions=new_functions, ranges=edge.ranges, area=edge.area) edge = Edge(functions=new_functions, ranges=edge.ranges, area=edge.area)
return edge return edge

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@ import inspect
import math import math
from typing import Dict from typing import Dict
__all__ = ['get_available_class', 'Optimizable'] __all__ = ["get_available_class", "Optimizable"]
def get_available_class(module, class_name) -> Dict[str, type]: def get_available_class(module, class_name) -> Dict[str, type]:
@ -19,20 +19,28 @@ def get_available_class(module, class_name) -> Dict[str, type]:
:return: A dict mapping from subclass.name to subclass :return: A dict mapping from subclass.name to subclass
:rtype: Dict[str, type] :rtype: Dict[str, type]
""" """
return dict(filter( return dict(
filter(
lambda x: inspect.isclass(x[1]) lambda x: inspect.isclass(x[1])
and issubclass(x[1], class_name) and issubclass(x[1], class_name)
and (not x[1] == class_name), and (not x[1] == class_name),
inspect.getmembers(module))) inspect.getmembers(module),
)
)
class Optimizable(metaclass=abc.ABCMeta): class Optimizable(metaclass=abc.ABCMeta):
"""An abstract class for organizing optimization related configuration and operations. """An abstract class for organizing optimization related configuration and operations.
The interface is implemented by solver.Solver The interface is implemented by solver.Solver
""" """
OPTIMIZER_MAP = get_available_class(module=torch.optim, class_name=torch.optim.Optimizer)
SCHEDULE_MAP = get_available_class(module=torch.optim.lr_scheduler, OPTIMIZER_MAP = get_available_class(
class_name=torch.optim.lr_scheduler._LRScheduler) module=torch.optim, class_name=torch.optim.Optimizer
)
SCHEDULE_MAP = get_available_class(
module=torch.optim.lr_scheduler,
class_name=torch.optim.lr_scheduler._LRScheduler,
)
@property @property
def optimizers(self): def optimizers(self):
@ -60,23 +68,25 @@ class Optimizable(metaclass=abc.ABCMeta):
self.configure_optimizers() self.configure_optimizers()
def parse_optimizer(self, **kwargs): def parse_optimizer(self, **kwargs):
default_config = dict(optimizer='Adam', lr=1e-3) default_config = dict(optimizer="Adam", lr=1e-3)
default_config.update(kwargs.get('opt_config', {})) default_config.update(kwargs.get("opt_config", {}))
self.optimizer_config = default_config self.optimizer_config = default_config
def parse_lr_schedule(self, **kwargs): def parse_lr_schedule(self, **kwargs):
default_config = dict(scheduler='ExponentialLR', gamma=math.pow(0.95, 0.001), last_epoch=-1) default_config = dict(
default_config.update(kwargs.get('schedule_config', {})) scheduler="ExponentialLR", gamma=math.pow(0.95, 0.001), last_epoch=-1
)
default_config.update(kwargs.get("schedule_config", {}))
self.schedule_config = default_config self.schedule_config = default_config
def __str__(self): def __str__(self):
if 'optimizer_config' in self.__dict__: if "optimizer_config" in self.__dict__:
opt_str = str(self.optimizer_config) opt_str = str(self.optimizer_config)
else: else:
opt_str = str('optimizer is empty...') opt_str = str("optimizer is empty...")
if 'schedule_config' in self.__dict__: if "schedule_config" in self.__dict__:
schedule_str = str(self.schedule_config) schedule_str = str(self.schedule_config)
else: else:
schedule_str = str('scheduler is empty...') schedule_str = str("scheduler is empty...")
return "\n".join([opt_str, schedule_str]) return "\n".join([opt_str, schedule_str])

View File

@ -5,7 +5,14 @@ from sympy import Function, Number, symbols
from idrlnet.pde import PdeNode from idrlnet.pde import PdeNode
__all__ = ['DiffusionNode', 'NavierStokesNode', 'WaveNode', 'BurgersNode', 'SchrodingerNode', 'AllenCahnNode'] __all__ = [
"DiffusionNode",
"NavierStokesNode",
"WaveNode",
"BurgersNode",
"SchrodingerNode",
"AllenCahnNode",
]
def symbolize(s, input_variables=None): def symbolize(s, input_variables=None):
@ -19,134 +26,172 @@ def symbolize(s, input_variables=None):
class DiffusionNode(PdeNode): class DiffusionNode(PdeNode):
def __init__(self, T='T', D='D', Q=0, dim=3, time=True, **kwargs): def __init__(self, T="T", D="D", Q=0, dim=3, time=True, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.T = T self.T = T
x, y, z, t = symbols('x y z t') x, y, z, t = symbols("x y z t")
input_variables = {'x': x, 'y': y, 'z': z, 't': t} input_variables = {"x": x, "y": y, "z": z, "t": t}
assert type(T) == str, "T should be string" assert type(T) == str, "T should be string"
T = symbolize(T, input_variables=input_variables) T = symbolize(T, input_variables=input_variables)
D = symbolize(D, input_variables=input_variables) D = symbolize(D, input_variables=input_variables)
Q = symbolize(Q, input_variables=input_variables) Q = symbolize(Q, input_variables=input_variables)
self.equations = {'diffusion_' + self.T: -Q} self.equations = {"diffusion_" + self.T: -Q}
if time: if time:
self.equations['diffusion_' + self.T] += T.diff(t) self.equations["diffusion_" + self.T] += T.diff(t)
coord = [x, y, z] coord = [x, y, z]
for i in range(dim): for i in range(dim):
s = coord[i] s = coord[i]
self.equations['diffusion_' + self.T] -= (D * T.diff(s)).diff(s) self.equations["diffusion_" + self.T] -= (D * T.diff(s)).diff(s)
self.make_nodes() self.make_nodes()
class NavierStokesNode(PdeNode): class NavierStokesNode(PdeNode):
def __init__(self, nu=0.1, rho=1., dim=2., time=False, **kwargs): def __init__(self, nu=0.1, rho=1.0, dim=2.0, time=False, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.dim = dim self.dim = dim
assert self.dim in [2, 3], "dim should be 2 or 3" assert self.dim in [2, 3], "dim should be 2 or 3"
self.time = time self.time = time
x, y, z, t = symbols('x y z t') x, y, z, t = symbols("x y z t")
input_variables = {'x': x, 'y': y, 'z': z, 't': t} input_variables = {"x": x, "y": y, "z": z, "t": t}
if self.dim == 2: if self.dim == 2:
input_variables.pop('z') input_variables.pop("z")
if not self.time: if not self.time:
input_variables.pop('t') input_variables.pop("t")
u = symbolize('u', input_variables) u = symbolize("u", input_variables)
v = symbolize('v', input_variables) v = symbolize("v", input_variables)
w = symbolize('w', input_variables) if self.dim == 3 else Number(0) w = symbolize("w", input_variables) if self.dim == 3 else Number(0)
p = symbolize('p', input_variables) p = symbolize("p", input_variables)
nu = symbolize(nu, input_variables) nu = symbolize(nu, input_variables)
rho = symbolize(rho, input_variables) rho = symbolize(rho, input_variables)
mu = rho * nu mu = rho * nu
self.equations = {'continuity': rho.diff(t) + (rho * u).diff(x) + (rho * v).diff(y) + (rho * w).diff(z), self.equations = {
'momentum_x': ((rho * u).diff(t) "continuity": rho.diff(t)
+ (u * ((rho * u).diff(x)) + v * ((rho * u).diff(y)) + w * ((rho * u).diff(z))) + (rho * u).diff(x)
+ (rho * v).diff(y)
+ (rho * w).diff(z),
"momentum_x": (
(rho * u).diff(t)
+ (
u * ((rho * u).diff(x))
+ v * ((rho * u).diff(y))
+ w * ((rho * u).diff(z))
)
+ p.diff(x) + p.diff(x)
- (mu * u.diff(x)).diff(x) - (mu * u.diff(x)).diff(x)
- (mu * u.diff(y)).diff(y) - (mu * u.diff(y)).diff(y)
- (mu * u.diff(z)).diff(z)), - (mu * u.diff(z)).diff(z)
'momentum_y': ((rho * v).diff(t) ),
+ (u * ((rho * v).diff(x)) + v * ((rho * v).diff(y)) + w * ((rho * v).diff(z))) "momentum_y": (
(rho * v).diff(t)
+ (
u * ((rho * v).diff(x))
+ v * ((rho * v).diff(y))
+ w * ((rho * v).diff(z))
)
+ p.diff(y) + p.diff(y)
- (mu * v.diff(x)).diff(x) - (mu * v.diff(x)).diff(x)
- (mu * v.diff(y)).diff(y) - (mu * v.diff(y)).diff(y)
- (mu * v.diff(z)).diff(z)), } - (mu * v.diff(z)).diff(z)
),
}
if self.dim == 3: if self.dim == 3:
self.equations['momentum_z'] = ((rho * w).diff(t) self.equations["momentum_z"] = (
+ (u * ((rho * w).diff(x)) + v * ((rho * w).diff(y)) + w * ( (rho * w).diff(t)
(rho * w).diff(z))) + p.diff(z) - (mu * w.diff(x)).diff(x) - (mu * w.diff(y)).diff(y) - ( + (
mu * w.diff(z)).diff(z)) u * ((rho * w).diff(x))
+ v * ((rho * w).diff(y))
+ w * ((rho * w).diff(z))
)
+ p.diff(z)
- (mu * w.diff(x)).diff(x)
- (mu * w.diff(y)).diff(y)
- (mu * w.diff(z)).diff(z)
)
self.make_nodes() self.make_nodes()
class WaveNode(PdeNode): class WaveNode(PdeNode):
def __init__(self, u='u', c='c', dim=3, time=True, **kwargs): def __init__(self, u="u", c="c", dim=3, time=True, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.u = u self.u = u
self.dim = dim self.dim = dim
self.time = time self.time = time
x, y, z, t = symbols('x y z t') x, y, z, t = symbols("x y z t")
input_variables = {'x': x, 'y': y, 'z': z, 't': t} input_variables = {"x": x, "y": y, "z": z, "t": t}
assert self.dim in [1, 2, 3], "dim should be 1, 2 or 3." assert self.dim in [1, 2, 3], "dim should be 1, 2 or 3."
if self.dim == 1: if self.dim == 1:
input_variables.pop('y') input_variables.pop("y")
input_variables.pop('z') input_variables.pop("z")
elif self.dim == 2: elif self.dim == 2:
input_variables.pop('z') input_variables.pop("z")
if not self.time: if not self.time:
input_variables.pop('t') input_variables.pop("t")
assert type(u) == str, "u should be string" assert type(u) == str, "u should be string"
u = symbolize(u, input_variables) u = symbolize(u, input_variables)
c = symbolize(c, input_variables) c = symbolize(c, input_variables)
self.equations = {'wave_equation': (u.diff(t, 2) self.equations = {
"wave_equation": (
u.diff(t, 2)
- (c ** 2 * u.diff(x)).diff(x) - (c ** 2 * u.diff(x)).diff(x)
- (c ** 2 * u.diff(y)).diff(y) - (c ** 2 * u.diff(y)).diff(y)
- (c ** 2 * u.diff(z)).diff(z))} - (c ** 2 * u.diff(z)).diff(z)
)
}
self.make_nodes() self.make_nodes()
class BurgersNode(PdeNode): class BurgersNode(PdeNode):
def __init__(self, u: str = 'u', v='v'): def __init__(self, u: str = "u", v="v"):
super().__init__() super().__init__()
x, t = symbols('x t') x, t = symbols("x t")
input_variables = {'x': x, 't': t} input_variables = {"x": x, "t": t}
assert type(u) == str, "u needs to be string" assert type(u) == str, "u needs to be string"
u = symbolize(u, input_variables) u = symbolize(u, input_variables)
v = symbolize(v, input_variables) v = symbolize(v, input_variables)
self.equations = {f'burgers_{str(u)}': (u.diff(t) + u * u.diff(x) - v * (u.diff(x)).diff(x))} self.equations = {
f"burgers_{str(u)}": (u.diff(t) + u * u.diff(x) - v * (u.diff(x)).diff(x))
}
self.make_nodes() self.make_nodes()
class SchrodingerNode(PdeNode): class SchrodingerNode(PdeNode):
def __init__(self, u='u', v='v', c=0.5): def __init__(self, u="u", v="v", c=0.5):
super().__init__() super().__init__()
self.c = c self.c = c
x, t = symbols('x t') x, t = symbols("x t")
input_variables = {'x': x, 't': t} input_variables = {"x": x, "t": t}
assert type(u) == str, "u should be string" assert type(u) == str, "u should be string"
u = symbolize(u, input_variables) u = symbolize(u, input_variables)
assert type(v) == str, "v should be string" assert type(v) == str, "v should be string"
v = symbolize(v, input_variables) v = symbolize(v, input_variables)
self.equations = {'real': u.diff(t) + self.c * v.diff(x, 2) + (u ** 2 + v ** 2) * v, self.equations = {
'imaginary': v.diff(t) - self.c * u.diff(x, 2) - (u ** 2 + v ** 2) * u} "real": u.diff(t) + self.c * v.diff(x, 2) + (u ** 2 + v ** 2) * v,
"imaginary": v.diff(t) - self.c * u.diff(x, 2) - (u ** 2 + v ** 2) * u,
}
self.make_nodes() self.make_nodes()
class AllenCahnNode(PdeNode): class AllenCahnNode(PdeNode):
def __init__(self, u='u', gamma_1=0.0001, gamma_2=5): def __init__(self, u="u", gamma_1=0.0001, gamma_2=5):
super().__init__() super().__init__()
self.gama_1 = gamma_1 self.gama_1 = gamma_1
self.gama_2 = gamma_2 self.gama_2 = gamma_2
x, t = symbols('x t') x, t = symbols("x t")
input_variables = {'x': x, 't': t} input_variables = {"x": x, "t": t}
assert type(u) == str, "u should be string" assert type(u) == str, "u should be string"
u = symbolize(u, input_variables) u = symbolize(u, input_variables)
self.equations = {'AllenCahn_' + str(u): u.diff(t) - self.gama_1 * u.diff(x, 2) - self.gama_2 * (u - u ** 3)} self.equations = {
"AllenCahn_"
+ str(u): u.diff(t)
- self.gama_1 * u.diff(x, 2)
- self.gama_2 * (u - u ** 3)
}
self.make_nodes() self.make_nodes()

View File

@ -11,7 +11,16 @@ from typing import Union, List
from idrlnet.torch_util import integral, _replace_derivatives, torch_lambdify from idrlnet.torch_util import integral, _replace_derivatives, torch_lambdify
from idrlnet.variable import Variables from idrlnet.variable import Variables
__all__ = ['NormalGradient', 'Difference', 'Derivative', 'Curl', 'Divergence', 'ICNode', 'Int1DNode', 'IntEq'] __all__ = [
"NormalGradient",
"Difference",
"Derivative",
"Curl",
"Divergence",
"ICNode",
"Int1DNode",
"IntEq",
]
class NormalGradient(PdeNode): class NormalGradient(PdeNode):
@ -21,48 +30,53 @@ class NormalGradient(PdeNode):
self.dim = dim self.dim = dim
self.time = time self.time = time
x, y, z, normal_x, normal_y, normal_z, t = symbols('x y z normal_x normal_y normal_z t') x, y, z, normal_x, normal_y, normal_z, t = symbols(
"x y z normal_x normal_y normal_z t"
)
input_variables = {'x': x, input_variables = {"x": x, "y": y, "z": z, "t": t}
'y': y,
'z': z,
't': t}
if self.dim == 1: if self.dim == 1:
input_variables.pop('y') input_variables.pop("y")
input_variables.pop('z') input_variables.pop("z")
elif self.dim == 2: elif self.dim == 2:
input_variables.pop('z') input_variables.pop("z")
if not self.time: if not self.time:
input_variables.pop('t') input_variables.pop("t")
T = Function(T)(*input_variables) T = Function(T)(*input_variables)
self.equations = {'normal_gradient_' + self.T: (normal_x * T.diff(x) self.equations = {
+ normal_y * T.diff(y) "normal_gradient_"
+ normal_z * T.diff(z))} + self.T: (
normal_x * T.diff(x) + normal_y * T.diff(y) + normal_z * T.diff(z)
)
}
self.make_nodes() self.make_nodes()
class Difference(PdeNode): class Difference(PdeNode):
def __init__(self, T: Union[str, Symbol, float, int], S: Union[str, Symbol, float, int], dim=3, time=True): def __init__(
self,
T: Union[str, Symbol, float, int],
S: Union[str, Symbol, float, int],
dim=3,
time=True,
):
super().__init__() super().__init__()
self.T = T self.T = T
self.S = S self.S = S
self.dim = dim self.dim = dim
self.time = time self.time = time
x, y, z = symbols('x y z') x, y, z = symbols("x y z")
t = Symbol('t') t = Symbol("t")
input_variables = {'x': x, input_variables = {"x": x, "y": y, "z": z, "t": t}
'y': y,
'z': z,
't': t}
if self.dim == 1: if self.dim == 1:
input_variables.pop('y') input_variables.pop("y")
input_variables.pop('z') input_variables.pop("z")
elif self.dim == 2: elif self.dim == 2:
input_variables.pop('z') input_variables.pop("z")
if not self.time: if not self.time:
input_variables.pop('t') input_variables.pop("t")
# variables to set the gradients (example Temperature) # variables to set the gradients (example Temperature)
T = Function(T)(*input_variables) T = Function(T)(*input_variables)
@ -70,32 +84,35 @@ class Difference(PdeNode):
# set equations # set equations
self.equations = {} self.equations = {}
self.equations['difference_' + self.T + '_' + self.S] = T - S self.equations["difference_" + self.T + "_" + self.S] = T - S
self.make_nodes() self.make_nodes()
class Derivative(PdeNode): class Derivative(PdeNode):
def __init__(self, T: Union[str, Symbol, float, int], p: Union[str, Symbol], S: Union[str, Symbol, float, int] = 0., def __init__(
dim=3, time=True): self,
T: Union[str, Symbol, float, int],
p: Union[str, Symbol],
S: Union[str, Symbol, float, int] = 0.0,
dim=3,
time=True,
):
super().__init__() super().__init__()
self.T = T self.T = T
self.S = S self.S = S
self.dim = dim self.dim = dim
self.time = time self.time = time
x, y, z = symbols('x y z') x, y, z = symbols("x y z")
t = Symbol('t') t = Symbol("t")
input_variables = {'x': x, input_variables = {"x": x, "y": y, "z": z, "t": t}
'y': y,
'z': z,
't': t}
if self.dim == 1: if self.dim == 1:
input_variables.pop('y') input_variables.pop("y")
input_variables.pop('z') input_variables.pop("z")
elif self.dim == 2: elif self.dim == 2:
input_variables.pop('z') input_variables.pop("z")
if not self.time: if not self.time:
input_variables.pop('t') input_variables.pop("t")
if type(S) is str: if type(S) is str:
S = Function(S)(*input_variables) S = Function(S)(*input_variables)
elif type(S) in [float, int]: elif type(S) in [float, int]:
@ -105,9 +122,11 @@ class Derivative(PdeNode):
T = Function(T)(*input_variables) T = Function(T)(*input_variables)
self.equations = {} self.equations = {}
if isinstance(S, Function): if isinstance(S, Function):
self.equations['derivative_' + self.T + ':' + str(p) + '_' + str(self.S)] = T.diff(p) - S self.equations[
"derivative_" + self.T + ":" + str(p) + "_" + str(self.S)
] = (T.diff(p) - S)
else: else:
self.equations['derivative_' + self.T + ':' + str(p)] = T.diff(p) - S self.equations["derivative_" + self.T + ":" + str(p)] = T.diff(p) - S
self.make_nodes() self.make_nodes()
@ -115,9 +134,9 @@ class Curl(PdeNode):
def __init__(self, vector, curl_name=None): def __init__(self, vector, curl_name=None):
super().__init__() super().__init__()
if curl_name is None: if curl_name is None:
curl_name = ['u', 'v', 'w'] curl_name = ["u", "v", "w"]
x, y, z = symbols('x y z') x, y, z = symbols("x y z")
input_variables = {'x': x, 'y': y, 'z': z} input_variables = {"x": x, "y": y, "z": z}
v_0 = vector[0] v_0 = vector[0]
v_1 = vector[1] v_1 = vector[1]
@ -146,11 +165,11 @@ class Curl(PdeNode):
class Divergence(PdeNode): class Divergence(PdeNode):
def __init__(self, vector, div_name='div_v'): def __init__(self, vector, div_name="div_v"):
super().__init__() super().__init__()
x, y, z = symbols('x y z') x, y, z = symbols("x y z")
input_variables = {'x': x, 'y': y, 'z': z} input_variables = {"x": x, "y": y, "z": z}
v_0 = vector[0] v_0 = vector[0]
v_1 = vector[1] v_1 = vector[1]
@ -174,9 +193,13 @@ class Divergence(PdeNode):
class ICNode(PdeNode): class ICNode(PdeNode):
def __init__(self, T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]], dim: int = 2, def __init__(
self,
T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]],
dim: int = 2,
time: bool = False, time: bool = False,
reduce_name: str = None): reduce_name: str = None,
):
super().__init__() super().__init__()
if reduce_name is None: if reduce_name is None:
reduce_name = str(T) reduce_name = str(T)
@ -185,28 +208,26 @@ class ICNode(PdeNode):
self.time = time self.time = time
self.reduce_name = reduce_name self.reduce_name = reduce_name
x, y, z = symbols('x y z') x, y, z = symbols("x y z")
normal_x = Symbol('normal_x') normal_x = Symbol("normal_x")
normal_y = Symbol('normal_y') normal_y = Symbol("normal_y")
normal_z = Symbol('normal_z') normal_z = Symbol("normal_z")
area = Symbol('area') area = Symbol("area")
t = Symbol('t') t = Symbol("t")
input_variables = {'x': x, input_variables = {"x": x, "y": y, "z": z, "t": t}
'y': y,
'z': z,
't': t}
if self.dim == 1: if self.dim == 1:
input_variables.pop('y') input_variables.pop("y")
input_variables.pop('z') input_variables.pop("z")
elif self.dim == 2: elif self.dim == 2:
input_variables.pop('z') input_variables.pop("z")
if not self.time: if not self.time:
input_variables.pop('t') input_variables.pop("t")
def sympify_T(T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]]) -> Union[ def sympify_T(
Symbol, List[Symbol]]: T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]]
) -> Union[Symbol, List[Symbol]]:
if isinstance(T, list): if isinstance(T, list):
return [sympify_T(_T) for _T in T] return [sympify_T(_T) for _T in T]
elif type(T) is str: elif type(T) is str:
@ -220,23 +241,33 @@ class ICNode(PdeNode):
self.equations = {} self.equations = {}
if isinstance(T, list): if isinstance(T, list):
if self.dim == 3: if self.dim == 3:
self.equations['integral_' + self.reduce_name] = integral((normal_x * T[0] self.equations["integral_" + self.reduce_name] = integral(
+ normal_y * T[1] (normal_x * T[0] + normal_y * T[1] + normal_z * T[2]) * area
+ normal_z * T[2]) * area) )
if self.dim == 2: if self.dim == 2:
self.equations['integral_' + self.reduce_name] = integral((normal_x * T[0] self.equations["integral_" + self.reduce_name] = integral(
+ normal_y * T[1]) * area) (normal_x * T[0] + normal_y * T[1]) * area
)
else: else:
self.equations['integral_' + self.reduce_name] = integral(T * area) self.equations["integral_" + self.reduce_name] = integral(T * area)
self.make_nodes() self.make_nodes()
class Int1DNode(PdeNode): class Int1DNode(PdeNode):
counter = 0 counter = 0
def __init__(self, expression, expression_name, lb, ub, var: Union[str, sp.Symbol] = 's', degree=20, **kwargs): def __init__(
self,
expression,
expression_name,
lb,
ub,
var: Union[str, sp.Symbol] = "s",
degree=20,
**kwargs
):
super().__init__(**kwargs) super().__init__(**kwargs)
x = sp.Symbol('x') x = sp.Symbol("x")
self.equations = {} self.equations = {}
self.var = sp.Symbol(var) if isinstance(var, str) else var self.var = sp.Symbol(var) if isinstance(var, str) else var
self.degree = degree self.degree = degree
@ -265,13 +296,19 @@ class Int1DNode(PdeNode):
else: else:
raise raise
if 'funs' in kwargs.keys(): if "funs" in kwargs.keys():
self.funs = kwargs['funs'] self.funs = kwargs["funs"]
else: else:
self.funs = {} self.funs = {}
self.computable_name = set(*[fun['output_map'].values() for _, fun in self.funs.items()]) self.computable_name = set(
*[fun["output_map"].values() for _, fun in self.funs.items()]
)
self.fun_require_input = set( self.fun_require_input = set(
*[set(fun['eval'].inputs) - set(fun['input_map'].keys()) for _, fun in self.funs.items()]) *[
set(fun["eval"].inputs) - set(fun["input_map"].keys())
for _, fun in self.funs.items()
]
)
self.make_nodes() self.make_nodes()
@ -300,13 +337,22 @@ class Int1DNode(PdeNode):
self.derivatives = [] self.derivatives = []
self.outputs = [x for x in name_set] self.outputs = [x for x in name_set]
def new_node(self, name: str = None, tf_eq: sp.Expr = None, free_symbols: List[str] = None, *args, **kwargs): def new_node(
self,
name: str = None,
tf_eq: sp.Expr = None,
free_symbols: List[str] = None,
*args,
**kwargs
):
out_symbols = [x for x in free_symbols if x not in self.funs.keys()] out_symbols = [x for x in free_symbols if x not in self.funs.keys()]
lb_lambda = torch_lambdify(out_symbols, self.lb) lb_lambda = torch_lambdify(out_symbols, self.lb)
ub_lambda = torch_lambdify(out_symbols, self.ub) ub_lambda = torch_lambdify(out_symbols, self.ub)
eq_lambda = torch_lambdify([*free_symbols, self.var.name], tf_eq) eq_lambda = torch_lambdify([*free_symbols, self.var.name], tf_eq)
node = Node() node = Node()
node.evaluate = IntEq(self, lb_lambda, ub_lambda, out_symbols, free_symbols, eq_lambda, name) node.evaluate = IntEq(
self, lb_lambda, ub_lambda, out_symbols, free_symbols, eq_lambda, name
)
node.inputs = [x for x in free_symbols if x not in self.funs.keys()] node.inputs = [x for x in free_symbols if x not in self.funs.keys()]
node.derivatives = [] node.derivatives = []
node.outputs = [name] node.outputs = [name]
@ -315,7 +361,16 @@ class Int1DNode(PdeNode):
class IntEq: class IntEq:
def __init__(self, binding_node, lb_lambda, ub_lambda, out_symbols, free_symbols, eq_lambda, name): def __init__(
self,
binding_node,
lb_lambda,
ub_lambda,
out_symbols,
free_symbols,
eq_lambda,
name,
):
self.binding_node = binding_node self.binding_node = binding_node
self.lb_lambda = lb_lambda self.lb_lambda = lb_lambda
self.ub_lambda = ub_lambda self.ub_lambda = ub_lambda
@ -326,8 +381,12 @@ class IntEq:
def __call__(self, var: Variables): def __call__(self, var: Variables):
var = {k: v for k, v in var.items()} var = {k: v for k, v in var.items()}
lb_value = self.lb_lambda(**{k: v for k, v in var.items() if k in self.out_symbols}) lb_value = self.lb_lambda(
ub_value = self.ub_lambda(**{k: v for k, v in var.items() if k in self.out_symbols}) **{k: v for k, v in var.items() if k in self.out_symbols}
)
ub_value = self.ub_lambda(
**{k: v for k, v in var.items() if k in self.out_symbols}
)
xx = dict() xx = dict()
for syp in self.free_symbols: for syp in self.free_symbols:
@ -347,19 +406,21 @@ class IntEq:
new_var = dict() new_var = dict()
for _, fun in self.binding_node.funs.items(): for _, fun in self.binding_node.funs.items():
input_map = fun['input_map'] input_map = fun["input_map"]
output_map = fun['output_map'] output_map = fun["output_map"]
tmp_var = dict() tmp_var = dict()
for k, v in xx.items(): for k, v in xx.items():
tmp_var[k] = v tmp_var[k] = v
for k, v in input_map.items(): for k, v in input_map.items():
tmp_var[k] = quad_s tmp_var[k] = quad_s
res = fun['eval'].evaluate(tmp_var) res = fun["eval"].evaluate(tmp_var)
for k, v in output_map.items(): for k, v in output_map.items():
res[v] = res.pop(k) res[v] = res.pop(k)
new_var.update(res) new_var.update(res)
xx.update(new_var) xx.update(new_var)
values = quad_w * self.eq_lambda(**dict(**{self.binding_node.var.name: quad_s}, **xx)) values = quad_w * self.eq_lambda(
**dict(**{self.binding_node.var.name: quad_s}, **xx)
)
values = values.reshape(shape) values = values.reshape(shape)
return {self.name: values.sum(1, keepdim=True)} return {self.name: values.sum(1, keepdim=True)}