Source code for gdsfactory.routing.route_bundle

"""Routes bundles of ports (river routing).

get bundle is the generic river routing function
route_bundle calls different function depending on the port orientation.

 - route_bundle_same_axis: ports facing each other with arbitrary pitch on each side
 - route_bundle_corner: 90Deg / 270Deg between ports with arbitrary pitch
 - route_bundle_udirect: ports with direct U-turns
 - route_bundle_uindirect: ports with indirect U-turns

"""

from __future__ import annotations

import warnings
from collections.abc import Sequence
from functools import partial
from typing import Any, Literal, cast
from warnings import warn

import kfactory as kf
from kfactory.routing.generic import ManhattanRoute
from kfactory.routing.optical import PathLengthConfig

import gdsfactory as gf
from gdsfactory.config import CONF
from gdsfactory.routing.auto_taper import add_auto_tapers
from gdsfactory.routing.resolve_pins import resolve_pins
from gdsfactory.routing.sort_ports import get_port_x, get_port_y
from gdsfactory.typings import (
    STEP_DIRECTIVES,
    ComponentSpec,
    Coordinates,
    CrossSectionSpec,
    LayerSpec,
    LayerSpecs,
    LayerTransitions,
    Pin,
    Port,
    Ports,
    Step,
)

OpticalManhattanRoute = ManhattanRoute

TOLERANCE = 1


def get_min_spacing(
    ports1: Ports,
    ports2: Ports,
    separation: float = 5.0,
    radius: float = 5.0,
    sort_ports: bool = True,
) -> float:
    """Returns the minimum amount of spacing in um required to create a fanout.

    Args:
        ports1: first list of ports.
        ports2: second list of ports.
        separation: minimum separation between two straights in um.
        radius: bend radius in um.
        sort_ports: sort the ports according to the axis.

    """
    if not ports1 or not ports2:
        raise ValueError("ports1 and ports2 must be non-empty")
    if len(ports1) != len(ports2):
        raise ValueError(f"ports1={len(ports1)} and ports2={len(ports2)} must be equal")

    axis = "X" if ports1[0].orientation in [0, 180] else "Y"
    j = 0
    min_j = 0
    max_j = 0
    if sort_ports:
        if axis in {"X", "x"}:
            ports1 = sorted(ports1, key=get_port_y)
            ports2 = sorted(ports2, key=get_port_y)
        else:
            ports1 = sorted(ports1, key=get_port_x)
            ports2 = sorted(ports2, key=get_port_x)

    for port1, port2 in zip(ports1, ports2, strict=False):
        if axis in {"X", "x"}:
            x1 = get_port_y(port1)
            x2 = get_port_y(port2)
        else:
            x1 = get_port_x(port1)
            x2 = get_port_x(port2)
        if x2 >= x1:
            j += 1
        else:
            j -= 1
        if j < min_j:
            min_j = j
        if j > max_j:
            max_j = j
    return (max_j - min_j) * separation + 2 * radius + 1.0


def _ensure_manhattan_waypoints(
    waypoints: list[kf.kdb.DPoint],
    start_port: gf.Port | None = None,
) -> list[kf.kdb.DPoint]:
    """Insert corner points between non-Manhattan waypoints to make the path Manhattan.

    For each pair of consecutive waypoints that are not axis-aligned,
    an intermediate corner point is inserted so all segments are
    either purely horizontal or purely vertical.

    Args:
        waypoints: list of waypoints that may contain non-Manhattan segments.
        start_port: optional start port to determine initial routing direction.

    Returns:
        list of waypoints with corner points inserted where needed.
    """
    if len(waypoints) < 2:
        return list(waypoints)

    tol = 1e-3
    result = [waypoints[0]]

    for i in range(1, len(waypoints)):
        prev = result[-1]
        curr = waypoints[i]

        dx = abs(curr.x - prev.x)
        dy = abs(curr.y - prev.y)

        if dx < tol or dy < tol:
            result.append(curr)
            continue

        # Non-Manhattan segment - insert a corner point
        if len(result) >= 2:
            prev_prev = result[-2]
            last_horizontal = abs(prev.x - prev_prev.x) > abs(prev.y - prev_prev.y)
            go_horizontal_first = not last_horizontal
        elif start_port is not None and start_port.orientation is not None:
            go_horizontal_first = int(start_port.orientation) % 360 in {0, 180}
        else:
            go_horizontal_first = True

        if go_horizontal_first:
            result.append(kf.kdb.DPoint(curr.x, prev.y))
        else:
            result.append(kf.kdb.DPoint(prev.x, curr.y))

        result.append(curr)

    return result


[docs] def route_bundle( component: gf.Component, ports1: Port | Ports | list[Pin] | None = None, ports2: Port | Ports | list[Pin] | None = None, cross_section: CrossSectionSpec | None = None, layer: LayerSpec | None = None, separation: float = 3.0, bend: ComponentSpec = "bend_euler", sort_ports: bool = False, start_straight_length: float = 0, end_straight_length: float = 0, min_straight_taper: float = 100, taper: ComponentSpec | None = None, port_type: str | None = None, collision_check_layers: LayerSpecs | None = None, on_collision: Literal["error", "show_error", "warning"] | None = None, on_placer_error: Literal["error", "show_error", "warning"] | None = None, bboxes: Sequence[kf.kdb.DBox] | None = None, allow_width_mismatch: bool | None = None, allow_layer_mismatch: bool | None = None, allow_type_mismatch: bool | None = None, radius: float | None = None, route_width: float | None = None, straight: ComponentSpec = "straight", sbend: ComponentSpec | None = None, auto_taper: bool = True, auto_taper_taper: ComponentSpec | None = None, waypoints: Coordinates | Sequence[gf.kdb.DPoint] | None = None, steps: Sequence[Step] | None = None, start_angles: float | list[float] | None = None, end_angles: float | list[float] | None = None, router: Literal["optical", "electrical"] | None = None, layer_transitions: LayerTransitions | None = None, show_waypoints: bool = False, layer_marker: LayerSpec | None = None, raise_on_error: bool | None = None, path_length_matching_config: PathLengthConfig | None = None, layer_label: LayerSpec | None = None, port1: Port | None = None, port2: Port | None = None, ) -> list[ManhattanRoute]: """Places a bundle of routes to connect two groups of ports. Routes connect a bundle of ports with a river router. Chooses the correct routing function depending on port angles. Can also be used with single ports instead of lists, replacing route_bundle. Args: component: component to add the routes to. ports1: starting port or list of starting ports. ports2: end port or list of end ports. cross_section: CrossSection or function that returns a cross_section. layer: layer to use for the route. separation: bundle separation (center to center). Defaults to cross_section.width + cross_section.gap bend: function for the bend. Defaults to euler. sort_ports: sort port coordinates. start_straight_length: straight length at the beginning of the route. If None, uses default value for the routing CrossSection. end_straight_length: end length at the beginning of the route. If None, uses default value for the routing CrossSection. min_straight_taper: minimum length for tapering the straight sections. taper: function for tapering long straight waveguides beyond min_straight_taper. Defaults to None. port_type: type of port to place. Defaults to optical. collision_check_layers: list of layers to check for collisions. on_collision: action to take on collision. Defaults to None (ignore). on_placer_error: action to take on placer error. Defaults to None (ignore). bboxes: list of bounding boxes to avoid collisions. allow_width_mismatch: allow different port widths. allow_layer_mismatch: allow different port layers to connect. allow_type_mismatch: allow different port types to connect. radius: bend radius. If None, defaults to cross_section.radius. route_width: width of the route. If None, defaults to cross_section.width. straight: function for the straight. Defaults to straight. sbend: function for the s-bend. If None, uses the same function as bend. auto_taper: if True, auto-tapers ports to the cross-section of the route. auto_taper_taper: taper to use for auto-tapering. If None, uses the default taper for the cross-section. waypoints: list of waypoints to add to the route. steps: list of steps to add to the route. Each step is a dict with keys: x (absolute), y (absolute), dx (relative), dy (relative). Use x/y to set an absolute coordinate and dx/dy to shift relative to the current position. start_angles: list of start angles for the routes. Only used for electrical ports. end_angles: list of end angles for the routes. Only used for electrical ports. router: Set the type of router to use, either the optical one or the electrical one. If None, the router is optical unless the port_type is "electrical". layer_transitions: dictionary of layer transitions to use for the routing when auto_taper=True. show_waypoints: if True, places markers at each waypoint using CONF.layer_marker. layer_marker: layer to place markers on the route. Overrides CONF.layer_marker when show_waypoints=True. raise_on_error: if True, raises an exception on routing error instead of adding error markers. path_length_matching_config: path length matching configuration. layer_label: layer to place length labels on the route. port1: single start port (alternative to ports1 for single-port routing). port2: single end port (alternative to ports2 for single-port routing). .. plot:: :include-source: import gdsfactory as gf dy = 200.0 xs1 = [-500, -300, -100, -90, -80, -55, -35, 200, 210, 240, 500, 650] pitch = 10.0 N = len(xs1) xs2 = [-20 + i * pitch for i in range(N // 2)] xs2 += [400 + i * pitch for i in range(N // 2)] a1 = 90 a2 = a1 + 180 ports1 = [gf.Port(name=f"top_{i}", center=(xs1[i], +0), width=0.5, orientation=a1, layer=(1, 0)) for i in range(N)] ports2 = [gf.Port(name=f"bot_{i}", center=(xs2[i], dy), width=0.5, orientation=a2, layer=(1, 0)) for i in range(N)] c = gf.Component() gf.routing.route_bundle(component=c, ports1=ports1, ports2=ports2, cross_section='strip', separation=5) c.plot() """ on_collision = on_collision or CONF.on_collision on_placer_error = on_placer_error or CONF.on_placer_error if raise_on_error is None: raise_on_error = CONF.raise_on_error # Support deprecated port1/port2 keyword arguments if port1 is not None: if ports1 is not None: raise ValueError("Cannot specify both ports1 and port1") ports1 = port1 if port2 is not None: if ports2 is not None: raise ValueError("Cannot specify both ports2 and port2") ports2 = port2 if ports1 is None or ports2 is None: raise ValueError("ports1 and ports2 are required") # Wrap single ports in lists if isinstance(ports1, kf.DPort): ports1 = [ports1] if isinstance(ports2, kf.DPort): ports2 = [ports2] # Ensure ports are lists (they may be reversed, generators, etc.) port_list1 = list(ports1) port_list2 = list(ports2) # Resolve Pin inputs to Ports if port_list1 and isinstance(port_list1[0], kf.DPin): if not (port_list2 and isinstance(port_list2[0], kf.DPin)): raise TypeError( "Cannot mix Pins and Ports. " "If ports1 contains Pins, ports2 must also contain Pins." ) port_list1, port_list2 = resolve_pins( # type: ignore[assignment] cast(list[Pin], port_list1), cast(list[Pin], port_list2) ) elif port_list2 and isinstance(port_list2[0], kf.DPin): raise TypeError( "Cannot mix Pins and Ports. " "If ports2 contains Pins, ports1 must also contain Pins." ) if show_waypoints and layer_marker is None: layer_marker = gf.CONF.layer_marker component = gf.Component(base=component.base) # type: ignore[call-overload] ports1_resolved = [gf.Port(base=p1.base) for p1 in cast(list[kf.DPort], port_list1)] ports2_resolved = [gf.Port(base=p2.base) for p2 in cast(list[kf.DPort], port_list2)] if router: warnings.warn( f"The argument {router=} is ignored and will be removed in a future release.", stacklevel=2, ) if cross_section is None: if layer is None or route_width is None: raise ValueError( f"Either {cross_section=} or {layer=} and {route_width=} must be provided" ) elif layer is not None: raise ValueError( f"Cannot have both {layer=} and {cross_section=} provided. Choose one." ) c = component ports1_ = ports1_resolved ports2_ = ports2_resolved port_type = port_type or ports1_[0].port_type if cross_section is None: cross_section = partial( gf.cross_section.cross_section, layer=cast("LayerSpec", layer), width=cast("float", route_width), port_names=("e1", "e2") if port_type == "electrical" else ("o1", "o2"), port_types=(port_type, port_type), ) if len(ports1_) != len(ports2_): raise ValueError( f"ports1={len(ports1_)} and ports2={len(ports2_)} must be equal" ) if route_width: xs = gf.get_cross_section(cross_section, width=route_width) else: xs = gf.get_cross_section(cross_section) width = route_width or xs.width radius = radius or xs.radius taper_cell = gf.get_component(taper) if taper else None if collision_check_layers: collision_check_layer_enums = [ gf.get_layer(layer) for layer in collision_check_layers ] else: collision_check_layer_enums = None bboxes = list(bboxes or []) if auto_taper and auto_taper_taper: warn( "Use of `auto_taper_taper` is deprecated. Please use `layer_transitions` instead.", DeprecationWarning, stacklevel=2, ) taper_ = gf.get_component(auto_taper_taper) taper_o1 = taper_.ports[0].name taper_o2 = taper_.ports[1].name ports1_new: list[gf.Port] = [] ports2_new: list[gf.Port] = [] for p1, p2 in zip(ports1_, ports2_, strict=False): t1 = c << taper_ t2 = c << taper_ t1.connect(taper_o1, p1) t2.connect(taper_o1, p2) ports1_new.append(t1.ports[taper_o2]) ports2_new.append(t2.ports[taper_o2]) ports1_ = ports1_new ports2_ = ports2_new bbox1 = gf.kdb.DBox() bbox2 = gf.kdb.DBox() for port in ports1_: bbox1 += port.dcplx_trans.disp.to_p() for port in ports2_: bbox2 += port.dcplx_trans.disp.to_p() bboxes.append(bbox1) bboxes.append(bbox2) elif auto_taper: bbox1 = gf.kdb.DBox() bbox2 = gf.kdb.DBox() for port in ports1_: bbox1 += port.dcplx_trans.disp.to_p() for port in ports2_: bbox2 += port.dcplx_trans.disp.to_p() ports1_ = add_auto_tapers( component, ports1_, cross_section=xs, layer_transitions=layer_transitions ) ports2_ = add_auto_tapers( component, ports2_, cross_section=xs, layer_transitions=layer_transitions ) for port in ports1_: bbox1 += port.dcplx_trans.disp.to_p() for port in ports2_: bbox2 += port.dcplx_trans.disp.to_p() bboxes.append(bbox1) bboxes.append(bbox2) # component.shapes(component.kcl.layer(1,0)).insert(bbox) if steps and waypoints: raise ValueError("Provide only one of steps or waypoints") if steps: waypoints = [] x, y = ports1_[0].center for d in steps: if isinstance(d, dict): if not STEP_DIRECTIVES.issuperset(d): raise ValueError( f"Invalid step directives: {list(d.keys() - STEP_DIRECTIVES)}." f"Valid directives are {list(STEP_DIRECTIVES)}" ) x = d.get("x", x) + d.get("dx", 0) y = d.get("y", y) + d.get("dy", 0) else: raise ValueError( f"Invalid step {d!r}. Each step must be a dict with keys (x, y, dx, dy)." ) waypoints += [(x, y)] # type: ignore[arg-type] if layer_marker: marker = component << gf.components.rectangle( size=(10, 10), layer=layer_marker, centered=True ) marker.center = (x, y) if waypoints is not None and steps and len(waypoints) < 2: x, y = waypoints[-1][0], waypoints[-1][1] # type: ignore[index] x1, y1 = ports1_[0].center port2 = ports2_[0] x2, y2 = port2.center orientation = port2.orientation if orientation is not None and int(orientation) in {0, 180}: yt = y1 + (y2 - y1) / 3 ytt = y1 + 2 * (y2 - y1) / 3 waypoints = [(x, yt), (x, ytt)] elif orientation is not None and int(orientation) in {90, 270}: xt = x1 + (x2 - x1) / 3 xtt = x1 + 2 * (x2 - x1) / 3 waypoints = [(xt, y), (xtt, y)] waypoints_: list[kf.kdb.DPoint] | None if waypoints is None: waypoints_ = None elif len(waypoints) == 0: waypoints_ = [] elif not isinstance(waypoints[0], kf.kdb.DPoint): waypoints_ = [ kf.kdb.DPoint(p[0], p[1]) # type: ignore[index] for p in waypoints ] else: waypoints_ = [cast("kf.kdb.DPoint", p) for p in waypoints] if layer_marker and waypoints_ is not None: for p in waypoints_: marker = component << gf.components.rectangle( size=(10, 10), layer=layer_marker, centered=True ) marker.center = (p.x, p.y) if waypoints_ is not None and len(waypoints_) >= 2: waypoints_ = _ensure_manhattan_waypoints(waypoints_, start_port=ports1_[0]) bend90 = ( bend if isinstance(bend, gf.Component) else gf.get_component( bend, cross_section=cross_section, radius=radius, width=width ) ) def straight_um(width: float, length: float) -> gf.Component: return gf.get_component( straight, length=length, cross_section=cross_section, width=width ) if sbend: def _sbend( c: gf.kf.ProtoTKCell[Any], offset: float, length: float, width: float ) -> gf.kf.DInstanceGroup: sb = gf.get_component( sbend, cross_section=cross_section, width=width, size=(length, offset), ) sb_ref = component << sb return gf.kf.DInstanceGroup(insts=[sb_ref], ports=list(sb_ref.ports)) try: kf_on_collision = "error" if on_collision == "warning" else on_collision kf_on_placer_error = ( "error" if on_placer_error == "warning" else on_placer_error ) route = kf.routing.optical.route_bundle( component, ports1_, ports2_, separation=separation, straight_factory=straight_um, bend90_cell=bend90, taper_cell=taper_cell, starts=start_straight_length, ends=end_straight_length, min_straight_taper=min_straight_taper, place_port_type=port_type, collision_check_layers=[ c.kcl.layout.get_info(layer) for layer in collision_check_layer_enums ] if collision_check_layer_enums else None, on_collision=kf_on_collision, on_placer_error=kf_on_placer_error, allow_width_mismatch=allow_width_mismatch, allow_layer_mismatch=allow_layer_mismatch, allow_type_mismatch=allow_type_mismatch, bboxes=list(bboxes or []), route_width=width, sort_ports=sort_ports, waypoints=waypoints_, end_angles=end_angles, start_angles=start_angles, path_length_matching_config=path_length_matching_config, sbend_factory=_sbend if sbend else None, ) except Exception as e: if raise_on_error: if "kdb.Trans" in str(e): raise ValueError("You need at least 2 waypoints or steps.") from e if "non-manhattan" in str(e): raise ValueError( "Waypoints need to be Manhattan (axis-aligned) coordinates." ) from e raise if "kdb.Trans" in str(e): e = ValueError("You need at least 2 waypoints or steps.") elif "non-manhattan" in str(e): e = ValueError("Waypoints need to be Manhattan (axis-aligned) coordinates.") gf.logger.error(f"Error in route_bundle: {e}") warn(f"Routing failed: {e}", stacklevel=2) layer_error_path = gf.get_layer_info(gf.CONF.layer_error_path) route = kf.routing.electrical.route_bundle( component, ports1_, ports2_, separation=separation, starts=start_straight_length, ends=end_straight_length, on_collision=None, on_placer_error=None, bboxes=bboxes, route_width=width, sort_ports=sort_ports, end_angles=end_angles, start_angles=start_angles, place_layer=layer_error_path, ) if waypoints and waypoints_ is not None: layer_marker = gf.CONF.layer_error_path for p in waypoints_: marker = component << gf.components.rectangle( size=(10, 10), layer=layer_marker, centered=True ) marker.center = (p.x, p.y) if layer_label: for route_i in route: c.add_label( text=f"{route_i.length:.3f}", layer=layer_label, position=route_i.instances[0].dcenter, ) return route
route_bundle_electrical = partial( route_bundle, bend="wire_corner", allow_width_mismatch=True, )