Source code for utils

import numpy as np
import ezdxf
import trimesh
import alphashape
from shapely.geometry import Point
from scipy.spatial import Delaunay, cKDTree
from shapely.geometry import Polygon
from scipy.interpolate import griddata
import os
import pandas as pd
import demo_utils


[docs] def generate_random_field( x_range, y_range, nx, ny, lx, ly, sigma2, mean_value, seed=None): """Generate a random field with anisotropic exponential correlation. Args: x_range: Range of x coordinates (x_min, x_max). y_range: Range of y coordinates (y_min, y_max). nx: Number of grid points in x direction. ny: Number of grid points in y direction. lx: Correlation length in x direction. ly: Correlation length in y direction. sigma2: Variance of the random field. mean_value: Mean value of the random field. seed: Random seed for reproducibility. Returns: field_2D: 2D array containing the random field values. x: Array of x coordinates. y: Array of y coordinates. """ # Set random seed if provided if seed is not None: np.random.seed(seed) # Create spatial grid with custom ranges x = np.linspace(x_range[0], x_range[1], nx) y = np.linspace(y_range[0], y_range[1], ny) gx, gy = np.meshgrid(x, y, indexing='ij') points = np.vstack([gx.ravel(), gy.ravel()]).T # Flattened grid points # Construct covariance matrix (vectorized implementation) num_points = points.shape[0] # Compute all pairwise distances at once (more efficient) dx = np.subtract.outer(points[:, 0], points[:, 0]) * (2/lx) dy = np.subtract.outer(points[:, 1], points[:, 1]) * (2/ly) r = np.sqrt(dx**2 + dy**2) # Compute covariance matrix C = sigma2 * np.exp(-r) try: # Perform Cholesky decomposition with small noise for numerical stability L = np.linalg.cholesky(C + 1e-6 * np.eye(num_points)) # Generate random field z = np.random.randn(num_points) # Standard normal samples field = L @ z # Apply Cholesky factor # Add mean value to the field field += mean_value # Reshape to 2D grid field_2D = field.reshape(nx, ny) return field_2D, x, y except np.linalg.LinAlgError: raise ValueError("Cholesky decomposition failed. Try increasing the regularization parameter.")
[docs] def save_script(current_script_path, save_path): # Read the content of the specified script with open(current_script_path, 'r') as script_file: script_content = script_file.read() # Save the content to the specified path with open(save_path, 'w') as save_file: save_file.write(script_content)
[docs] def get_z_coordinates( mesh: trimesh.Trimesh, xy_coords: np.array, find_method: str ): """ Get the z-coordinate values on the mesh that correspond to given (x, y) coordinates. Parameters: - mesh_path: trimesh object - xy_coords: np.array of shape (n, 2), array of (x, y) coordinates to get z-coords - find_method: 'kdtree', 'linear' Returns: - np.array of shape (n,), array of z-coordinate values. """ # Extract the vertices of the mesh vertices = mesh.vertices # Create a KDTree for fast nearest-neighbor lookup xy_vertices = vertices[:, :2] z_vertices = vertices[:, 2] if find_method == "kdtree": kdtree = cKDTree(xy_vertices) # Find the nearest vertex in the mesh for each (x, y) coordinate distances, vertex_indices = kdtree.query(xy_coords) # Get the z-values for the corresponding vertices z_values = vertices[vertex_indices, 2] elif find_method == "linear": # Use griddata to interpolate the z-values for the given (x, y) coordinates z_values = griddata( xy_vertices, z_vertices, xy_coords, method='linear') else: raise ValueError return z_values
[docs] def fill_particles_between_mesh( lower_mesh: trimesh.Trimesh, upper_mesh: trimesh.Trimesh, cell_size: list, n_particles_per_dim: int, z_find_method: str, base_find_method: str, z_fill_method: str = 'simple', initial_alpha: float = 0.1, alpha_decay: float = 1 ): """ Args: base_find_method (str): method to find the base of the area where two surfaces overlap ('alphashape' or 'simple') z_find_method (str): method to find z-coordinate of mesh z_fill_method (str): method to fill between lower and upper z-coordinates lower_mesh (trimesh.Trimesh): upper_mesh (trimesh.Trimesh): cell_size (list): initial_alpha (float): = 0.1 alpha_decay (float): n_particles_per_dim (int): Returns: """ # Error handling # if not all(i == cell_size[0] for i in cell_size): # raise NotImplementedError("Current version only support the same element size for all dimension") # Define spacings particle_distance = cell_size[0] / n_particles_per_dim particle_offset_distance = particle_distance / 2 # Project the topography mesh to xy plain projected_vertices = upper_mesh.vertices[:, :2] # Define candidate base points # define min anx max grid coord # min_grid_values = np.floor(np.min(projected_vertices, axis=0) / cell_size[0]) * cell_size[0] # max_grid_values = np.ceil(np.max(projected_vertices, axis=0) / cell_size[0]) * cell_size[0] # define min and max particle coord min_particle_values = np.min(projected_vertices, axis=0) + particle_offset_distance max_particle_values = np.max(projected_vertices, axis=0) - particle_offset_distance # define range candidate_point_range = list(zip(min_particle_values, max_particle_values)) rounded_array_processed = [] for x, y in candidate_point_range: x_rounded = round((x - particle_offset_distance) / particle_distance) * particle_distance + particle_offset_distance y_rounded = round((y - particle_offset_distance) / particle_distance) * particle_distance + particle_offset_distance rounded_array_processed.append((x_rounded, y_rounded)) # Generate candidate base particle points candidate_points = generate_points( rounded_array_processed, distance=particle_distance) if base_find_method == "alphashape": # TODO: add visualization for perimeter check # Create a 2D polygon encompassing the projected vertices max_iterations = 100 for i in range(max_iterations): alpha = initial_alpha / (alpha_decay * (i + 1)) alpha_shape = alphashape.alphashape(projected_vertices, alpha=alpha) if isinstance(alpha_shape, Polygon): break else: raise ValueError("Could not find a Polygon for projected base in alpha shape") # Check which grid points fall inside the projected polygon inside_check = [alpha_shape.contains(Point(point)) for point in candidate_points] base_particles = candidate_points[inside_check] elif base_find_method == "simple": base_particles = candidate_points else: raise ValueError(f"Check `base_find_method`") # Define lower and upper z bounds and populate particles inbetween. lower_z = get_z_coordinates( lower_mesh, base_particles, z_find_method) upper_z = get_z_coordinates( upper_mesh, base_particles, z_find_method) criteria = upper_z - lower_z lower_z_processed = np.where(criteria <= 0, 0, lower_z) upper_z_processed = np.where(criteria <= 0, 0, upper_z) z_ranges = np.column_stack( (lower_z_processed, upper_z_processed)) material_points = fill_particles_inbetween( base_particles, z_ranges, particle_distance, method=z_fill_method) # ---------------------------------------- # # Populate particles between two surfaces # lower_z = get_z_coordinates( # lower_mesh, base_particles, z_find_method) # lower_zgrid = np.floor(lower_z / cell_size[0]) * cell_size[0] # lower_zparticle = lower_zgrid + particle_offset_distance # # upper_z = get_z_coordinates( # upper_mesh, base_particles, z_find_method) # upper_zgrid = np.round(upper_z / cell_size[0]) * cell_size[0] # upper_zparticle = upper_zgrid - particle_offset_distance # # criteria = upper_zgrid - lower_zgrid # # lower_zgrid_processed = np.where(criteria <= 0, 0, lower_zgrid) # upper_zgrid_processed = np.where(criteria <= 0, 0, upper_zgrid) # zgrid_ranges = np.column_stack( # (lower_zgrid_processed, upper_zgrid_processed)) # # material_points = fill_particles_inbetween( # base_particles, zgrid_ranges, particle_distance) # ---------------------------------------- return material_points
[docs] def fill_particles_inbetween( xy_coords: np.array, z_ranges: np.array, particle_distance: float, method: str = "simple" ): """ Args: xy_coords (np.array): shape=(n_coords, 2) z_ranges (np.array): shape=(n_xy_coords, 2) where z_ranges[:, 0] is the lower bound & z_ranges[:, 1] is upper bound cell_size (list): [x_len, y_len, z_len] particle_distance (float): default distance between particles method (str): method to fill particles between z_ranges: "simple", "round" If `simple`, it uses the exact z-coordinate of mesh. If `round`, it uses the nearest particle grid points for particle generation. Returns: """ # particle_distance_offset particle_offset_distance = particle_distance / 2 # Initialize an empty list to store particles particles_list = [] for i in range(len(xy_coords)): if method == "simple": lower_z = z_ranges[i, 0] upper_z = z_ranges[i, 1] inside_z_points = np.arange(lower_z + particle_offset_distance, upper_z, particle_distance) elif method == "round": lower_z = np.round( (z_ranges[i, 0] - particle_offset_distance) / particle_distance) * particle_distance + particle_offset_distance upper_z = np.round( (z_ranges[i, 1] - particle_offset_distance) / particle_distance) * particle_distance + particle_offset_distance inside_z_points = np.arange(lower_z, upper_z, particle_distance) else: raise ValueError(f"Method `{method}` is not valid option") num_z_points = len(inside_z_points) # Repeat the xy coordinates for the number of z values xy_repeated = np.repeat(xy_coords[i].reshape(1, 2), num_z_points, axis=0) # Stack the xy coordinates and z values particles = np.column_stack((xy_repeated, inside_z_points)) # Append to the list particles_list.append(particles) # TODO: potential improvements # - If `lower_z` is 0, don't concatenate # - Only concatenate `upper_z` if the distance between the last element of `main_particles` exceeds a certain threshold # ---------------------------------------- # # Create z values for the current point # z_values = np.arange( # z_ranges[i, 0] + distance/2, # z_ranges[i, 1] - distance/2, # distance) # num_z_values = len(z_values) # # # Repeat the xy coordinates for the number of z values # xy_repeated = np.repeat(xy_coords[i].reshape(1, 2), num_z_values, axis=0) # # # Stack the xy coordinates and z values # particles = np.column_stack((xy_repeated, z_values)) # # # Append to the list # particles_list.append(particles) # ---------------------------------------- # Concatenate all particles into a single array all_particles = np.vstack(particles_list) return all_particles
[docs] def generate_points( ranges: list, distance: float): """ Args: ranges (list): [[x_min, x_max], [y_min, y_max] [z_min, z_max]] distance (float): round_unit (float): Returns: coordinate of points with shape=(n_points, n_dims) """ n_dims = len(ranges) if n_dims not in [2, 3]: raise ValueError("The function only supports 2D and 3D ranges.") coordinates = [] for r in ranges: min_val, max_val = r coords = np.arange(min_val, max_val + distance, distance) coordinates.append(coords) points = np.array(np.meshgrid(*coordinates)).T.reshape(-1, n_dims) return points
[docs] def obj2mesh(file_path): vertices = [] faces = [] with open(file_path, 'r') as file: for line in file: parts = line.split() if not parts: continue if parts[0] == 'v': # Convert vertex coordinates to float and add to the list vertex = list(map(float, parts[1:])) vertices.append(vertex) elif parts[0] == 'f': # Convert face indices to integer and correct for 0-based indexing face = [int(index) - 1 for index in parts[1:]] faces.append(face) # Convert lists to numpy arrays vertices_array = np.array(vertices) faces_array = np.array(faces, dtype=int) return vertices_array, faces_array
# Extracting points from various entities in the DXF file
[docs] def dxf2points(dxf_file): # Read the DXF file dxf_file = 'topo_surface_Fundao.dxf' doc = ezdxf.readfile(dxf_file) points = [] # Checking for different entity types and extracting points for entity in doc.modelspace().query('POLYLINE VERTEX POINT 3DFACE'): if entity.dxftype() == 'POLYLINE': # print("polyline") for vertex in entity.vertices: points.append([vertex.dxf.location.x, vertex.dxf.location.y, vertex.dxf.location.z]) elif entity.dxftype() == 'VERTEX': # print("vertex") points.append([entity.dxf.location.x, entity.dxf.location.y, entity.dxf.location.z]) elif entity.dxftype() == 'POINT': # print("point") points.append([entity.dxf.location.x, entity.dxf.location.y, entity.dxf.location.z]) elif entity.dxftype() == '3DFACE': # print("3dface") points.append([entity.dxf.vtx0.x, entity.dxf.vtx0.y, entity.dxf.vtx0.z]) points.append([entity.dxf.vtx1.x, entity.dxf.vtx1.y, entity.dxf.vtx1.z]) points.append([entity.dxf.vtx2.x, entity.dxf.vtx2.y, entity.dxf.vtx2.z]) points.append([entity.dxf.vtx3.x, entity.dxf.vtx3.y, entity.dxf.vtx3.z]) return np.array(points)
[docs] def get_h5(uuid_dir, timestep, n_mpis): # Create an empty list to store DataFrames dfs = [] # Iterate over different files from MPI and append to list for i in range(n_mpis): file = f'particles-{i}_{n_mpis}-{timestep}.h5' # ex) particles-26_32-0120000.h5 h5_path = os.path.join(uuid_dir, file) dfs.append(pd.read_hdf(h5_path, 'table')) # Concatenate all DataFrames df = pd.concat(dfs, ignore_index=True) return df