diff --git a/.flake8 b/.flake8 index 0739a98..7a06c2c 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,2 @@ [flake8] -ignore = E203, W503, E501, E231, F401, F403 +ignore = E203, W503, E501, E231, F401, F403, E722, E731 diff --git a/docs/conf.py b/docs/conf.py index 2f1f833..13afc9f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -64,7 +64,7 @@ html_theme = 'sphinx_rtd_theme' html_static_path = ['_static'] # for MarkdownParser -from sphinx_markdown_parser.parser import MarkdownParser +from sphinx_markdown_parser.parser import MarkdownParser # noqa # def setup(app): diff --git a/examples/allen_cahn/allen_cahn.py b/examples/allen_cahn/allen_cahn.py index d12f068..f25251c 100644 --- a/examples/allen_cahn/allen_cahn.py +++ b/examples/allen_cahn/allen_cahn.py @@ -8,16 +8,16 @@ import os import torch # parameter phase -L = 1. +L = 1.0 # define geometry geo = sc.Line1D(-1.0, 1.0) # define sympy varaibles to parametize domain curves -t_symbol = Symbol('t') -x = Symbol('x') -u = sp.Function('u')(x, t_symbol) -up = sp.Function('up')(x, t_symbol) +t_symbol = Symbol("t") +x = Symbol("x") +u = sp.Function("u")(x, t_symbol) +up = sp.Function("up")(x, t_symbol) time_range = {t_symbol: (0, L)} @@ -25,52 +25,62 @@ time_range = {t_symbol: (0, L)} @sc.datanode class AllenInit(sc.SampleDomain): def sampling(self, *args, **kwargs): - return geo.sample_interior(density=300, param_ranges={t_symbol: 0.0}), \ - {'u': x ** 2 * sp.cos(sp.pi * x), 'lambda_u': 100} + return geo.sample_interior(density=300, param_ranges={t_symbol: 0.0}), { + "u": x ** 2 * sp.cos(sp.pi * x), + "lambda_u": 100, + } @sc.datanode class AllenBc(sc.SampleDomain): def sampling(self, *args, **kwargs): - return geo.sample_boundary(density=200, sieve=sp.Eq(x, -1), param_ranges=time_range), \ - {'difference_u_up': 0, - 'difference_diff_u_diff_up': 0, - } + return geo.sample_boundary( + density=200, sieve=sp.Eq(x, -1), param_ranges=time_range + ), { + "difference_u_up": 0, + "difference_diff_u_diff_up": 0, + } -@sc.datanode(name='allen_domain') +@sc.datanode(name="allen_domain") class AllenEq(sc.SampleDomain): def __init__(self): - self.points = geo.sample_interior(density=2000, param_ranges=time_range, low_discrepancy=True) + self.points = geo.sample_interior( + density=2000, param_ranges=time_range, low_discrepancy=True + ) def sampling(self, *args, **kwargs): - constraints = {'AllenCahn_u': 0} + constraints = {"AllenCahn_u": 0} return self.points, constraints -@sc.datanode(name='data_evaluate') +@sc.datanode(name="data_evaluate") class AllenPointsInference(sc.SampleDomain): def __init__(self): - self.points = geo.sample_interior(density=5000, param_ranges=time_range, low_discrepancy=True) + self.points = geo.sample_interior( + density=5000, param_ranges=time_range, low_discrepancy=True + ) self.points = sc.Variables(self.points).to_torch_tensor_() - self.constraints = {'AllenCahn_u': torch.zeros_like(self.points['x'])} + self.constraints = {"AllenCahn_u": torch.zeros_like(self.points["x"])} def sampling(self, *args, **kwargs): return self.points, self.constraints -@sc.datanode(name='re_sampling_domain') +@sc.datanode(name="re_sampling_domain") class SpaceAdaptiveSampling(sc.SampleDomain): def __init__(self): - self.points = geo.sample_interior(density=100, param_ranges=time_range, low_discrepancy=True) + self.points = geo.sample_interior( + density=100, param_ranges=time_range, low_discrepancy=True + ) self.points = sc.Variables(self.points).to_torch_tensor_() - self.constraints = {'AllenCahn_u': torch.zeros_like(self.points['x'])} + self.constraints = {"AllenCahn_u": torch.zeros_like(self.points["x"])} def sampling(self, *args, **kwargs): return self.points, self.constraints -@sc.datanode(name='allen_test') +@sc.datanode(name="allen_test") def generate_plot_data(): x = np.linspace(-1.0, 1.0, 100) t = np.linspace(0, 1.0, 100) @@ -82,76 +92,122 @@ def generate_plot_data(): # computational node phase net_u = sc.MLP([2, 128, 128, 128, 128, 2], activation=sc.Activation.tanh) -net_u = sc.NetNode(inputs=('x', 't',), outputs=('u',), name='net1', net=net_u) -xp = sc.ExpressionNode(name='xp', expression=x + 2) -get_tilde_u = sc.get_shared_net_node(net_u, inputs=('xp', 't',), outputs=('up',), name='net2', arch='mlp') +net_u = sc.NetNode( + inputs=( + "x", + "t", + ), + outputs=("u",), + name="net1", + net=net_u, +) +xp = sc.ExpressionNode(name="xp", expression=x + 2) +get_tilde_u = sc.get_shared_net_node( + net_u, + inputs=( + "xp", + "t", + ), + outputs=("up",), + name="net2", + arch="mlp", +) -diff_u = sc.ExpressionNode(expression=u.diff(x), name='diff_u') -diff_up = sc.ExpressionNode(expression=up.diff(x), name='diff_up') +diff_u = sc.ExpressionNode(expression=u.diff(x), name="diff_u") +diff_up = sc.ExpressionNode(expression=up.diff(x), name="diff_up") -pde = sc.AllenCahnNode(u='u', gamma_1=0.0001, gamma_2=5) +pde = sc.AllenCahnNode(u="u", gamma_1=0.0001, gamma_2=5) -boundary_up = sc.Difference(T='diff_u', S='diff_up') -boundary_u = sc.Difference(T='u', S='up') +boundary_up = sc.Difference(T="diff_u", S="diff_up") +boundary_u = sc.Difference(T="u", S="up") # Receiver hook phase + class SpaceAdaptiveReceiver(sc.Receiver): def receive_notify(self, solver, message): - if sc.Signal.TRAIN_PIPE_END in message.keys() and solver.global_step % 1000 == 0: - sc.logger.info('space adaptive sampling...') - results = solver.infer_step({'data_evaluate': ['x', 't', 'sdf', 'AllenCahn_u']}) - residual_data = results['data_evaluate']['AllenCahn_u'].detach().cpu().numpy().ravel() + if ( + sc.Signal.TRAIN_PIPE_END in message.keys() + and solver.global_step % 1000 == 0 + ): + sc.logger.info("space adaptive sampling...") + results = solver.infer_step( + {"data_evaluate": ["x", "t", "sdf", "AllenCahn_u"]} + ) + residual_data = ( + results["data_evaluate"]["AllenCahn_u"].detach().cpu().numpy().ravel() + ) # sort the points by residual loss - index = np.argsort(-1. * np.abs(residual_data))[:200] - _points = {key: values[index].detach().cpu().numpy() for key, values in results['data_evaluate'].items()} - _points.pop('AllenCahn_u') - _points['area'] = np.zeros_like(_points['sdf']) + (1.0 / 200) - solver.set_domain_parameter('re_sampling_domain', {'points': _points}) + index = np.argsort(-1.0 * np.abs(residual_data))[:200] + _points = { + key: values[index].detach().cpu().numpy() + for key, values in results["data_evaluate"].items() + } + _points.pop("AllenCahn_u") + _points["area"] = np.zeros_like(_points["sdf"]) + (1.0 / 200) + solver.set_domain_parameter("re_sampling_domain", {"points": _points}) class PostProcessReceiver(sc.Receiver): def __init__(self): - if not os.path.exists('image'): - os.mkdir('image') + if not os.path.exists("image"): + os.mkdir("image") def receive_notify(self, solver, message): - if sc.Signal.TRAIN_PIPE_END in message.keys() and solver.global_step % 1000 == 1: - sc.logger.info('Post Processing...') - points = s.infer_step({'allen_test': ['x', 't', 'u']}) - triang_total = tri.Triangulation(points['allen_test']['t'].detach().cpu().numpy().ravel(), - points['allen_test']['x'].detach().cpu().numpy().ravel(), ) - plt.tricontourf(triang_total, points['allen_test']['u'].detach().cpu().numpy().ravel(), 100, vmin=-1, - vmax=1) + if ( + sc.Signal.TRAIN_PIPE_END in message.keys() + and solver.global_step % 1000 == 1 + ): + sc.logger.info("Post Processing...") + points = s.infer_step({"allen_test": ["x", "t", "u"]}) + triang_total = tri.Triangulation( + points["allen_test"]["t"].detach().cpu().numpy().ravel(), + points["allen_test"]["x"].detach().cpu().numpy().ravel(), + ) + plt.tricontourf( + triang_total, + points["allen_test"]["u"].detach().cpu().numpy().ravel(), + 100, + vmin=-1, + vmax=1, + ) tc_bar = plt.colorbar() tc_bar.ax.tick_params(labelsize=12) - _points = solver.get_domain_parameter('re_sampling_domain', 'points') - if not isinstance(_points['t'], torch.Tensor): - plt.scatter(_points['t'].ravel(), _points['x'].ravel(), marker='x', s=8) + _points = solver.get_domain_parameter("re_sampling_domain", "points") + if not isinstance(_points["t"], torch.Tensor): + plt.scatter(_points["t"].ravel(), _points["x"].ravel(), marker="x", s=8) else: - plt.scatter(_points['t'].detach().cpu().numpy().ravel(), - _points['x'].detach().cpu().numpy().ravel(), marker='x', s=8) + plt.scatter( + _points["t"].detach().cpu().numpy().ravel(), + _points["x"].detach().cpu().numpy().ravel(), + marker="x", + s=8, + ) - plt.xlabel('$t$') - plt.ylabel('$x$') - plt.title('$u(x,t)$') - plt.savefig(f'image/result_{solver.global_step}.png') + plt.xlabel("$t$") + plt.ylabel("$x$") + plt.title("$u(x,t)$") + plt.savefig(f"image/result_{solver.global_step}.png") plt.show() # Solver phase -s = sc.Solver(sample_domains=(AllenInit(), - AllenBc(), - AllenEq(), - AllenPointsInference(), - SpaceAdaptiveSampling(), - generate_plot_data()), - netnodes=[net_u, get_tilde_u], - pdes=[pde, xp, diff_up, diff_u, boundary_up, boundary_u], - max_iter=60000, - loading=True) +s = sc.Solver( + sample_domains=( + AllenInit(), + AllenBc(), + AllenEq(), + AllenPointsInference(), + SpaceAdaptiveSampling(), + generate_plot_data(), + ), + netnodes=[net_u, get_tilde_u], + pdes=[pde, xp, diff_up, diff_u, boundary_up, boundary_u], + max_iter=60000, + loading=True, +) s.register_receiver(SpaceAdaptiveReceiver()) s.register_receiver(PostProcessReceiver()) diff --git a/idrlnet/callbacks.py b/idrlnet/callbacks.py index e3d21f4..7311134 100644 --- a/idrlnet/callbacks.py +++ b/idrlnet/callbacks.py @@ -13,7 +13,7 @@ __all__ = ['GradientReceiver', 'SummaryReceiver', 'HandleResultReceiver'] class GradientReceiver(Receiver): """Register the receiver to monitor gradient norm on the Tensorboard.""" - def receive_notify(self, solver: 'Solver', message): + def receive_notify(self, solver: 'Solver', message): # noqa if not (Signal.TRAIN_PIPE_END in message): return for netnode in solver.netnodes: @@ -34,7 +34,7 @@ class SummaryReceiver(SummaryWriter, Receiver): def __init__(self, *args, **kwargs): SummaryWriter.__init__(self, *args, **kwargs) - def receive_notify(self, solver: 'Solver', message: Dict): + def receive_notify(self, solver: 'Solver', message: Dict): # noqa if Signal.AFTER_COMPUTE_LOSS in message.keys(): loss_component = message[Signal.AFTER_COMPUTE_LOSS] self.add_scalars('loss_overview', loss_component, solver.global_step) @@ -51,7 +51,7 @@ class HandleResultReceiver(Receiver): def __init__(self, result_dir): self.result_dir = result_dir - def receive_notify(self, solver: 'Solver', message: Dict): + def receive_notify(self, solver: 'Solver', message: Dict): # noqa if Signal.SOLVE_END in message.keys(): samples = solver.sample_variables_from_domains() in_var, _, lambda_out = solver.generate_in_out_dict(samples) diff --git a/idrlnet/geo_utils/geo.py b/idrlnet/geo_utils/geo.py index 1022b40..43e0def 100644 --- a/idrlnet/geo_utils/geo.py +++ b/idrlnet/geo_utils/geo.py @@ -23,7 +23,7 @@ class CheckMeta(type): class AbsGeoObj(metaclass=abc.ABCMeta): @abc.abstractmethod - def rotation(self, angle: float, axis: str = 'z'): + def rotation(self, angle: float, axis: str = "z"): pass @abc.abstractmethod @@ -43,16 +43,24 @@ class Edge(AbsGeoObj): @property def axes(self) -> List[str]: - return [key for key in self.functions if not key.startswith('normal')] + return [key for key in self.functions if not key.startswith("normal")] - def rotation(self, angle: float, axis: str = 'z'): - assert len(self.axes) > 1, 'Cannot rotate a object with dim<2' + def rotation(self, angle: float, axis: str = "z"): + assert len(self.axes) > 1, "Cannot rotate a object with dim<2" rotated_dims = [key for key in self.axes if key != axis] - rd1, rd2, n = rotated_dims[0], rotated_dims[1], 'normal_' - self.functions[rd1] = (cos(angle) * self.functions[rd1] - sin(angle) * self.functions[rd2]) - self.functions[n + rd1] = cos(angle) * self.functions[n + rd1] - sin(angle) * self.functions[n + rd2] - self.functions[rd2] = (sin(angle) * self.functions[rd1] + cos(angle) * self.functions[rd2]) - self.functions[n + rd2] = sin(angle) * self.functions[n + rd1] + cos(angle) * self.functions[n + rd2] + rd1, rd2, n = rotated_dims[0], rotated_dims[1], "normal_" + self.functions[rd1] = ( + cos(angle) * self.functions[rd1] - sin(angle) * self.functions[rd2] + ) + self.functions[n + rd1] = ( + cos(angle) * self.functions[n + rd1] - sin(angle) * self.functions[n + rd2] + ) + self.functions[rd2] = ( + sin(angle) * self.functions[rd1] + cos(angle) * self.functions[rd2] + ) + self.functions[n + rd2] = ( + sin(angle) * self.functions[n + rd1] + cos(angle) * self.functions[n + rd2] + ) return self def scaling(self, scale: float): @@ -62,20 +70,28 @@ class Edge(AbsGeoObj): return self def translation(self, direction): - assert len(direction) == len(self.axes), 'Moving direction must have the save dimension with the object' + assert len(direction) == len( + self.axes + ), "Moving direction must have the save dimension with the object" for key, x in zip(self.axes, direction): self.functions[key] += x return self - def sample(self, density: int, param_ranges=None, low_discrepancy=False) -> Dict[str, np.ndarray]: + def sample( + self, density: int, param_ranges=None, low_discrepancy=False + ) -> Dict[str, np.ndarray]: param_ranges = {} if param_ranges is None else param_ranges inputs = {**self.ranges, **param_ranges}.keys() area_fn = lambdify_np(self.area, inputs) param_points = _ranged_sample(100, ranges={**self.ranges, **param_ranges}) nr_points = int(density * (np.mean(area_fn(**param_points)))) - lambdify_functions = {'area': lambda **x: area_fn(**x) / next(iter(x.values())).shape[0]} - param_points = _ranged_sample(nr_points, {**self.ranges, **param_ranges}, low_discrepancy) + lambdify_functions = { + "area": lambda **x: area_fn(**x) / next(iter(x.values())).shape[0] + } + param_points = _ranged_sample( + nr_points, {**self.ranges, **param_ranges}, low_discrepancy + ) data_var = {} for key, function in self.functions.items(): @@ -105,104 +121,138 @@ class Geometry(AbsGeoObj, metaclass=AbsCheckMix): if type(self) in [Geometry, Geometry1D, Geometry2D, Geometry3D]: return if self.edges is None: - raise NotImplementedError('Geometry must define edges') + raise NotImplementedError("Geometry must define edges") if self.bounds is None: - raise NotImplementedError('Geometry must define bounds') + raise NotImplementedError("Geometry must define bounds") if self.sdf is None: - raise NotImplementedError('Geometry must define sdf') + raise NotImplementedError("Geometry must define sdf") @property def axes(self) -> List[str]: return self.edges[0].axes - def translation(self, direction: Union[List, Tuple]) -> 'Geometry': + def translation(self, direction: Union[List, Tuple]) -> "Geometry": assert len(direction) == len(self.axes) [edge.translation(direction) for edge in self.edges] - self.sdf = self.sdf.subs([(Symbol(dim), Symbol(dim) - x) for dim, x in zip(self.axes, direction)]) - self.bounds = {dim: (self.bounds[dim][0] + x, self.bounds[dim][1] + x) for dim, x in zip(self.axes, direction)} + self.sdf = self.sdf.subs( + [(Symbol(dim), Symbol(dim) - x) for dim, x in zip(self.axes, direction)] + ) + self.bounds = { + dim: (self.bounds[dim][0] + x, self.bounds[dim][1] + x) + for dim, x in zip(self.axes, direction) + } return self - def rotation(self, angle: float, axis: str = 'z', center=None) -> 'Geometry': + def rotation(self, angle: float, axis: str = "z", center=None) -> "Geometry": if center is not None: self.translation([-x for x in center]) [edge.rotation(angle, axis) for edge in self.edges] rotated_dims = [key for key in self.axes if key != axis] sp_0 = Symbol(rotated_dims[0]) - _sp_0 = Symbol('tmp_0') + _sp_0 = Symbol("tmp_0") sp_1 = Symbol(rotated_dims[1]) - _sp_1 = Symbol('tmp_1') - self.sdf = self.sdf.subs({sp_0: cos(angle) * _sp_0 + sin(angle) * _sp_1, - sp_1: - sin(angle) * _sp_0 + cos(angle) * _sp_1}) + _sp_1 = Symbol("tmp_1") + self.sdf = self.sdf.subs( + { + sp_0: cos(angle) * _sp_0 + sin(angle) * _sp_1, + sp_1: -sin(angle) * _sp_0 + cos(angle) * _sp_1, + } + ) self.sdf = self.sdf.subs({_sp_0: sp_0, _sp_1: sp_1}) - self.bounds[rotated_dims[0]], self.bounds[rotated_dims[1]] = _rotate_rec(self.bounds[rotated_dims[0]], - self.bounds[rotated_dims[1]], - angle=angle) + self.bounds[rotated_dims[0]], self.bounds[rotated_dims[1]] = _rotate_rec( + self.bounds[rotated_dims[0]], self.bounds[rotated_dims[1]], angle=angle + ) if center is not None: self.translation(center) return self - def scaling(self, scale: float, center: Tuple = None) -> 'Geometry': - assert scale > 0, 'scaling must be positive' + def scaling(self, scale: float, center: Tuple = None) -> "Geometry": + assert scale > 0, "scaling must be positive" if center is not None: self.translation(tuple([-x for x in center])) [edge.scaling(scale) for edge in self.edges] - self.sdf = self.sdf.subs({Symbol(dim): Symbol(dim) / scale for dim in self.axes}) + self.sdf = self.sdf.subs( + {Symbol(dim): Symbol(dim) / scale for dim in self.axes} + ) self.sdf = scale * self.sdf for dim in self.axes: - self.bounds[dim] = (self.bounds[dim][0] * scale, self.bounds[dim][1] * scale) + self.bounds[dim] = ( + self.bounds[dim][0] * scale, + self.bounds[dim][1] * scale, + ) if center is not None: self.translation(center) return self - def duplicate(self) -> 'Geometry': + def duplicate(self) -> "Geometry": return copy.deepcopy(self) - def sample_boundary(self, density: int, sieve=None, param_ranges: Dict = None, low_discrepancy=False) -> Dict[ - str, np.ndarray]: + def sample_boundary( + self, density: int, sieve=None, param_ranges: Dict = None, low_discrepancy=False + ) -> Dict[str, np.ndarray]: param_ranges = dict() if param_ranges is None else param_ranges - points_list = [edge.sample(density, param_ranges, low_discrepancy) for edge in - self.edges] - points = reduce(lambda e1, e2: {_k: np.concatenate([e1[_k], e2[_k]], axis=0) for _k in e1}, points_list) + points_list = [ + edge.sample(density, param_ranges, low_discrepancy) for edge in self.edges + ] + points = reduce( + lambda e1, e2: {_k: np.concatenate([e1[_k], e2[_k]], axis=0) for _k in e1}, + points_list, + ) points = self._sieve_points(points, sieve, sign=-1, tol=1e-4) return points - def _sieve_points(self, points, sieve, tol=1e-4, sign=1.): + def _sieve_points(self, points, sieve, tol=1e-4, sign=1.0): sdf_fn = lambdify_np(self.sdf, points.keys()) - points['sdf'] = sdf_fn(**points) + points["sdf"] = sdf_fn(**points) criteria_fn = lambdify_np(True if sieve is None else sieve, points.keys()) - criteria_index = np.logical_and(np.greater(points['sdf'], -tol), criteria_fn(**points)) + criteria_index = np.logical_and( + np.greater(points["sdf"], -tol), criteria_fn(**points) + ) if sign == -1: - criteria_index = np.logical_and(np.less(points['sdf'], tol), criteria_index) + criteria_index = np.logical_and(np.less(points["sdf"], tol), criteria_index) points = {k: v[criteria_index[:, 0], :] for k, v in points.items()} return points - def sample_interior(self, density: int, bounds: Dict = None, sieve=None, param_ranges: Dict = None, - low_discrepancy=False) -> Dict[str, np.ndarray]: + def sample_interior( + self, + density: int, + bounds: Dict = None, + sieve=None, + param_ranges: Dict = None, + low_discrepancy=False, + ) -> Dict[str, np.ndarray]: bounds = self.bounds if bounds is None else bounds - bounds = {Symbol(key) if isinstance(key, str) else key: value for key, value in bounds.items()} + bounds = { + Symbol(key) if isinstance(key, str) else key: value + for key, value in bounds.items() + } param_ranges = {} if param_ranges is None else param_ranges measure = np.prod([value[1] - value[0] for value in bounds.values()]) nr_points = int(measure * density) - points = _ranged_sample(nr_points, {**bounds, **param_ranges}, low_discrepancy=low_discrepancy) + points = _ranged_sample( + nr_points, {**bounds, **param_ranges}, low_discrepancy=low_discrepancy + ) assert len(points.keys()) >= 0, "No points have been sampled!" - points = self._sieve_points(points, sieve, tol=0.) + points = self._sieve_points(points, sieve, tol=0.0) - points['area'] = np.zeros_like(points['sdf']) + (1.0 / density) + points["area"] = np.zeros_like(points["sdf"]) + (1.0 / density) return points - def __add__(self, other: 'Geometry') -> 'Geometry': + def __add__(self, other: "Geometry") -> "Geometry": geo = self.generate_geo_obj(other) geo.edges = self.edges + other.edges geo.sdf = WrapMax(self.sdf, other.sdf) geo.bounds = dict() for key, value in self.bounds.items(): geo.bounds[key] = ( - min(other.bounds[key][0], self.bounds[key][0]), max(other.bounds[key][1], self.bounds[key][1])) + min(other.bounds[key][0], self.bounds[key][0]), + max(other.bounds[key][1], self.bounds[key][1]), + ) return geo def generate_geo_obj(self, other=None): @@ -219,7 +269,7 @@ class Geometry(AbsGeoObj, metaclass=AbsCheckMix): raise TypeError return geo - def __sub__(self, other: 'Geometry') -> 'Geometry': + def __sub__(self, other: "Geometry") -> "Geometry": geo = self.generate_geo_obj(other) geo.edges = self.edges + [_inverse_edge(edge) for edge in other.edges] @@ -229,22 +279,24 @@ class Geometry(AbsGeoObj, metaclass=AbsCheckMix): geo.bounds[key] = (self.bounds[key][0], self.bounds[key][1]) return geo - def __invert__(self) -> 'Geometry': + def __invert__(self) -> "Geometry": geo = self.generate_geo_obj() geo.edges = [_inverse_edge(edge) for edge in self.edges] geo.sdf = WrapMul(-1, self.sdf) for key, value in self.bounds.items(): - geo.bounds[key] = (-float('inf'), float('inf')) + geo.bounds[key] = (-float("inf"), float("inf")) return geo - def __and__(self, other: 'Geometry') -> 'Geometry': + def __and__(self, other: "Geometry") -> "Geometry": geo = self.generate_geo_obj(other) geo.edges = self.edges + other.edges geo.sdf = WrapMin(self.sdf, other.sdf) geo.bounds = dict() for key, value in self.bounds.items(): geo.bounds[key] = ( - max(other.bounds[key][0], self.bounds[key][0]), min(other.bounds[key][1], self.bounds[key][1])) + max(other.bounds[key][0], self.bounds[key][0]), + min(other.bounds[key][1], self.bounds[key][1]), + ) return geo @@ -261,14 +313,16 @@ class Geometry3D(Geometry): # todo: sample in cuda device -def _ranged_sample(batch_size: int, ranges: Dict, low_discrepancy: bool = False) -> Dict[str, np.ndarray]: +def _ranged_sample( + batch_size: int, ranges: Dict, low_discrepancy: bool = False +) -> Dict[str, np.ndarray]: points = dict() low_discrepancy_stack = [] for key, value in ranges.items(): if isinstance(value, (float, int)): samples = np.ones((batch_size, 1)) * value elif isinstance(value, tuple): - assert len(value) == 2, 'Tuple: length of range should be 2!' + assert len(value) == 2, "Tuple: length of range should be 2!" if low_discrepancy: low_discrepancy_stack.append((key.name, value)) continue @@ -277,10 +331,12 @@ def _ranged_sample(batch_size: int, ranges: Dict, low_discrepancy: bool = False) elif isinstance(value, collections.Callable): samples = value(batch_size) else: - raise TypeError(f'range type {type(value)} not supported!') + raise TypeError(f"range type {type(value)} not supported!") points[key.name] = samples if low_discrepancy: - low_discrepancy_points_dict = _low_discrepancy_sampling(batch_size, low_discrepancy_stack) + low_discrepancy_points_dict = _low_discrepancy_sampling( + batch_size, low_discrepancy_stack + ) points.update(low_discrepancy_points_dict) for key, v in points.items(): points[key] = v.astype(np.float64) @@ -289,8 +345,8 @@ def _ranged_sample(batch_size: int, ranges: Dict, low_discrepancy: bool = False) def _rotate_rec(x: Tuple, y: Tuple, angle: float): points = itertools.product(x, y) - min_x, min_y = float('inf'), float('inf') - max_x, max_y = -float('inf'), -float('inf') + min_x, min_y = float("inf"), float("inf") + max_x, max_y = -float("inf"), -float("inf") try: for x, y in points: new_x = cos(angle) * x - sin(angle) * y @@ -327,7 +383,11 @@ def _low_discrepancy_sampling(n_points, low_discrepancy_stack: List[Tuple]): for i in range(len(q) - 1): for j in range(dims): - x[q[i]:q[i + 1], j] = (x[q[i]:q[i + 1], j] - rmin[j]) / 2 + rmin[j] + ((i >> j) & 1) * bi_range + x[q[i] : q[i + 1], j] = ( + (x[q[i] : q[i + 1], j] - rmin[j]) / 2 + + rmin[j] + + ((i >> j) & 1) * bi_range + ) rmin_sub = [v + bi_range * ((i >> j) & 1) for j, v in enumerate(rmin)] uniform(x, q[i], q[i + 1], rmin_sub, bi_range=bi_range / 2) return x @@ -337,11 +397,15 @@ def _low_discrepancy_sampling(n_points, low_discrepancy_stack: List[Tuple]): uniform(points, start=0, end=n, rmin=[0] * dim) points_dict = {} for i, (key, bi_range) in enumerate(low_discrepancy_stack): - points_dict[key] = points[:, i:i + 1] * (bi_range[1] - bi_range[0]) + bi_range[0] + points_dict[key] = ( + points[:, i : i + 1] * (bi_range[1] - bi_range[0]) + bi_range[0] + ) return points_dict def _inverse_edge(edge: Edge): - new_functions = {k: -v if k.startswith('normal_') else v for k, v in edge.functions.items()} + new_functions = { + k: -v if k.startswith("normal_") else v for k, v in edge.functions.items() + } edge = Edge(functions=new_functions, ranges=edge.ranges, area=edge.area) return edge diff --git a/idrlnet/geo_utils/geo_obj.py b/idrlnet/geo_utils/geo_obj.py index ef7e88b..9f7cd73 100644 --- a/idrlnet/geo_utils/geo_obj.py +++ b/idrlnet/geo_utils/geo_obj.py @@ -8,112 +8,151 @@ from sympy import symbols, Abs, sqrt, Max, Min, cos, sin, log, sign, Heaviside from sympy.vector import CoordSys3D from .geo import Edge, Geometry1D, Geometry2D, Geometry3D -__all__ = ['Line1D', 'Line', 'Tube2D', 'Rectangle', 'Circle', 'Heart', 'Triangle', 'Polygon', 'Plane', 'Tube3D', 'Tube', - 'CircularTube', 'Box', 'Sphere', 'Cylinder'] +__all__ = [ + "Line1D", + "Line", + "Tube2D", + "Rectangle", + "Circle", + "Heart", + "Triangle", + "Polygon", + "Plane", + "Tube3D", + "Tube", + "CircularTube", + "Box", + "Sphere", + "Cylinder", +] class Line1D(Geometry1D): - def __init__(self, point_1, point_2): - x, none = symbols('x none') + x, none = symbols("x none") ranges = {none: (0, 1)} - edge_1 = Edge(functions={'x': point_1, - 'normal_x': -1}, - area=1.0, - ranges=ranges) - edge_2 = Edge(functions={'x': point_2, - 'normal_x': 1}, - area=1.0, - ranges=ranges) + edge_1 = Edge(functions={"x": point_1, "normal_x": -1}, area=1.0, ranges=ranges) + edge_2 = Edge(functions={"x": point_2, "normal_x": 1}, area=1.0, ranges=ranges) self.edges = [edge_1, edge_2] dist = point_2 - point_1 center_x = point_1 + dist / 2 self.sdf = dist / 2 - Abs(x - center_x) - self.bounds = {'x': (point_1, point_2)} + self.bounds = {"x": (point_1, point_2)} class Line(Geometry2D): def __init__(self, point_1, point_2, normal=1): - x, y, l = symbols('x y l') + x, y, l = symbols("x y l") # noqa ranges = {l: (0, 1)} dist_x = point_2[0] - point_1[0] dist_y = point_2[1] - point_1[1] normal_vector = (-dist_y * normal, dist_x * normal) normal_norm = math.sqrt(normal_vector[0] ** 2 + normal_vector[1] ** 2) normal_vector = (normal_vector[0] / normal_norm, normal_vector[1] / normal_norm) - line_1 = Edge(functions={'x': point_1[0] + l * dist_x, - 'y': point_1[1] + l * dist_y, - 'normal_x': normal_vector[0], - 'normal_y': normal_vector[1]}, - ranges=ranges, - area=normal_norm) + line_1 = Edge( + functions={ + "x": point_1[0] + l * dist_x, + "y": point_1[1] + l * dist_y, + "normal_x": normal_vector[0], + "normal_y": normal_vector[1], + }, + ranges=ranges, + area=normal_norm, + ) self.edges = [line_1] self.sdf = ((x - point_1[0]) * dist_y - (y - point_1[1]) * dist_x) / normal_norm - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1]))} + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + } class Tube2D(Geometry2D): - def __init__(self, point_1, point_2): - l, y = symbols('l y') + l, y = symbols("l y") ranges = {l: (0, 1)} dist_x = point_2[0] - point_1[0] dist_y = point_2[1] - point_1[1] - line_1 = Edge(functions={'x': l * dist_x + point_1[0], - 'y': point_1[1], - 'normal_x': 0, - 'normal_y': -1}, - ranges=ranges, - area=dist_x) - line_2 = Edge(functions={'x': l * dist_x + point_1[0], - 'y': point_2[1], - 'normal_x': 0, - 'normal_y': 1}, - ranges=ranges, - area=dist_x) + line_1 = Edge( + functions={ + "x": l * dist_x + point_1[0], + "y": point_1[1], + "normal_x": 0, + "normal_y": -1, + }, + ranges=ranges, + area=dist_x, + ) + line_2 = Edge( + functions={ + "x": l * dist_x + point_1[0], + "y": point_2[1], + "normal_x": 0, + "normal_y": 1, + }, + ranges=ranges, + area=dist_x, + ) self.edges = [line_1, line_2] center_y = point_1[1] + (dist_y) / 2 y_diff = Abs(y - center_y) - (point_2[1] - center_y) outside_distance = sqrt(Max(y_diff, 0) ** 2) inside_distance = Min(y_diff, 0) - self.sdf = - (outside_distance + inside_distance) - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1]))} + self.sdf = -(outside_distance + inside_distance) + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + } class Rectangle(Geometry2D): def __init__(self, point_1, point_2): - l, x, y = symbols('l x y') + l, x, y = symbols("l x y") ranges = {l: (0, 1)} dist_x = point_2[0] - point_1[0] dist_y = point_2[1] - point_1[1] - edge_1 = Edge(functions={'x': l * dist_x + point_1[0], - 'y': point_1[1], - 'normal_x': 0, - 'normal_y': -1}, - ranges=ranges, - area=dist_x) - edge_2 = Edge(functions={'x': point_2[0], - 'y': l * dist_y + point_1[1], - 'normal_x': 1, - 'normal_y': 0}, - ranges=ranges, - area=dist_y) - edge_3 = Edge(functions={'x': l * dist_x + point_1[0], - 'y': point_2[1], - 'normal_x': 0, - 'normal_y': 1}, - ranges=ranges, - area=dist_x) - edge_4 = Edge(functions={'x': point_1[0], - 'y': -l * dist_y + point_2[1], - 'normal_x': -1, - 'normal_y': 0}, - ranges=ranges, - area=dist_y) + edge_1 = Edge( + functions={ + "x": l * dist_x + point_1[0], + "y": point_1[1], + "normal_x": 0, + "normal_y": -1, + }, + ranges=ranges, + area=dist_x, + ) + edge_2 = Edge( + functions={ + "x": point_2[0], + "y": l * dist_y + point_1[1], + "normal_x": 1, + "normal_y": 0, + }, + ranges=ranges, + area=dist_y, + ) + edge_3 = Edge( + functions={ + "x": l * dist_x + point_1[0], + "y": point_2[1], + "normal_x": 0, + "normal_y": 1, + }, + ranges=ranges, + area=dist_x, + ) + edge_4 = Edge( + functions={ + "x": point_1[0], + "y": -l * dist_y + point_2[1], + "normal_x": -1, + "normal_y": 0, + }, + ranges=ranges, + area=dist_y, + ) self.edges = [edge_1, edge_2, edge_3, edge_4] center_x = point_1[0] + (dist_x) / 2 center_y = point_1[1] + (dist_y) / 2 @@ -121,81 +160,126 @@ class Rectangle(Geometry2D): y_diff = Abs(y - center_y) - (point_2[1] - center_y) outside_distance = sqrt(Max(x_diff, 0) ** 2 + Max(y_diff, 0) ** 2) inside_distance = Min(Max(x_diff, y_diff), 0) - self.sdf = - (outside_distance + inside_distance) - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1]))} + self.sdf = -(outside_distance + inside_distance) + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + } class Circle(Geometry2D): - def __init__(self, center, radius): - theta, x, y = symbols('theta x y') + theta, x, y = symbols("theta x y") ranges = {theta: (0, 2 * math.pi)} - edge = Edge(functions={'x': center[0] + radius * cos(theta), - 'y': center[1] + radius * sin(theta), - 'normal_x': 1 * cos(theta), - 'normal_y': 1 * sin(theta)}, - ranges=ranges, - area=2 * pi * radius) + edge = Edge( + functions={ + "x": center[0] + radius * cos(theta), + "y": center[1] + radius * sin(theta), + "normal_x": 1 * cos(theta), + "normal_y": 1 * sin(theta), + }, + ranges=ranges, + area=2 * pi * radius, + ) self.edges = [edge] self.sdf = radius - sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) - self.bounds = {'x': (center[0] - radius, center[0] + radius), 'y': (center[1] - radius, center[1] + radius)} + self.bounds = { + "x": (center[0] - radius, center[0] + radius), + "y": (center[1] - radius, center[1] + radius), + } class Heart(Geometry2D): def __init__(self, center=(0, 0.5), radius=0.5): c1, c2 = center - theta, t, x, y = symbols('t theta x y') + theta, t, x, y = symbols("t theta x y") ranges = {theta: (0, math.pi), t: (0, 1)} - edge_1 = Edge(functions={'x': center[0] - t * radius, - 'y': center[1] - (1 - t) * radius, - 'normal_x': -1., - 'normal_y': -1.}, - ranges=ranges, - area=math.sqrt(2) * radius) + edge_1 = Edge( + functions={ + "x": center[0] - t * radius, + "y": center[1] - (1 - t) * radius, + "normal_x": -1.0, + "normal_y": -1.0, + }, + ranges=ranges, + area=math.sqrt(2) * radius, + ) - edge_2 = Edge(functions={'x': center[0] + t * radius, - 'y': center[1] - (1 - t) * radius, - 'normal_x': 1., - 'normal_y': -1.}, - ranges=ranges, - area=math.sqrt(2) * radius) + edge_2 = Edge( + functions={ + "x": center[0] + t * radius, + "y": center[1] - (1 - t) * radius, + "normal_x": 1.0, + "normal_y": -1.0, + }, + ranges=ranges, + area=math.sqrt(2) * radius, + ) - edge_3 = Edge(functions={'x': center[0] - radius / 2 + radius / math.sqrt(2) * cos(math.pi / 4 * 5 - theta), - 'y': center[1] + radius / 2 + radius / math.sqrt(2) * sin(math.pi / 4 * 5 - theta), - 'normal_x': cos(math.pi / 4 * 5 - theta), - 'normal_y': sin(math.pi / 4 * 5 - theta)}, - ranges=ranges, - area=math.sqrt(2) * radius * math.pi) + edge_3 = Edge( + functions={ + "x": center[0] + - radius / 2 + + radius / math.sqrt(2) * cos(math.pi / 4 * 5 - theta), + "y": center[1] + + radius / 2 + + radius / math.sqrt(2) * sin(math.pi / 4 * 5 - theta), + "normal_x": cos(math.pi / 4 * 5 - theta), + "normal_y": sin(math.pi / 4 * 5 - theta), + }, + ranges=ranges, + area=math.sqrt(2) * radius * math.pi, + ) - edge_4 = Edge(functions={'x': center[0] + radius / 2 + radius / math.sqrt(2) * cos(math.pi / 4 * 3 - theta), - 'y': center[1] + radius / 2 + radius / math.sqrt(2) * sin(math.pi / 4 * 3 - theta), - 'normal_x': cos(math.pi / 4 * 3 - theta), - 'normal_y': sin(math.pi / 4 * 3 - theta)}, - ranges=ranges, - area=math.sqrt(2) * radius * math.pi) + edge_4 = Edge( + functions={ + "x": center[0] + + radius / 2 + + radius / math.sqrt(2) * cos(math.pi / 4 * 3 - theta), + "y": center[1] + + radius / 2 + + radius / math.sqrt(2) * sin(math.pi / 4 * 3 - theta), + "normal_x": cos(math.pi / 4 * 3 - theta), + "normal_y": sin(math.pi / 4 * 3 - theta), + }, + ranges=ranges, + area=math.sqrt(2) * radius * math.pi, + ) self.edges = [edge_1, edge_2, edge_3, edge_4] - x, y = symbols('x y') + x, y = symbols("x y") x = (x - c1) * 0.5 / radius y = (y - c2) * 0.5 / radius + 0.5 - part1 = Heaviside(Abs(x) + y - 1) * (sqrt((Abs(x) - 0.25) ** 2 + (y - 0.75) ** 2) - math.sqrt(2) / 4) + part1 = Heaviside(Abs(x) + y - 1) * ( + sqrt((Abs(x) - 0.25) ** 2 + (y - 0.75) ** 2) - math.sqrt(2) / 4 + ) part_i = 0.5 * Max(Abs(x) + y, 0) - part2 = (1 - Heaviside(Abs(x) + y - 1)) * sign(Abs(x) - y) * Min(sqrt(Abs(x) ** 2 + (y - 1) ** 2), - sqrt((Abs(x) - part_i) ** 2 + ( - y - part_i) ** 2)) + part2 = ( + (1 - Heaviside(Abs(x) + y - 1)) + * sign(Abs(x) - y) + * Min( + sqrt(Abs(x) ** 2 + (y - 1) ** 2), + sqrt((Abs(x) - part_i) ** 2 + (y - part_i) ** 2), + ) + ) self.sdf = (-part1 - part2) * radius * 2 - self.bounds = {'x': ( - center[0] - 0.5 * radius - 0.5 * math.sqrt(2) * radius, - center[0] + 0.5 * radius + 0.5 * math.sqrt(2) * radius), - 'y': (center[1] - radius, center[1] + 0.5 * radius + 0.5 * math.sqrt(2) * radius)} + self.bounds = { + "x": ( + center[0] - 0.5 * radius - 0.5 * math.sqrt(2) * radius, + center[0] + 0.5 * radius + 0.5 * math.sqrt(2) * radius, + ), + "y": ( + center[1] - radius, + center[1] + 0.5 * radius + 0.5 * math.sqrt(2) * radius, + ), + } class Triangle(Geometry2D): def __init__(self, p0, p1, p2): - x, y, t = symbols('x y t') - N = CoordSys3D('N') + x, y, t = symbols("x y t") + N = CoordSys3D("N") P0 = p0[0] * N.i + p0[1] * N.j P1 = p1[0] * N.i + p1[1] * N.j P2 = p2[0] * N.i + p2[1] * N.j @@ -209,9 +293,11 @@ class Triangle(Geometry2D): u = sqrt(Min(pq0.dot(pq0), pq1.dot(pq1), pq2.dot(pq2))) - v = Min(s * (v0.dot(N.i) * e0.dot(N.j) - v0.dot(N.j) * e0.dot(N.i)), - s * (v1.dot(N.i) * e1.dot(N.j) - v1.dot(N.j) * e1.dot(N.i)), - s * (v2.dot(N.i) * e2.dot(N.j) - v2.dot(N.j) * e2.dot(N.i))) + v = Min( + s * (v0.dot(N.i) * e0.dot(N.j) - v0.dot(N.j) * e0.dot(N.i)), + s * (v1.dot(N.i) * e1.dot(N.j) - v1.dot(N.j) * e1.dot(N.i)), + s * (v2.dot(N.i) * e2.dot(N.j) - v2.dot(N.j) * e2.dot(N.i)), + ) self.sdf = u * sign(v) l0 = sqrt(e0.dot(e0)) @@ -219,33 +305,47 @@ class Triangle(Geometry2D): l2 = sqrt(e2.dot(e2)) ranges = {t: (0, 1)} in_out_sign = -sign(e0.cross(e1).dot(N.k)) - edge_1 = Edge(functions={'x': p1[0] + t * (p0[0] - p1[0]), - 'y': p1[1] + t * (p0[1] - p1[1]), - 'normal_x': (p0[1] - p1[1]) / l0 * in_out_sign, - 'normal_y': (p1[0] - p0[0]) / l0 * in_out_sign}, - ranges=ranges, - area=l0) - edge_2 = Edge(functions={'x': p2[0] + t * (p1[0] - p2[0]), - 'y': p2[1] + t * (p1[1] - p2[1]), - 'normal_x': (p1[1] - p2[1]) / l1 * in_out_sign, - 'normal_y': (p2[0] - p1[0]) / l1 * in_out_sign}, - ranges=ranges, - area=l1) - edge_3 = Edge(functions={'x': p0[0] + t * (p2[0] - p0[0]), - 'y': p0[1] + t * (p2[1] - p0[1]), - 'normal_x': (p2[1] - p0[1]) / l2 * in_out_sign, - 'normal_y': (p0[0] - p2[0]) / l2 * in_out_sign}, - ranges=ranges, - area=l2) + edge_1 = Edge( + functions={ + "x": p1[0] + t * (p0[0] - p1[0]), + "y": p1[1] + t * (p0[1] - p1[1]), + "normal_x": (p0[1] - p1[1]) / l0 * in_out_sign, + "normal_y": (p1[0] - p0[0]) / l0 * in_out_sign, + }, + ranges=ranges, + area=l0, + ) + edge_2 = Edge( + functions={ + "x": p2[0] + t * (p1[0] - p2[0]), + "y": p2[1] + t * (p1[1] - p2[1]), + "normal_x": (p1[1] - p2[1]) / l1 * in_out_sign, + "normal_y": (p2[0] - p1[0]) / l1 * in_out_sign, + }, + ranges=ranges, + area=l1, + ) + edge_3 = Edge( + functions={ + "x": p0[0] + t * (p2[0] - p0[0]), + "y": p0[1] + t * (p2[1] - p0[1]), + "normal_x": (p2[1] - p0[1]) / l2 * in_out_sign, + "normal_y": (p0[0] - p2[0]) / l2 * in_out_sign, + }, + ranges=ranges, + area=l2, + ) self.edges = [edge_1, edge_2, edge_3] - self.bounds = {'x': (min(p0[0], p1[0], p2[0]), max(p0[0], p1[0], p2[0])), - 'y': (min(p0[1], p1[1], p2[1]), max(p0[1], p1[1], p2[1]))} + self.bounds = { + "x": (min(p0[0], p1[0], p2[0]), max(p0[0], p1[0], p2[0])), + "y": (min(p0[1], p1[1], p2[1]), max(p0[1], p1[1], p2[1])), + } class Polygon(Geometry2D): def __init__(self, points): v = points - t = symbols('t') + t = symbols("t") ranges = {t: (0, 1)} def _sdf(x: np.ndarray, y: np.ndarray, **kwargs): @@ -255,13 +355,22 @@ class Polygon(Geometry2D): for i in range(len(v)): e = np.array(v[i - 1]) - np.array(v[i]) w = _points - np.array(v[i]) - b = w - e * np.clip((w * e).sum(axis=1, keepdims=True) / (e * e).sum(), 0, 1) + b = w - e * np.clip( + (w * e).sum(axis=1, keepdims=True) / (e * e).sum(), 0, 1 + ) d = np.minimum(d, (b * b).sum(keepdims=True, axis=1)) cond1 = _points[:, 1:] >= v[i][1] cond2 = _points[:, 1:] < v[i - 1][1] cond3 = e[0] * w[:, 1:] > e[1] * w[:, :1] inverse_idx1 = np.all([cond1, cond2, cond3], axis=0) - inverse_idx2 = np.all([np.logical_not(cond1), np.logical_not(cond2), np.logical_not(cond3)], axis=0) + inverse_idx2 = np.all( + [ + np.logical_not(cond1), + np.logical_not(cond2), + np.logical_not(cond3), + ], + axis=0, + ) inverse_idx = np.any([inverse_idx1, inverse_idx2], axis=0) s[inverse_idx] *= -1 return -np.sqrt(d) * s @@ -269,24 +378,30 @@ class Polygon(Geometry2D): self.sdf = _sdf self.edges = [] for i, _ in enumerate(points): - length = math.sqrt((points[i - 1][0] - points[i][0]) ** 2 + (points[i - 1][1] - points[i][1]) ** 2) - edge = Edge(functions={'x': points[i - 1][0] - t * (points[i - 1][0] - points[i][0]), - 'y': points[i - 1][1] - t * (points[i - 1][1] - points[i][1]), - 'normal_x': (points[i][1] - points[i - 1][1]) / length, - 'normal_y': (points[i - 1][0] - points[i][0]) / length}, - ranges=ranges, - area=length) + length = math.sqrt( + (points[i - 1][0] - points[i][0]) ** 2 + + (points[i - 1][1] - points[i][1]) ** 2 + ) + edge = Edge( + functions={ + "x": points[i - 1][0] - t * (points[i - 1][0] - points[i][0]), + "y": points[i - 1][1] - t * (points[i - 1][1] - points[i][1]), + "normal_x": (points[i][1] - points[i - 1][1]) / length, + "normal_y": (points[i - 1][0] - points[i][0]) / length, + }, + ranges=ranges, + area=length, + ) self.edges.append(edge) _p = iter(zip(*points)) _p1 = next(_p) _p2 = next(_p) - self.bounds = {'x': (min(_p1), max(_p1)), - 'y': (min(_p2), max(_p2))} + self.bounds = {"x": (min(_p1), max(_p1)), "y": (min(_p2), max(_p2))} def translation(self, direction: Union[List, Tuple]): raise NotImplementedError - def rotation(self, angle: float, axis: str = 'z', center=None): + def rotation(self, angle: float, axis: str = "z", center=None): raise NotImplementedError def scaling(self, scale: float, center: Tuple = None): @@ -294,89 +409,115 @@ class Polygon(Geometry2D): class Plane(Geometry3D): - def __init__(self, point_1, point_2, normal): assert point_1[0] == point_2[0], "Points must have the same x coordinate" - x, y, z, s_1, s_2 = symbols('x y z s_1 s_2') - center = (point_1[0] + (point_2[0] - point_1[0]) / 2, - point_1[1] + (point_2[1] - point_1[1]) / 2, - point_1[2] + (point_2[2] - point_1[2]) / 2) + x, y, z, s_1, s_2 = symbols("x y z s_1 s_2") + center = ( + point_1[0] + (point_2[0] - point_1[0]) / 2, + point_1[1] + (point_2[1] - point_1[1]) / 2, + point_1[2] + (point_2[2] - point_1[2]) / 2, + ) side_y = point_2[1] - point_1[1] side_z = point_2[2] - point_1[2] ranges = {s_1: (-1, 1), s_2: (-1, 1)} - edge = Edge(functions={'x': center[0], - 'y': center[1] + 0.5 * s_1 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': 1e-10 + normal, # TODO rm 1e-10 - 'normal_y': 0, - 'normal_z': 0}, - ranges=ranges, - area=side_y * side_z) + edge = Edge( + functions={ + "x": center[0], + "y": center[1] + 0.5 * s_1 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": 1e-10 + normal, # TODO rm 1e-10 + "normal_y": 0, + "normal_z": 0, + }, + ranges=ranges, + area=side_y * side_z, + ) self.edges = [edge] self.sdf = normal * (center[0] - x) - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), - 'z': (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), } + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + "z": (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), + } class Tube3D(Geometry3D): - def __init__(self, point_1, point_2): - x, y, z, s_1, s_2 = symbols('x y z s_1 s_2') - center = (point_1[0] + (point_2[0] - point_1[0]) / 2, - point_1[1] + (point_2[1] - point_1[1]) / 2, - point_1[2] + (point_2[2] - point_1[2]) / 2) + x, y, z, s_1, s_2 = symbols("x y z s_1 s_2") + center = ( + point_1[0] + (point_2[0] - point_1[0]) / 2, + point_1[1] + (point_2[1] - point_1[1]) / 2, + point_1[2] + (point_2[2] - point_1[2]) / 2, + ) side_x = point_2[0] - point_1[0] side_y = point_2[1] - point_1[1] side_z = point_2[2] - point_1[2] ranges = {s_1: (-1, 1), s_2: (-1, 1)} - edge_1 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] + 0.5 * s_2 * side_y, - 'z': center[2] + 0.5 * side_z, - 'normal_x': 0, - 'normal_y': 0, - 'normal_z': 1}, - ranges=ranges, - area=side_x * side_y) - edge_2 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] + 0.5 * s_2 * side_y, - 'z': center[2] - 0.5 * side_z, - 'normal_x': 0, - 'normal_y': 0, - 'normal_z': -1}, - ranges=ranges, - area=side_x * side_y) - edge_3 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] + 0.5 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': 0, - 'normal_y': 1, - 'normal_z': 0}, - ranges=ranges, - area=side_x * side_z) - edge_4 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] - 0.5 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': 0, - 'normal_y': -1, - 'normal_z': 0}, - ranges=ranges, - area=side_x * side_z) + edge_1 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] + 0.5 * s_2 * side_y, + "z": center[2] + 0.5 * side_z, + "normal_x": 0, + "normal_y": 0, + "normal_z": 1, + }, + ranges=ranges, + area=side_x * side_y, + ) + edge_2 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] + 0.5 * s_2 * side_y, + "z": center[2] - 0.5 * side_z, + "normal_x": 0, + "normal_y": 0, + "normal_z": -1, + }, + ranges=ranges, + area=side_x * side_y, + ) + edge_3 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] + 0.5 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": 0, + "normal_y": 1, + "normal_z": 0, + }, + ranges=ranges, + area=side_x * side_z, + ) + edge_4 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] - 0.5 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": 0, + "normal_y": -1, + "normal_z": 0, + }, + ranges=ranges, + area=side_x * side_z, + ) self.edges = [edge_1, edge_2, edge_3, edge_4] y_dist = Abs(y - center[1]) - 0.5 * side_y z_dist = Abs(z - center[2]) - 0.5 * side_z outside_distance = sqrt(Max(y_dist, 0) ** 2 + Max(z_dist, 0) ** 2) inside_distance = Min(Max(y_dist, z_dist), 0) - self.sdf = - (outside_distance + inside_distance) + self.sdf = -(outside_distance + inside_distance) - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), - 'z': (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), } + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + "z": (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), + } class Tube(Tube3D): @@ -386,164 +527,226 @@ class Tube(Tube3D): class CircularTube(Geometry3D): def __init__(self, center, radius, height): - x, y, z, h, theta = symbols('x y z h theta') + x, y, z, h, theta = symbols("x y z h theta") ranges = {h: (-1, 1), theta: (0, 2 * pi)} - edge_1 = Edge(functions={'x': center[0] + radius * cos(theta), - 'y': center[1] + radius * sin(theta), - 'z': center[2] + 0.5 * h * height, - 'normal_x': 1 * cos(theta), - 'normal_y': 1 * sin(theta), - 'normal_z': 0}, - ranges=ranges, - area=height * 2 * pi * radius) + edge_1 = Edge( + functions={ + "x": center[0] + radius * cos(theta), + "y": center[1] + radius * sin(theta), + "z": center[2] + 0.5 * h * height, + "normal_x": 1 * cos(theta), + "normal_y": 1 * sin(theta), + "normal_z": 0, + }, + ranges=ranges, + area=height * 2 * pi * radius, + ) self.edges = [edge_1] self.sdf = radius - sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) - self.bounds = {'x': (center[0] - radius, center[0] + radius), - 'y': (center[1] - radius, center[1] + radius), - 'z': (center[2] - height / 2, center[2] + height / 2)} + self.bounds = { + "x": (center[0] - radius, center[0] + radius), + "y": (center[1] - radius, center[1] + radius), + "z": (center[2] - height / 2, center[2] + height / 2), + } class Box(Geometry3D): def __init__(self, point_1, point_2): - x, y, z, s_1, s_2 = symbols('x y z s_1 s_2') - center = (point_1[0] + (point_2[0] - point_1[0]) / 2, - point_1[1] + (point_2[1] - point_1[1]) / 2, - point_1[2] + (point_2[2] - point_1[2]) / 2) + x, y, z, s_1, s_2 = symbols("x y z s_1 s_2") + center = ( + point_1[0] + (point_2[0] - point_1[0]) / 2, + point_1[1] + (point_2[1] - point_1[1]) / 2, + point_1[2] + (point_2[2] - point_1[2]) / 2, + ) side_x = point_2[0] - point_1[0] side_y = point_2[1] - point_1[1] side_z = point_2[2] - point_1[2] ranges = {s_1: (-1, 1), s_2: (-1, 1)} - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), - 'z': (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), } + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + "z": (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), + } - edge_1 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] + 0.5 * s_2 * side_y, - 'z': center[2] + 0.5 * side_z, - 'normal_x': 0, - 'normal_y': 0, - 'normal_z': 1}, - ranges=ranges, - area=side_x * side_y) - edge_2 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] + 0.5 * s_2 * side_y, - 'z': center[2] - 0.5 * side_z, - 'normal_x': 0, - 'normal_y': 0, - 'normal_z': -1}, - ranges=ranges, - area=side_x * side_y) - edge_3 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] + 0.5 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': 0, - 'normal_y': 1, - 'normal_z': 0}, - ranges=ranges, - area=side_x * side_z) - edge_4 = Edge(functions={'x': center[0] + 0.5 * s_1 * side_x, - 'y': center[1] - 0.5 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': 0, - 'normal_y': -1, - 'normal_z': 0}, - ranges=ranges, - area=side_x * side_z) - edge_5 = Edge(functions={'x': center[0] + 0.5 * side_x, - 'y': center[1] + 0.5 * s_1 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': 1, - 'normal_y': 0, - 'normal_z': 0}, - ranges=ranges, - area=side_y * side_z) - edge_6 = Edge(functions={'x': center[0] - 0.5 * side_x, - 'y': center[1] + 0.5 * s_1 * side_y, - 'z': center[2] + 0.5 * s_2 * side_z, - 'normal_x': -1, - 'normal_y': 0, - 'normal_z': 0}, - ranges=ranges, - area=side_y * side_z) + edge_1 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] + 0.5 * s_2 * side_y, + "z": center[2] + 0.5 * side_z, + "normal_x": 0, + "normal_y": 0, + "normal_z": 1, + }, + ranges=ranges, + area=side_x * side_y, + ) + edge_2 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] + 0.5 * s_2 * side_y, + "z": center[2] - 0.5 * side_z, + "normal_x": 0, + "normal_y": 0, + "normal_z": -1, + }, + ranges=ranges, + area=side_x * side_y, + ) + edge_3 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] + 0.5 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": 0, + "normal_y": 1, + "normal_z": 0, + }, + ranges=ranges, + area=side_x * side_z, + ) + edge_4 = Edge( + functions={ + "x": center[0] + 0.5 * s_1 * side_x, + "y": center[1] - 0.5 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": 0, + "normal_y": -1, + "normal_z": 0, + }, + ranges=ranges, + area=side_x * side_z, + ) + edge_5 = Edge( + functions={ + "x": center[0] + 0.5 * side_x, + "y": center[1] + 0.5 * s_1 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": 1, + "normal_y": 0, + "normal_z": 0, + }, + ranges=ranges, + area=side_y * side_z, + ) + edge_6 = Edge( + functions={ + "x": center[0] - 0.5 * side_x, + "y": center[1] + 0.5 * s_1 * side_y, + "z": center[2] + 0.5 * s_2 * side_z, + "normal_x": -1, + "normal_y": 0, + "normal_z": 0, + }, + ranges=ranges, + area=side_y * side_z, + ) self.edges = [edge_1, edge_2, edge_3, edge_4, edge_5, edge_6] x_dist = Abs(x - center[0]) - 0.5 * side_x y_dist = Abs(y - center[1]) - 0.5 * side_y z_dist = Abs(z - center[2]) - 0.5 * side_z - outside_distance = sqrt(Max(x_dist, 0) ** 2 + Max(y_dist, 0) ** 2 + Max(z_dist, 0) ** 2) + outside_distance = sqrt( + Max(x_dist, 0) ** 2 + Max(y_dist, 0) ** 2 + Max(z_dist, 0) ** 2 + ) inside_distance = Min(Max(x_dist, y_dist, z_dist), 0) - self.sdf = - (outside_distance + inside_distance) - self.bounds = {'x': (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), - 'y': (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), - 'z': (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), } + self.sdf = -(outside_distance + inside_distance) + self.bounds = { + "x": (min(point_1[0], point_2[0]), max(point_1[0], point_2[0])), + "y": (min(point_1[1], point_2[1]), max(point_1[1], point_2[1])), + "z": (min(point_1[2], point_2[2]), max(point_1[2], point_2[2])), + } class Sphere(Geometry3D): - def __init__(self, center, radius): - x, y, z, v_1, v_2, u_1, u_2 = symbols('x y z v_1 v_2 u_1 u_2') + x, y, z, v_1, v_2, u_1, u_2 = symbols("x y z v_1 v_2 u_1 u_2") ranges = {v_1: (0, 1), v_2: (0, 1), u_1: (0, 1), u_2: (0, 1)} r_1 = sqrt(-log(v_1)) * cos(2 * pi * u_1) r_2 = sqrt(-log(v_1)) * sin(2 * pi * u_1) r_3 = sqrt(-log(v_2)) * cos(2 * pi * u_2) norm = sqrt(r_1 ** 2 + r_2 ** 2 + r_3 ** 2) - edge_1 = Edge(functions={'x': center[0] + radius * r_1 / norm, - 'y': center[1] + radius * r_2 / norm, - 'z': center[2] + radius * r_3 / norm, - 'normal_x': r_1 / norm, - 'normal_y': r_2 / norm, - 'normal_z': r_3 / norm}, - ranges=ranges, - area=4 * pi * radius ** 2) + edge_1 = Edge( + functions={ + "x": center[0] + radius * r_1 / norm, + "y": center[1] + radius * r_2 / norm, + "z": center[2] + radius * r_3 / norm, + "normal_x": r_1 / norm, + "normal_y": r_2 / norm, + "normal_z": r_3 / norm, + }, + ranges=ranges, + area=4 * pi * radius ** 2, + ) self.edges = [edge_1] - self.sdf = radius - sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2 + (z - center[2]) ** 2) - self.bounds = {'x': (center[0] - radius, center[0] + radius), - 'y': (center[1] - radius, center[1] + radius), - 'z': (center[2] - radius, center[2] + radius)} + self.sdf = radius - sqrt( + (x - center[0]) ** 2 + (y - center[1]) ** 2 + (z - center[2]) ** 2 + ) + self.bounds = { + "x": (center[0] - radius, center[0] + radius), + "y": (center[1] - radius, center[1] + radius), + "z": (center[2] - radius, center[2] + radius), + } class Cylinder(Geometry3D): - def __init__(self, center, radius, height): - x, y, z, h, r, theta = symbols('x y z h r theta') + x, y, z, h, r, theta = symbols("x y z h r theta") ranges = {h: (-1, 1), r: (0, 1), theta: (0, 2 * pi)} - edge_1 = Edge(functions={'x': center[0] + radius * cos(theta), - 'y': center[1] + radius * sin(theta), - 'z': center[2] + 0.5 * h * height, - 'normal_x': 1 * cos(theta), - 'normal_y': 1 * sin(theta), - 'normal_z': 0}, - ranges=ranges, - area=height * 2 * pi * radius) - edge_2 = Edge(functions={'x': center[0] + sqrt(r) * radius * cos(theta), - 'y': center[1] + sqrt(r) * radius * sin(theta), - 'z': center[2] + 0.5 * height, - 'normal_x': 0, - 'normal_y': 0, - 'normal_z': 1}, - ranges=ranges, - area=math.pi * radius ** 2) - edge_3 = Edge(functions={'x': center[0] + sqrt(r) * radius * cos(theta), - 'y': center[1] + sqrt(r) * radius * sin(theta), - 'z': center[2] - 0.5 * height, - 'normal_x': 0, - 'normal_y': 0, - 'normal_z': -1}, - ranges=ranges, - area=pi * radius ** 2) + edge_1 = Edge( + functions={ + "x": center[0] + radius * cos(theta), + "y": center[1] + radius * sin(theta), + "z": center[2] + 0.5 * h * height, + "normal_x": 1 * cos(theta), + "normal_y": 1 * sin(theta), + "normal_z": 0, + }, + ranges=ranges, + area=height * 2 * pi * radius, + ) + edge_2 = Edge( + functions={ + "x": center[0] + sqrt(r) * radius * cos(theta), + "y": center[1] + sqrt(r) * radius * sin(theta), + "z": center[2] + 0.5 * height, + "normal_x": 0, + "normal_y": 0, + "normal_z": 1, + }, + ranges=ranges, + area=math.pi * radius ** 2, + ) + edge_3 = Edge( + functions={ + "x": center[0] + sqrt(r) * radius * cos(theta), + "y": center[1] + sqrt(r) * radius * sin(theta), + "z": center[2] - 0.5 * height, + "normal_x": 0, + "normal_y": 0, + "normal_z": -1, + }, + ranges=ranges, + area=pi * radius ** 2, + ) self.edges = [edge_1, edge_2, edge_3] r_dist = sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) z_dist = Abs(z - center[2]) - outside_distance = sqrt(Min(0, radius - r_dist) ** 2 + Min(0, 0.5 * height - z_dist) ** 2) - inside_distance = -1 * Min(Abs(Min(0, r_dist - radius)), Abs(Min(0, z_dist - 0.5 * height))) - self.sdf = - (outside_distance + inside_distance) + outside_distance = sqrt( + Min(0, radius - r_dist) ** 2 + Min(0, 0.5 * height - z_dist) ** 2 + ) + inside_distance = -1 * Min( + Abs(Min(0, r_dist - radius)), Abs(Min(0, z_dist - 0.5 * height)) + ) + self.sdf = -(outside_distance + inside_distance) - self.bounds = {'x': (center[0] - radius, center[0] + radius), - 'y': (center[1] - radius, center[1] + radius), - 'z': (center[2] - height / 2, center[2] + height / 2)} + self.bounds = { + "x": (center[0] - radius, center[0] + radius), + "y": (center[1] - radius, center[1] + radius), + "z": (center[2] - height / 2, center[2] + height / 2), + } diff --git a/idrlnet/optim.py b/idrlnet/optim.py index ea24678..234856f 100644 --- a/idrlnet/optim.py +++ b/idrlnet/optim.py @@ -6,7 +6,7 @@ import inspect import math from typing import Dict -__all__ = ['get_available_class', 'Optimizable'] +__all__ = ["get_available_class", "Optimizable"] def get_available_class(module, class_name) -> Dict[str, type]: @@ -19,20 +19,28 @@ def get_available_class(module, class_name) -> Dict[str, type]: :return: A dict mapping from subclass.name to subclass :rtype: Dict[str, type] """ - return dict(filter( - lambda x: inspect.isclass(x[1]) - and issubclass(x[1], class_name) - and (not x[1] == class_name), - inspect.getmembers(module))) + return dict( + filter( + lambda x: inspect.isclass(x[1]) + and issubclass(x[1], class_name) + and (not x[1] == class_name), + inspect.getmembers(module), + ) + ) class Optimizable(metaclass=abc.ABCMeta): """An abstract class for organizing optimization related configuration and operations. The interface is implemented by solver.Solver """ - OPTIMIZER_MAP = get_available_class(module=torch.optim, class_name=torch.optim.Optimizer) - SCHEDULE_MAP = get_available_class(module=torch.optim.lr_scheduler, - class_name=torch.optim.lr_scheduler._LRScheduler) + + OPTIMIZER_MAP = get_available_class( + module=torch.optim, class_name=torch.optim.Optimizer + ) + SCHEDULE_MAP = get_available_class( + module=torch.optim.lr_scheduler, + class_name=torch.optim.lr_scheduler._LRScheduler, + ) @property def optimizers(self): @@ -60,23 +68,25 @@ class Optimizable(metaclass=abc.ABCMeta): self.configure_optimizers() def parse_optimizer(self, **kwargs): - default_config = dict(optimizer='Adam', lr=1e-3) - default_config.update(kwargs.get('opt_config', {})) + default_config = dict(optimizer="Adam", lr=1e-3) + default_config.update(kwargs.get("opt_config", {})) self.optimizer_config = default_config def parse_lr_schedule(self, **kwargs): - default_config = dict(scheduler='ExponentialLR', gamma=math.pow(0.95, 0.001), last_epoch=-1) - default_config.update(kwargs.get('schedule_config', {})) + default_config = dict( + scheduler="ExponentialLR", gamma=math.pow(0.95, 0.001), last_epoch=-1 + ) + default_config.update(kwargs.get("schedule_config", {})) self.schedule_config = default_config def __str__(self): - if 'optimizer_config' in self.__dict__: + if "optimizer_config" in self.__dict__: opt_str = str(self.optimizer_config) else: - opt_str = str('optimizer is empty...') + opt_str = str("optimizer is empty...") - if 'schedule_config' in self.__dict__: + if "schedule_config" in self.__dict__: schedule_str = str(self.schedule_config) else: - schedule_str = str('scheduler is empty...') + schedule_str = str("scheduler is empty...") return "\n".join([opt_str, schedule_str]) diff --git a/idrlnet/pde_op/equations.py b/idrlnet/pde_op/equations.py index 028075e..521a109 100644 --- a/idrlnet/pde_op/equations.py +++ b/idrlnet/pde_op/equations.py @@ -5,7 +5,14 @@ from sympy import Function, Number, symbols from idrlnet.pde import PdeNode -__all__ = ['DiffusionNode', 'NavierStokesNode', 'WaveNode', 'BurgersNode', 'SchrodingerNode', 'AllenCahnNode'] +__all__ = [ + "DiffusionNode", + "NavierStokesNode", + "WaveNode", + "BurgersNode", + "SchrodingerNode", + "AllenCahnNode", +] def symbolize(s, input_variables=None): @@ -19,134 +26,172 @@ def symbolize(s, input_variables=None): class DiffusionNode(PdeNode): - def __init__(self, T='T', D='D', Q=0, dim=3, time=True, **kwargs): + def __init__(self, T="T", D="D", Q=0, dim=3, time=True, **kwargs): super().__init__(**kwargs) self.T = T - x, y, z, t = symbols('x y z t') - input_variables = {'x': x, 'y': y, 'z': z, 't': t} + x, y, z, t = symbols("x y z t") + input_variables = {"x": x, "y": y, "z": z, "t": t} assert type(T) == str, "T should be string" T = symbolize(T, input_variables=input_variables) D = symbolize(D, input_variables=input_variables) Q = symbolize(Q, input_variables=input_variables) - self.equations = {'diffusion_' + self.T: -Q} + self.equations = {"diffusion_" + self.T: -Q} if time: - self.equations['diffusion_' + self.T] += T.diff(t) + self.equations["diffusion_" + self.T] += T.diff(t) coord = [x, y, z] for i in range(dim): s = coord[i] - self.equations['diffusion_' + self.T] -= (D * T.diff(s)).diff(s) + self.equations["diffusion_" + self.T] -= (D * T.diff(s)).diff(s) self.make_nodes() class NavierStokesNode(PdeNode): - def __init__(self, nu=0.1, rho=1., dim=2., time=False, **kwargs): + def __init__(self, nu=0.1, rho=1.0, dim=2.0, time=False, **kwargs): super().__init__(**kwargs) self.dim = dim assert self.dim in [2, 3], "dim should be 2 or 3" self.time = time - x, y, z, t = symbols('x y z t') - input_variables = {'x': x, 'y': y, 'z': z, 't': t} + x, y, z, t = symbols("x y z t") + input_variables = {"x": x, "y": y, "z": z, "t": t} if self.dim == 2: - input_variables.pop('z') + input_variables.pop("z") if not self.time: - input_variables.pop('t') + input_variables.pop("t") - u = symbolize('u', input_variables) - v = symbolize('v', input_variables) - w = symbolize('w', input_variables) if self.dim == 3 else Number(0) - p = symbolize('p', input_variables) + u = symbolize("u", input_variables) + v = symbolize("v", input_variables) + w = symbolize("w", input_variables) if self.dim == 3 else Number(0) + p = symbolize("p", input_variables) nu = symbolize(nu, input_variables) rho = symbolize(rho, input_variables) mu = rho * nu - self.equations = {'continuity': rho.diff(t) + (rho * u).diff(x) + (rho * v).diff(y) + (rho * w).diff(z), - 'momentum_x': ((rho * u).diff(t) - + (u * ((rho * u).diff(x)) + v * ((rho * u).diff(y)) + w * ((rho * u).diff(z))) - + p.diff(x) - - (mu * u.diff(x)).diff(x) - - (mu * u.diff(y)).diff(y) - - (mu * u.diff(z)).diff(z)), - 'momentum_y': ((rho * v).diff(t) - + (u * ((rho * v).diff(x)) + v * ((rho * v).diff(y)) + w * ((rho * v).diff(z))) - + p.diff(y) - - (mu * v.diff(x)).diff(x) - - (mu * v.diff(y)).diff(y) - - (mu * v.diff(z)).diff(z)), } + self.equations = { + "continuity": rho.diff(t) + + (rho * u).diff(x) + + (rho * v).diff(y) + + (rho * w).diff(z), + "momentum_x": ( + (rho * u).diff(t) + + ( + u * ((rho * u).diff(x)) + + v * ((rho * u).diff(y)) + + w * ((rho * u).diff(z)) + ) + + p.diff(x) + - (mu * u.diff(x)).diff(x) + - (mu * u.diff(y)).diff(y) + - (mu * u.diff(z)).diff(z) + ), + "momentum_y": ( + (rho * v).diff(t) + + ( + u * ((rho * v).diff(x)) + + v * ((rho * v).diff(y)) + + w * ((rho * v).diff(z)) + ) + + p.diff(y) + - (mu * v.diff(x)).diff(x) + - (mu * v.diff(y)).diff(y) + - (mu * v.diff(z)).diff(z) + ), + } if self.dim == 3: - self.equations['momentum_z'] = ((rho * w).diff(t) - + (u * ((rho * w).diff(x)) + v * ((rho * w).diff(y)) + w * ( - (rho * w).diff(z))) + p.diff(z) - (mu * w.diff(x)).diff(x) - (mu * w.diff(y)).diff(y) - ( - mu * w.diff(z)).diff(z)) + self.equations["momentum_z"] = ( + (rho * w).diff(t) + + ( + u * ((rho * w).diff(x)) + + v * ((rho * w).diff(y)) + + w * ((rho * w).diff(z)) + ) + + p.diff(z) + - (mu * w.diff(x)).diff(x) + - (mu * w.diff(y)).diff(y) + - (mu * w.diff(z)).diff(z) + ) self.make_nodes() class WaveNode(PdeNode): - def __init__(self, u='u', c='c', dim=3, time=True, **kwargs): + def __init__(self, u="u", c="c", dim=3, time=True, **kwargs): super().__init__(**kwargs) self.u = u self.dim = dim self.time = time - x, y, z, t = symbols('x y z t') - input_variables = {'x': x, 'y': y, 'z': z, 't': t} + x, y, z, t = symbols("x y z t") + input_variables = {"x": x, "y": y, "z": z, "t": t} assert self.dim in [1, 2, 3], "dim should be 1, 2 or 3." if self.dim == 1: - input_variables.pop('y') - input_variables.pop('z') + input_variables.pop("y") + input_variables.pop("z") elif self.dim == 2: - input_variables.pop('z') + input_variables.pop("z") if not self.time: - input_variables.pop('t') + input_variables.pop("t") assert type(u) == str, "u should be string" u = symbolize(u, input_variables) c = symbolize(c, input_variables) - self.equations = {'wave_equation': (u.diff(t, 2) - - (c ** 2 * u.diff(x)).diff(x) - - (c ** 2 * u.diff(y)).diff(y) - - (c ** 2 * u.diff(z)).diff(z))} + self.equations = { + "wave_equation": ( + u.diff(t, 2) + - (c ** 2 * u.diff(x)).diff(x) + - (c ** 2 * u.diff(y)).diff(y) + - (c ** 2 * u.diff(z)).diff(z) + ) + } self.make_nodes() class BurgersNode(PdeNode): - def __init__(self, u: str = 'u', v='v'): + def __init__(self, u: str = "u", v="v"): super().__init__() - x, t = symbols('x t') - input_variables = {'x': x, 't': t} + x, t = symbols("x t") + input_variables = {"x": x, "t": t} assert type(u) == str, "u needs to be string" u = symbolize(u, input_variables) v = symbolize(v, input_variables) - self.equations = {f'burgers_{str(u)}': (u.diff(t) + u * u.diff(x) - v * (u.diff(x)).diff(x))} + self.equations = { + f"burgers_{str(u)}": (u.diff(t) + u * u.diff(x) - v * (u.diff(x)).diff(x)) + } self.make_nodes() class SchrodingerNode(PdeNode): - def __init__(self, u='u', v='v', c=0.5): + def __init__(self, u="u", v="v", c=0.5): super().__init__() self.c = c - x, t = symbols('x t') - input_variables = {'x': x, 't': t} + x, t = symbols("x t") + input_variables = {"x": x, "t": t} assert type(u) == str, "u should be string" u = symbolize(u, input_variables) assert type(v) == str, "v should be string" v = symbolize(v, input_variables) - self.equations = {'real': u.diff(t) + self.c * v.diff(x, 2) + (u ** 2 + v ** 2) * v, - 'imaginary': v.diff(t) - self.c * u.diff(x, 2) - (u ** 2 + v ** 2) * u} + self.equations = { + "real": u.diff(t) + self.c * v.diff(x, 2) + (u ** 2 + v ** 2) * v, + "imaginary": v.diff(t) - self.c * u.diff(x, 2) - (u ** 2 + v ** 2) * u, + } self.make_nodes() class AllenCahnNode(PdeNode): - def __init__(self, u='u', gamma_1=0.0001, gamma_2=5): + def __init__(self, u="u", gamma_1=0.0001, gamma_2=5): super().__init__() self.gama_1 = gamma_1 self.gama_2 = gamma_2 - x, t = symbols('x t') - input_variables = {'x': x, 't': t} + x, t = symbols("x t") + input_variables = {"x": x, "t": t} assert type(u) == str, "u should be string" u = symbolize(u, input_variables) - self.equations = {'AllenCahn_' + str(u): u.diff(t) - self.gama_1 * u.diff(x, 2) - self.gama_2 * (u - u ** 3)} + self.equations = { + "AllenCahn_" + + str(u): u.diff(t) + - self.gama_1 * u.diff(x, 2) + - self.gama_2 * (u - u ** 3) + } self.make_nodes() diff --git a/idrlnet/pde_op/operator.py b/idrlnet/pde_op/operator.py index 24f2e85..4493a62 100644 --- a/idrlnet/pde_op/operator.py +++ b/idrlnet/pde_op/operator.py @@ -11,7 +11,16 @@ from typing import Union, List from idrlnet.torch_util import integral, _replace_derivatives, torch_lambdify from idrlnet.variable import Variables -__all__ = ['NormalGradient', 'Difference', 'Derivative', 'Curl', 'Divergence', 'ICNode', 'Int1DNode', 'IntEq'] +__all__ = [ + "NormalGradient", + "Difference", + "Derivative", + "Curl", + "Divergence", + "ICNode", + "Int1DNode", + "IntEq", +] class NormalGradient(PdeNode): @@ -21,48 +30,53 @@ class NormalGradient(PdeNode): self.dim = dim self.time = time - x, y, z, normal_x, normal_y, normal_z, t = symbols('x y z normal_x normal_y normal_z t') + x, y, z, normal_x, normal_y, normal_z, t = symbols( + "x y z normal_x normal_y normal_z t" + ) - input_variables = {'x': x, - 'y': y, - 'z': z, - 't': t} + input_variables = {"x": x, "y": y, "z": z, "t": t} if self.dim == 1: - input_variables.pop('y') - input_variables.pop('z') + input_variables.pop("y") + input_variables.pop("z") elif self.dim == 2: - input_variables.pop('z') + input_variables.pop("z") if not self.time: - input_variables.pop('t') + input_variables.pop("t") T = Function(T)(*input_variables) - self.equations = {'normal_gradient_' + self.T: (normal_x * T.diff(x) - + normal_y * T.diff(y) - + normal_z * T.diff(z))} + self.equations = { + "normal_gradient_" + + self.T: ( + normal_x * T.diff(x) + normal_y * T.diff(y) + normal_z * T.diff(z) + ) + } self.make_nodes() class Difference(PdeNode): - def __init__(self, T: Union[str, Symbol, float, int], S: Union[str, Symbol, float, int], dim=3, time=True): + def __init__( + self, + T: Union[str, Symbol, float, int], + S: Union[str, Symbol, float, int], + dim=3, + time=True, + ): super().__init__() self.T = T self.S = S self.dim = dim self.time = time - x, y, z = symbols('x y z') - t = Symbol('t') - input_variables = {'x': x, - 'y': y, - 'z': z, - 't': t} + x, y, z = symbols("x y z") + t = Symbol("t") + input_variables = {"x": x, "y": y, "z": z, "t": t} if self.dim == 1: - input_variables.pop('y') - input_variables.pop('z') + input_variables.pop("y") + input_variables.pop("z") elif self.dim == 2: - input_variables.pop('z') + input_variables.pop("z") if not self.time: - input_variables.pop('t') + input_variables.pop("t") # variables to set the gradients (example Temperature) T = Function(T)(*input_variables) @@ -70,32 +84,35 @@ class Difference(PdeNode): # set equations self.equations = {} - self.equations['difference_' + self.T + '_' + self.S] = T - S + self.equations["difference_" + self.T + "_" + self.S] = T - S self.make_nodes() class Derivative(PdeNode): - def __init__(self, T: Union[str, Symbol, float, int], p: Union[str, Symbol], S: Union[str, Symbol, float, int] = 0., - dim=3, time=True): + def __init__( + self, + T: Union[str, Symbol, float, int], + p: Union[str, Symbol], + S: Union[str, Symbol, float, int] = 0.0, + dim=3, + time=True, + ): super().__init__() self.T = T self.S = S self.dim = dim self.time = time - x, y, z = symbols('x y z') - t = Symbol('t') + x, y, z = symbols("x y z") + t = Symbol("t") - input_variables = {'x': x, - 'y': y, - 'z': z, - 't': t} + input_variables = {"x": x, "y": y, "z": z, "t": t} if self.dim == 1: - input_variables.pop('y') - input_variables.pop('z') + input_variables.pop("y") + input_variables.pop("z") elif self.dim == 2: - input_variables.pop('z') + input_variables.pop("z") if not self.time: - input_variables.pop('t') + input_variables.pop("t") if type(S) is str: S = Function(S)(*input_variables) elif type(S) in [float, int]: @@ -105,9 +122,11 @@ class Derivative(PdeNode): T = Function(T)(*input_variables) self.equations = {} if isinstance(S, Function): - self.equations['derivative_' + self.T + ':' + str(p) + '_' + str(self.S)] = T.diff(p) - S + self.equations[ + "derivative_" + self.T + ":" + str(p) + "_" + str(self.S) + ] = (T.diff(p) - S) else: - self.equations['derivative_' + self.T + ':' + str(p)] = T.diff(p) - S + self.equations["derivative_" + self.T + ":" + str(p)] = T.diff(p) - S self.make_nodes() @@ -115,9 +134,9 @@ class Curl(PdeNode): def __init__(self, vector, curl_name=None): super().__init__() if curl_name is None: - curl_name = ['u', 'v', 'w'] - x, y, z = symbols('x y z') - input_variables = {'x': x, 'y': y, 'z': z} + curl_name = ["u", "v", "w"] + x, y, z = symbols("x y z") + input_variables = {"x": x, "y": y, "z": z} v_0 = vector[0] v_1 = vector[1] @@ -146,11 +165,11 @@ class Curl(PdeNode): class Divergence(PdeNode): - def __init__(self, vector, div_name='div_v'): + def __init__(self, vector, div_name="div_v"): super().__init__() - x, y, z = symbols('x y z') + x, y, z = symbols("x y z") - input_variables = {'x': x, 'y': y, 'z': z} + input_variables = {"x": x, "y": y, "z": z} v_0 = vector[0] v_1 = vector[1] @@ -174,9 +193,13 @@ class Divergence(PdeNode): class ICNode(PdeNode): - def __init__(self, T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]], dim: int = 2, - time: bool = False, - reduce_name: str = None): + def __init__( + self, + T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]], + dim: int = 2, + time: bool = False, + reduce_name: str = None, + ): super().__init__() if reduce_name is None: reduce_name = str(T) @@ -185,28 +208,26 @@ class ICNode(PdeNode): self.time = time self.reduce_name = reduce_name - x, y, z = symbols('x y z') - normal_x = Symbol('normal_x') - normal_y = Symbol('normal_y') - normal_z = Symbol('normal_z') - area = Symbol('area') + x, y, z = symbols("x y z") + normal_x = Symbol("normal_x") + normal_y = Symbol("normal_y") + normal_z = Symbol("normal_z") + area = Symbol("area") - t = Symbol('t') + t = Symbol("t") - input_variables = {'x': x, - 'y': y, - 'z': z, - 't': t} + input_variables = {"x": x, "y": y, "z": z, "t": t} if self.dim == 1: - input_variables.pop('y') - input_variables.pop('z') + input_variables.pop("y") + input_variables.pop("z") elif self.dim == 2: - input_variables.pop('z') + input_variables.pop("z") if not self.time: - input_variables.pop('t') + input_variables.pop("t") - def sympify_T(T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]]) -> Union[ - Symbol, List[Symbol]]: + def sympify_T( + T: Union[str, Symbol, int, float, List[Union[str, Symbol, int, float]]] + ) -> Union[Symbol, List[Symbol]]: if isinstance(T, list): return [sympify_T(_T) for _T in T] elif type(T) is str: @@ -220,23 +241,33 @@ class ICNode(PdeNode): self.equations = {} if isinstance(T, list): if self.dim == 3: - self.equations['integral_' + self.reduce_name] = integral((normal_x * T[0] - + normal_y * T[1] - + normal_z * T[2]) * area) + self.equations["integral_" + self.reduce_name] = integral( + (normal_x * T[0] + normal_y * T[1] + normal_z * T[2]) * area + ) if self.dim == 2: - self.equations['integral_' + self.reduce_name] = integral((normal_x * T[0] - + normal_y * T[1]) * area) + self.equations["integral_" + self.reduce_name] = integral( + (normal_x * T[0] + normal_y * T[1]) * area + ) else: - self.equations['integral_' + self.reduce_name] = integral(T * area) + self.equations["integral_" + self.reduce_name] = integral(T * area) self.make_nodes() class Int1DNode(PdeNode): counter = 0 - def __init__(self, expression, expression_name, lb, ub, var: Union[str, sp.Symbol] = 's', degree=20, **kwargs): + def __init__( + self, + expression, + expression_name, + lb, + ub, + var: Union[str, sp.Symbol] = "s", + degree=20, + **kwargs + ): super().__init__(**kwargs) - x = sp.Symbol('x') + x = sp.Symbol("x") self.equations = {} self.var = sp.Symbol(var) if isinstance(var, str) else var self.degree = degree @@ -265,13 +296,19 @@ class Int1DNode(PdeNode): else: raise - if 'funs' in kwargs.keys(): - self.funs = kwargs['funs'] + if "funs" in kwargs.keys(): + self.funs = kwargs["funs"] else: self.funs = {} - self.computable_name = set(*[fun['output_map'].values() for _, fun in self.funs.items()]) + self.computable_name = set( + *[fun["output_map"].values() for _, fun in self.funs.items()] + ) self.fun_require_input = set( - *[set(fun['eval'].inputs) - set(fun['input_map'].keys()) for _, fun in self.funs.items()]) + *[ + set(fun["eval"].inputs) - set(fun["input_map"].keys()) + for _, fun in self.funs.items() + ] + ) self.make_nodes() @@ -300,13 +337,22 @@ class Int1DNode(PdeNode): self.derivatives = [] self.outputs = [x for x in name_set] - def new_node(self, name: str = None, tf_eq: sp.Expr = None, free_symbols: List[str] = None, *args, **kwargs): + def new_node( + self, + name: str = None, + tf_eq: sp.Expr = None, + free_symbols: List[str] = None, + *args, + **kwargs + ): out_symbols = [x for x in free_symbols if x not in self.funs.keys()] lb_lambda = torch_lambdify(out_symbols, self.lb) ub_lambda = torch_lambdify(out_symbols, self.ub) eq_lambda = torch_lambdify([*free_symbols, self.var.name], tf_eq) node = Node() - node.evaluate = IntEq(self, lb_lambda, ub_lambda, out_symbols, free_symbols, eq_lambda, name) + node.evaluate = IntEq( + self, lb_lambda, ub_lambda, out_symbols, free_symbols, eq_lambda, name + ) node.inputs = [x for x in free_symbols if x not in self.funs.keys()] node.derivatives = [] node.outputs = [name] @@ -315,7 +361,16 @@ class Int1DNode(PdeNode): class IntEq: - def __init__(self, binding_node, lb_lambda, ub_lambda, out_symbols, free_symbols, eq_lambda, name): + def __init__( + self, + binding_node, + lb_lambda, + ub_lambda, + out_symbols, + free_symbols, + eq_lambda, + name, + ): self.binding_node = binding_node self.lb_lambda = lb_lambda self.ub_lambda = ub_lambda @@ -326,8 +381,12 @@ class IntEq: def __call__(self, var: Variables): var = {k: v for k, v in var.items()} - lb_value = self.lb_lambda(**{k: v for k, v in var.items() if k in self.out_symbols}) - ub_value = self.ub_lambda(**{k: v for k, v in var.items() if k in self.out_symbols}) + lb_value = self.lb_lambda( + **{k: v for k, v in var.items() if k in self.out_symbols} + ) + ub_value = self.ub_lambda( + **{k: v for k, v in var.items() if k in self.out_symbols} + ) xx = dict() for syp in self.free_symbols: @@ -347,19 +406,21 @@ class IntEq: new_var = dict() for _, fun in self.binding_node.funs.items(): - input_map = fun['input_map'] - output_map = fun['output_map'] + input_map = fun["input_map"] + output_map = fun["output_map"] tmp_var = dict() for k, v in xx.items(): tmp_var[k] = v for k, v in input_map.items(): tmp_var[k] = quad_s - res = fun['eval'].evaluate(tmp_var) + res = fun["eval"].evaluate(tmp_var) for k, v in output_map.items(): res[v] = res.pop(k) new_var.update(res) xx.update(new_var) - values = quad_w * self.eq_lambda(**dict(**{self.binding_node.var.name: quad_s}, **xx)) + values = quad_w * self.eq_lambda( + **dict(**{self.binding_node.var.name: quad_s}, **xx) + ) values = values.reshape(shape) return {self.name: values.sum(1, keepdim=True)} diff --git a/setup.py b/setup.py index 4878d9b..951f3f4 100644 --- a/setup.py +++ b/setup.py @@ -19,4 +19,4 @@ setuptools.setup( "Operating System :: OS Independent", ], python_requires='>=3.6', -) \ No newline at end of file +)