"""Compute and write Sparameters using Meep."""
from __future__ import annotations
import inspect
import multiprocessing
import pathlib
import time
from functools import partial
from pathlib import Path
from typing import Any
import gdsfactory as gf
import meep as mp
import numpy as np
import yaml
from gdsfactory import logger
from gdsfactory.component import Component
from gdsfactory.pdk import get_layer_stack
from gdsfactory.serialization import clean_value_json
from gdsfactory.technology import LayerStack
from gdsfactory.typings import ComponentSpec, PathType, Port, PortSymmetries
from tqdm.auto import tqdm
from gplugins.common.utils import port_symmetries
from gplugins.common.utils.get_sparameters_path import (
get_sparameters_path_meep as get_sparameters_path,
)
from gplugins.gmeep.get_simulation import (
get_simulation,
settings_get_simulation,
)
core_materials = multiprocessing.cpu_count()
def remove_simulation_kwargs(d: dict[str, Any]) -> dict[str, Any]:
"""Returns a copy of dict with only simulation settings.
removes all flags for the simulator itself
"""
d = d.copy()
d.pop("run", None)
d.pop("lazy_parallelism", None)
d.pop("overwrite", None)
d.pop("animate", None)
d.pop("wait_to_finish", None)
d.pop("cores", None)
d.pop("temp_dir", None)
d.pop("temp_file_str", None)
return d
def parse_port_eigenmode_coeff(
port_name: str, ports: dict[str, Port], sim_dict: dict, port_mode: int = 0
):
"""Returns the coefficients relative to whether the wavevector is entering or \
exiting simulation.
Args:
port_index: index of port.
ports: component_ref.ports.
sim_dict: simulation dict.
"""
if port_name not in ports:
port_names = [port.name for port in ports]
raise ValueError(f"port = {port_name!r} not in {port_names}.")
orientation = ports[port_name].orientation
# Inputs
sim = sim_dict["sim"]
monitors = sim_dict["monitors"]
# get_eigenmode_coeff.alpha[:,:,idx]
# with ind being the forward or backward wave according to cell coordinates.
# Figure out if that is exiting the simulation or not
# depending on the port orientation (assuming it's near PMLs)
if orientation == 0: # east
kpoint = mp.Vector3(x=1)
idx_in = 1
idx_out = 0
elif orientation == 90: # north
kpoint = mp.Vector3(y=1)
idx_in = 1
idx_out = 0
elif orientation == 180: # west
kpoint = mp.Vector3(x=1)
idx_in = 0
idx_out = 1
elif orientation == 270: # south
kpoint = mp.Vector3(y=1)
idx_in = 0
idx_out = 1
else:
raise ValueError(
f"Port orientation {orientation!r} not in 0, 90, 180, or 270 degrees!"
)
# Get port coeffs
monitor_coeff = sim.get_eigenmode_coefficients(
monitors[port_name], [port_mode + 1], kpoint_func=lambda f, n: kpoint
)
coeff_in = monitor_coeff.alpha[
0, :, idx_in
] # ingoing (w.r.t. simulation cell) wave
coeff_out = monitor_coeff.alpha[
0, :, idx_out
] # outgoing (w.r.t. simulation cell) wave
return coeff_in, coeff_out
[docs]
def write_sparameters_meep(
component: ComponentSpec,
port_source_names: list[str] | None = None,
port_source_modes: dict[str, list] | None = None,
port_modes: list[int] | None = None,
port_symmetries: PortSymmetries | None = None,
resolution: int = 30,
wavelength_start: float = 1.5,
wavelength_stop: float = 1.6,
wavelength_points: int = 50,
dirpath: PathType | None = None,
layer_stack: LayerStack | None = None,
port_margin: float = 2,
port_monitor_offset: float = -0.1,
port_source_offset: float = -0.1,
filepath: Path | None = None,
overwrite: bool = False,
animate: bool = False,
animate_center: tuple[float, float, float] | None = None,
animate_size: tuple[float, float, float] | None = None,
lazy_parallelism: bool = False,
run: bool = True,
dispersive: bool = False,
xmargin: float = 0,
ymargin: float = 0,
zmargin: float = 0,
xmargin_left: float = 0,
xmargin_right: float = 0,
ymargin_top: float = 0,
ymargin_bot: float = 0,
zmargin_top: float = 0,
zmargin_bot: float = 0,
decay_by: float = 1e-3,
is_3d: bool = False,
z: float = 0,
plot_args: dict | None = None,
only_return_filepath_sim_settings=False,
verbosity: int = 0,
**settings,
) -> dict[str, np.ndarray]:
r"""Returns Sparameters and writes them to npz filepath.
Simulates each time using a different input port (by default, all of them)
unless you specify port_symmetries:
.. code::
port_symmetries_crossing = {
"o1@0,o1@0": ["o2@0,o2@0", "o3@0,o3@0", "o4@0,o4@0"],
"o2@0,o1@0": ["o1@0,o2@0", "o3@0,o4@0", "o4@0,o3@0"],
"o3@0,o1@0": ["o1@0,o3@0", "o2@0,o4@0", "o4@0,o2@0"],
"o4@0,o1@0": ["o1@0,o4@0", "o2@0,o3@0", "o3@0,o2@0"],
}
- Only simulations using the outer key port names will be run
- The associated value is another dict whose keys are the S-parameters computed
when this source is active
- The values of this inner Dict are lists of s-parameters whose values are copied
.. code::
top view
________________________________
| |
| xmargin_left | port_extension
|<---------> port_margin ||<-->
o2_|___________ _________||_o3
| \ / |
| \ / |
| ====== |
| / \ |
o1_|___________/ \__________|_o4
| | <-------->|
| |ymargin_bot xmargin_right|
| | |
|___|___________________________|
side view
________________________________
| | |
| | |
| zmargin_top |
|xmargin_left | |
|<---> _____ _|___ |
| | | | | |
| | | | | |
| |_____| |_____| |
| | |
| | |
| |zmargin_bot |
| | |
|_______|_______________________|
Args:
component: to simulate.
resolution: in pixels/um (30: for coarse, 100: for fine).
port_source_names: list of ports to excite. Defaults to all.
port_symmetries: Dict to specify port symmetries, to save number of simulations.
dirpath: directory to store Sparameters.
layer_stack: contains layer to thickness, zmin and material.
Defaults to active pdk.layer_stack.
port_margin: margin on each side of the port.
port_monitor_offset: offset between Component and monitor port in um.
port_source_offset: offset between Component and source port in um.
filepath: to store pandas Dataframe with Sparameters in npz format.
Defaults to dirpath/component_.npz.
overwrite: overwrites stored Sparameter npz results.
animate: saves a MP4 images of the simulation for inspection, and also
outputs during computation. The name of the file is the source index.
lazy_parallelism: toggles the flag "meep.divide_parallel_processes" to
perform the simulations with different sources in parallel.
By default MPI just runs the same copy of the Python script everywhere,
with the C++ under MEEP actually being parallelized.
divide_parallel_processes allows us to logically split this one calculation
into (in this case "cores") subdivisions.
The only difference in the scripts is that a different integer n
is returned depending on the subdivision it is running in.
So we use that n to select different sources, and each subdivision calculates
its own Sparams independently. Afterwards, we collect all
results in one of the subdivisions (if rank == 0).
run: runs simulation, if False, only plots simulation.
dispersive: use dispersive models for materials (requires higher resolution).
xmargin: left and right distance from component to PML.
xmargin_left: west distance from component to PML.
xmargin_right: east distance from component to PML.
ymargin: top and bottom distance from component to PML.
ymargin_top: north distance from component to PML.
ymargin_bot: south distance from component to PML.
zmargin_top: +z distance from component to PML.
zmargin_bot: -z distance from component to PML.
is_3d: if True runs in 3D (much slower).
z: for 2D plot.
plot_args: if animate or not run, customization keyword arguments passed to
`plot2D()` (i.e. `labels`, `eps_parameters`, `boundary_parameters`, `field_parameters`, etc.)
keyword Args:
extend_ports_length: to extend ports beyond the PML (um).
zmargin_top: thickness for cladding above core (um).
zmargin_bot: thickness for cladding below core (um).
tpml: PML thickness (um).
clad_material: material for cladding.
wavelength_start: wavelength min (um).
wavelength_stop: wavelength max (um).
wavelength_points: wavelength steps.
dfcen: delta frequency.
port_source_name: input port name.
port_margin: margin on each side of the port (um).
distance_source_to_monitors: in (um).
port_source_offset: offset between source Component port and source MEEP port.
port_monitor_offset: offset between Component and MEEP port monitor.
material_name_to_meep: map layer_stack names with meep material database name
or refractive index. dispersive materials have a wavelength dependent index.
Returns:
sparameters in a Dict (wavelengths, s11a, o1@0,o2@0, ...)
where `a` is the angle in radians and `m` the module.
"""
component = (
component if isinstance(component, Component) else gf.get_component(component)
)
layer_stack = layer_stack or get_layer_stack()
plot_args = plot_args or {}
for setting in settings:
if setting not in settings_get_simulation:
raise ValueError(f"{setting!r} not in {settings_get_simulation}")
port_symmetries = port_symmetries or {}
xmargin_left = xmargin_left or xmargin
xmargin_right = xmargin_right or xmargin
ymargin_top = ymargin_top or ymargin
ymargin_bot = ymargin_bot or ymargin
zmargin_top = zmargin_top or zmargin
zmargin_bot = zmargin_bot or zmargin
sim_settings = dict(
resolution=resolution,
port_symmetries=port_symmetries,
wavelength_start=wavelength_start,
wavelength_stop=wavelength_stop,
wavelength_points=wavelength_points,
port_margin=port_margin,
port_monitor_offset=port_monitor_offset,
port_source_offset=port_source_offset,
dispersive=dispersive,
zmargin_top=zmargin_top,
zmargin_bot=zmargin_bot,
ymargin_top=ymargin_top,
ymargin_bot=ymargin_bot,
xmargin_left=xmargin_left,
xmargin_right=xmargin_right,
is_3d=is_3d,
**settings,
)
filepath = filepath or get_sparameters_path(
component=component,
dirpath=dirpath,
layer_stack=layer_stack,
**sim_settings,
)
sim_settings = sim_settings.copy()
sim_settings["layer_stack"] = layer_stack.to_dict()
sim_settings["component"] = component.to_dict()
filepath = pathlib.Path(filepath)
filepath_sim_settings = filepath.with_suffix(".yml")
# FIXME: Ideally, we should split sim settings generation from doing the
# simulation... this is a hack.
if only_return_filepath_sim_settings:
return filepath_sim_settings
# filepath_sim_settings.write_text(OmegaConf.to_yaml(sim_settings))
# logger.info(f"Write simulation settings to {filepath_sim_settings!r}")
# return filepath_sim_settings
component = gf.add_padding_container(
component,
default=0,
top=ymargin_top,
bottom=ymargin_bot,
left=xmargin_left,
right=xmargin_right,
)
dummy = Component()
component_ref = dummy << component
ports = component_ref.ports
port_names = [port.name for port in ports]
port_source_names = port_source_names or port_names
port_source_modes = port_source_modes or {key: [0] for key in port_source_names}
port_modes = port_modes or [0]
num_sims = len(port_source_names) - len(port_symmetries)
# set verbosity
mp.verbosity(verbosity)
if not run:
sim_dict = get_simulation(
component=component,
wavelength_start=wavelength_start,
wavelength_stop=wavelength_stop,
wavelength_points=wavelength_points,
layer_stack=layer_stack,
port_source_name=port_source_names[0],
port_margin=port_margin,
port_monitor_offset=port_monitor_offset,
port_source_offset=port_source_offset,
dispersive=dispersive,
is_3d=is_3d,
resolution=resolution,
zmargin_top=zmargin_top,
zmargin_bot=zmargin_bot,
**settings,
)
sim = sim_dict["sim"]
if is_3d:
sim.plot2D(
output_plane=mp.Volume(
size=mp.Vector3(sim.cell_size.x, sim.cell_size.y, 0),
center=mp.Vector3(0, 0, z),
),
**plot_args,
)
else:
sim.plot2D(plot_eps_flag=True, **plot_args)
return sim
if filepath.exists():
if not overwrite:
logger.info(f"Simulation loaded from {filepath!r}")
return dict(np.load(filepath))
elif overwrite:
filepath.unlink()
sp = {} # Sparameters dict
start = time.time()
def sparameter_calculation(
port_source_name: str,
component: Component,
port_symmetries: PortSymmetries | None = port_symmetries,
port_names: list[str] = port_names,
port_source_mode: int = 0,
wavelength_start: float = wavelength_start,
wavelength_stop: float = wavelength_stop,
wavelength_points: int = wavelength_points,
animate: bool = animate,
animate_center: tuple[float, float, float] = animate_center,
animate_size: tuple[float, float, float] = animate_size,
plot_args: dict = plot_args,
dispersive: bool = dispersive,
decay_by: float = decay_by,
**settings,
) -> dict:
"""Return Sparameter dict."""
sim_dict = get_simulation(
component=component,
port_source_name=port_source_name,
port_source_mode=port_source_mode,
resolution=resolution,
wavelength_start=wavelength_start,
wavelength_stop=wavelength_stop,
wavelength_points=wavelength_points,
port_margin=port_margin,
port_monitor_offset=port_monitor_offset,
port_source_offset=port_source_offset,
dispersive=dispersive,
layer_stack=layer_stack,
is_3d=is_3d,
**settings,
)
sim = sim_dict["sim"]
# freqs = sim_dict["freqs"]
# wavelengths = 1 / freqs
# print(sim.resolution)
# Terminate when the area in the whole area decayed
termination = [mp.stop_when_energy_decayed(dt=50, decay_by=decay_by)]
if animate:
# Defaults for animation
if "field_parameters" not in plot_args:
plot_args["field_parameters"] = {
"alpha": 0.8,
"cmap": "RdBu",
"interpolation": "none",
}
if "eps_parameters" not in plot_args:
plot_args["eps_parameters"] = {"contour": True}
if "fields" not in plot_args:
if is_3d:
plot_args["fields"] = mp.Hz
else:
plot_args["fields"] = mp.Ez
if "realtime" not in plot_args:
plot_args["realtime"] = True
if "normalize" not in plot_args:
plot_args["normalize"] = True
sim.use_output_directory()
if is_3d:
animate = mp.Animate2D(
sim,
output_plane=mp.Volume(
center=mp.Vector3(*animate_center),
size=mp.Vector3(*animate_size),
),
**plot_args,
)
else:
animate = mp.Animate2D(
sim,
**plot_args,
)
sim.run(mp.at_every(1, animate), until_after_sources=termination)
animate.to_mp4(
30, f"{component.name}_{port_source_name}_{port_source_mode}.mp4"
)
else:
sim.run(until_after_sources=termination)
# Calculate mode overlaps
# Get source monitor results
source_entering, _ = parse_port_eigenmode_coeff(
port_source_name, component.ports, sim_dict, port_mode=port_source_mode
)
# Get coefficients
for port_name in port_names:
for port_mode in port_modes:
_, monitor_exiting = parse_port_eigenmode_coeff(
port_name,
component.ports,
sim_dict,
port_mode=port_mode,
)
key = f"{port_name}@{port_mode},{port_source_name}@{port_source_mode}"
sp[key] = monitor_exiting / source_entering
if bool(port_symmetries):
for key, symmetries in port_symmetries.items():
for sym in symmetries:
if key in sp:
sp[sym] = sp[key]
return sp
if lazy_parallelism: # TODO: FIX Port modes
from mpi4py import MPI
cores = min([num_sims, multiprocessing.cpu_count()])
n = mp.divide_parallel_processes(cores)
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
# Map port names to integers
port_source_dict = {}
for number, name in enumerate(port_source_names):
port_source_dict[number] = name
sp = sparameter_calculation(
port_source_name=port_source_dict[n],
component=component,
port_symmetries=port_symmetries,
wavelength_start=wavelength_start,
wavelength_stop=wavelength_stop,
wavelength_points=wavelength_points,
animate=animate,
port_names=port_names,
**settings,
)
# Synchronize dicts
if rank == 0:
for i in range(1, size):
data = comm.recv(source=i, tag=11)
sp.update(data)
sp["wavelengths"] = np.linspace(
wavelength_start, wavelength_stop, wavelength_points
)
np.savez_compressed(filepath, **sp)
logger.info(f"Write simulation results to {filepath!r}")
filepath_sim_settings.write_text(yaml.dump(clean_value_json(sim_settings)))
logger.info(f"Write simulation settings to {filepath_sim_settings!r}")
return sp
else:
comm.send(sp, dest=0, tag=11)
else:
for port_source_name in tqdm(port_source_names):
for port_source_mode in port_source_modes[port_source_name]:
sp.update(
sparameter_calculation(
port_source_name,
port_source_mode=port_source_mode,
component=component,
port_symmetries=port_symmetries,
wavelength_start=wavelength_start,
wavelength_stop=wavelength_stop,
wavelength_points=wavelength_points,
animate=animate,
port_names=port_names,
**settings,
)
)
sp["wavelengths"] = np.linspace(
wavelength_start, wavelength_stop, wavelength_points
)
np.savez_compressed(filepath, **sp)
end = time.time()
sim_settings.update(compute_time_seconds=end - start)
sim_settings.update(compute_time_minutes=(end - start) / 60)
logger.info(f"Write simulation results to {filepath!r}")
filepath_sim_settings.write_text(yaml.dump(sim_settings))
logger.info(f"Write simulation settings to {filepath_sim_settings!r}")
return sp
write_sparameters_meep_1x1 = partial(
write_sparameters_meep, port_symmetries=port_symmetries.port_symmetries_1x1
)
write_sparameters_meep_1x1_bend90 = partial(
write_sparameters_meep,
ymargin=0,
ymargin_bot=3,
xmargin_right=3,
port_symmetries=port_symmetries.port_symmetries_1x1,
)
sig = inspect.signature(write_sparameters_meep)
settings_write_sparameters_meep = set(sig.parameters.keys()).union(
settings_get_simulation
)
if __name__ == "__main__":
wavelength_start = 1.26
wavelength_stop = 1.36
sim_settings = dict(
wavelength_start=wavelength_start, wavelength_stop=wavelength_stop
)
# c = gf.components.mmi1x2(cross_section=gf.cross_section.strip)
c = gf.components.straight(length=2)
import matplotlib.pyplot as plt
def func(x):
result = np.where(np.abs(x) > 1e-10, np.abs(x) ** 2, -10)
return np.log10(result, out=result, where=result > 0)
sp = write_sparameters_meep(
c,
run=True,
animate=True,
is_3d=False,
plot_args={
"eps_parameters": {"contour": True},
"field_parameters": {
"alpha": 0.8,
"cmap": "RdBu",
"interpolation": "none",
"post_process": func,
},
"realtime": False,
},
overwrite=True,
**sim_settings,
)
plt.show()
# from gplugins.add_simulation_markers import add_simulation_markers
# import gplugins as sim
# c = gf.components.straight(length=2)
# c = gf.components.bend_euler(radius=3)
# c = add_simulation_markers(c)
# sp = write_sparameters_meep_1x1(c, run=True, is_3d=False)
# sim.plot.plot_sparameters(sp)
# import matplotlib.pyplot as plt
# plt.show()