Source code for braket.analog_hamiltonian_simulator.rydberg.rydberg_simulator_helpers

# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import itertools
import time
import warnings

import numpy as np
import scipy.sparse
from braket.ir.ahs.atom_arrangement import AtomArrangement
from braket.ir.ahs.program_v1 import Program


[docs] def validate_config(config: str, atoms_coordinates: np.ndarray, blockade_radius: float) -> bool: """Valid if a given configuration complies with the Rydberg approximation Args: config (str): The configuration to be validated atoms_coordinates (ndarray): The coordinates for atoms in the filled sites blockade_radius (float): The Rydberg blockade radius Returns: bool: True if the configuration complies with the Rydberg approximation, False otherwise """ # The indices for the Rydberg atoms in the configuration rydberg_atoms = [i for i, item in enumerate(config) if item == "r"] for i, rydberg_atom in enumerate(rydberg_atoms[:-1]): dists = np.linalg.norm( atoms_coordinates[rydberg_atom] - atoms_coordinates[rydberg_atoms[i + 1 :]], axis=1 ) if min(dists) <= blockade_radius: return False return True
[docs] def get_blockade_configurations(lattice: AtomArrangement, blockade_radius: float) -> list[str]: """Return the lattice configurations complying with the blockade approximation Args: lattice (AtomArrangement): A lattice with Rydberg atoms and their coordinates blockade_radius (float): The Rydberg blockade radius Returns: list[str]: A list of bit strings, each of them corresponding to a valid configuration complying with the blockade approximation. The length of each configuration is the same as the number of atoms in the lattice, with 'r' and 'g' indicating the Rydberg and ground states, respectively. Notes on the indexing: The left-most bit in the configuration corresponds to the first atom in the lattice. Notes on the algorithm: We start from all possible configurations and get rid of those violating the blockade approximation constraint. """ # The coordinates for atoms in the filled sites atoms_coordinates = np.array(lattice.sites)[np.where(lattice.filling)] min_separation = float("inf") # The minimum separation between atoms, or filled sites for i, atom_coord in enumerate(atoms_coordinates[:-1]): dists = np.linalg.norm(atom_coord - atoms_coordinates[i + 1 :], axis=1) min_separation = min(min_separation, min(dists)) configurations = [ "".join(item) for item in itertools.product(["g", "r"], repeat=sum(lattice.filling)) ] if blockade_radius < min_separation: # no need to consider blockade approximation return configurations return [ config for config in configurations if validate_config(config, atoms_coordinates, blockade_radius) ]
def _get_interaction_dict( program: Program, rydberg_interaction_coef: float, configurations: list[str] ) -> dict[tuple[int, int], float]: """Return the dict contains the Rydberg interaction strength for all configurations. Args: program (Program): An analog simulation program for Rydberg system with the interaction term rydberg_interaction_coef (float): The interaction coefficient configurations (list[str]): The list of configurations that comply with the blockade approximation. Returns: dict[tuple[int, int], float]: The dictionary for the interaction operator """ # The coordinates for atoms in the filled sites lattice = program.setup.ahs_register atoms_coordinates = np.array( [lattice.sites[i] for i in range(len(lattice.sites)) if lattice.filling[i] == 1] ) interactions = {} # The interaction in the basis of configurations, as a dictionary for config_index, config in enumerate(configurations): interaction = 0 # The indices for the Rydberg atoms in the configuration rydberg_atoms = [i for i, item in enumerate(config) if item == "r"] # Obtain the pairwise distances between the Rydberg atoms, followed by adding their Rydberg # interactions for ind_1, rydberg_atom_1 in enumerate(rydberg_atoms[:-1]): for ind_2, rydberg_atom_2 in enumerate(rydberg_atoms): if ind_2 > ind_1: dist = np.linalg.norm( atoms_coordinates[rydberg_atom_1] - atoms_coordinates[rydberg_atom_2] ) interaction += rydberg_interaction_coef / (float(dist) ** 6) if interaction > 0: interactions[(config_index, config_index)] = interaction return interactions def _get_detuning_dict( targets: tuple[int], configurations: list[str] ) -> dict[tuple[int, int], float]: """Return the dict contains the detuning operators for a set of target atoms. Args: targets (tuple[int]): The target atoms of the detuning operator configurations (list[str]): The list of configurations that comply with the blockade approximation. Returns: dict[tuple[int, int], float]: The dictionary for the detuning operator """ detuning = {} # The detuning term in the basis of configurations, as a dictionary for ind_1, config in enumerate(configurations): value = sum([1 for ind_2, item in enumerate(config) if item == "r" and ind_2 in targets]) if value > 0: detuning[(ind_1, ind_1)] = value return detuning def _get_rabi_dict(targets: tuple[int], configurations: list[str]) -> dict[tuple[int, int], float]: """Return the dict for the Rabi operators for a set of target atoms. Args: targets (tuple[int]): The target atoms of the detuning operator configurations (list[str]): The list of configurations that comply with the blockade approximation. Returns: dict[tuple[int, int], float]: The dictionary for the Rabi operator Note: We only save the lower triangular part of the matrix that corresponds to the Rabi operator. """ rabi = {} # The Rabi term in the basis of configurations, as a dictionary # use dictionary to store index of configurations configuration_index = {config: ind for ind, config in enumerate(configurations)} for ind_1, config_1 in enumerate(configurations): for target in targets: # Only keep the lower triangular part of the Rabi operator # which convert a single atom from "g" to "r". if config_1[target] != "g": continue # Construct the state after applying the Rabi operator bit_list = list(config_1) bit_list[target] = "r" config_2 = "".join(bit_list) # If the constructed state is in the Hilbert space, # add the corresponding matrix element to the Rabi operator. if config_2 in configuration_index: rabi[(configuration_index[config_2], ind_1)] = 1 return rabi def _get_sparse_from_dict( matrix_dict: dict[tuple[int, int], float], matrix_dimension: int ) -> scipy.sparse.csr_matrix: """Convert a dict to a CSR sparse matrix Args: matrix_dict (dict[tuple[int, int], float]): The dict for the sparse matrix matrix_dimension (int): The size of the sparse matrix Returns: scipy.sparse.csr_matrix: The sparse matrix in CSR format """ rows = [key[0] for key in matrix_dict.keys()] cols = [key[1] for key in matrix_dict.keys()] return scipy.sparse.csr_matrix( tuple([list(matrix_dict.values()), [rows, cols]]), shape=(matrix_dimension, matrix_dimension), ) def _get_sparse_ops( program: Program, configurations: list[str], rydberg_interaction_coef: float ) -> tuple[ list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], scipy.sparse.csr_matrix, list[scipy.sparse.csr_matrix], ]: """Returns the sparse matrices for Rabi, detuning, interaction and local detuning detuning operators Args: program (Program): An analog simulation program for Rydberg system configurations (list[str]): The list of configurations that comply with the blockade approximation. rydberg_interaction_coef (float): The interaction coefficient Returns: tuple[list[csr_matrix],list[csr_matrix],csr_matrix,list[csr_matrix]]: A tuple containing the list of Rabi operators, the list of detuing operators, the interaction operator and the list of local detuing operators """ # Get the driving fields as sparse matrices, whose targets are all the atoms in the system targets = np.arange(np.count_nonzero(program.setup.ahs_register.filling)) rabi_dict = _get_rabi_dict(targets, configurations) detuning_dict = _get_detuning_dict(targets, configurations) # Driving field is an array of operators, which has only one element for now rabi_ops = [_get_sparse_from_dict(rabi_dict, len(configurations))] detuning_ops = [_get_sparse_from_dict(detuning_dict, len(configurations))] # Get the interaction term as a sparse matrix interaction_dict = _get_interaction_dict(program, rydberg_interaction_coef, configurations) interaction_op = _get_sparse_from_dict(interaction_dict, len(configurations)) # Get local detuning as sparse matrices. # Local detuning is an array of operators, which has only one element for now local_detuning_ops = [] for local_detuning in program.hamiltonian.localDetuning: temp = 0 filled_site = 0 # Index of the filled site for filling, strength in zip( program.setup.ahs_register.filling, local_detuning.magnitude.pattern ): # If the site is not filled, we move on to the next filled site if filling == 0: continue opt = _get_sparse_from_dict( _get_detuning_dict((filled_site,), configurations), len(configurations) ) temp += float(strength) * scipy.sparse.csr_matrix(opt, dtype=float) filled_site += 1 local_detuning_ops.append(temp) return rabi_ops, detuning_ops, interaction_op, local_detuning_ops def _interpolate_time_series( t: float, times: list[float], values: list[float], method: str = "piecewise_linear" ) -> float: """Interpolates the value of a series of time-value pairs at the given time via linear interpolation. Args: t (float): The given time point times (list[float]): The list of time points values (list[float]): The list of values method (str): The method for interpolation, either "piecewise_linear" or "piecewise_constant." Default: "piecewise_linear" Returns: float: The interpolated value of the time series at t """ times = [float(time) for time in times] values = [float(value) for value in values] if method == "piecewise_linear": return np.interp(t, times, values) elif method == "piecewise_constant": index = np.searchsorted(times, t, side="right") - 1 return values[index] else: raise ValueError("`method` can only be `piecewise_linear` or `piecewise_constant`.") def _get_coefs( program: Program, simulation_times: list[float] ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Returns the coefficients for the Rabi operators, detuning operators and local detuning operators for all the time points in the analog simulation program. Args: program (Program): An analog simulation program for Rydberg system simulation_times (list[float]): The list of time points Returns: tuple[ndarray, ndarray, ndarray]: A tuple containing the list of Rabi frequencies, the list of global detuings and the list of local detunings """ rabi_coefs, detuning_coefs = [], [] for driving_field in program.hamiltonian.drivingFields: amplitude = driving_field.amplitude.time_series phase = driving_field.phase.time_series detuning = driving_field.detuning.time_series # Get the Rabi part. We use the convention: Omega * exp(1j*phi) * |r><g| + h.c. rabi_coef = np.array( [ _interpolate_time_series( t, amplitude.times, amplitude.values, method="piecewise_linear" ) * np.exp( 1j * _interpolate_time_series( t, phase.times, phase.values, method="piecewise_constant" ) ) for t in simulation_times ], dtype=complex, ) rabi_coefs.append(rabi_coef) # Get the detuning part detuning_coef = np.array( [ _interpolate_time_series( t, detuning.times, detuning.values, method="piecewise_linear" ) for t in simulation_times ], dtype=complex, ) detuning_coefs.append(detuning_coef) # add local detuning local_detuing_coefs = [] for local_detuning in program.hamiltonian.localDetuning: magnitude = local_detuning.magnitude.time_series local_detuing_coef = np.array( [ _interpolate_time_series( t, magnitude.times, magnitude.values, method="piecewise_linear" ) for t in simulation_times ], dtype=complex, ) local_detuing_coefs.append(local_detuing_coef) return np.array(rabi_coefs), np.array(detuning_coefs), np.array(local_detuing_coefs) def _get_ops_coefs( program: Program, configurations: list[str], rydberg_interaction_coef: float, simulation_times: list[float], ) -> tuple[ list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], np.ndarray, np.ndarray, np.ndarray, scipy.sparse.csr_matrix, ]: """Returns the sparse matrices and coefficients for the Rabi terms, detuning terms and the local detuining terms, together with the interaction operator in the given analog simulation program for Rydberg systems. Args: program (Program): An analog simulation program for Rydberg system configurations (list[str]): The list of configurations that comply to the blockade approximation. rydberg_interaction_coef (float): The interaction coefficient simulation_times (list[float]): The list of time points Returns: tuple[ list[csr_matrix], list[csr_matrix], list[csr_matrix], ndarray, ndarray, ndarray, csr_matrix ]: A tuple containing the list of Rabi operators, the list of detuing operators, the list of local detuing operators, the list of Rabi frequencies, the list of global detuings, the list of local detunings and the interaction operator. """ rabi_ops, detuning_ops, interaction_op, local_detuning_ops = _get_sparse_ops( program, configurations, rydberg_interaction_coef ) rabi_coefs, detuning_coefs, local_detuing_coefs = _get_coefs(program, simulation_times) return ( rabi_ops, detuning_ops, local_detuning_ops, rabi_coefs, detuning_coefs, local_detuing_coefs, interaction_op, )
[docs] def sample_state(state: np.ndarray, shots: int) -> np.ndarray: """Sample measurement outcomes from the quantum state `state` Args: state (ndarray): A state vector shots (int): The number of samples Returns: ndarray: The array for the sample results """ weights = (np.abs(state) ** 2).flatten() weights /= sum(weights) sample = np.random.multinomial(shots, weights) return sample
def _print_progress_bar(num_time_points: int, index_time: int, start_time: float) -> None: """Print a lightweight progress bar Args: num_time_points (int): The total number of time points index_time (int): The index of the current time point start_time (float): The starting time for the simulation """ if index_time == 0: print("0% finished, elapsed time = NA, ETA = NA", flush=True, end="\r") else: current_time = time.time() estimate_time_arrival = ( (current_time - start_time) / (index_time + 1) * (num_time_points - (index_time + 1)) ) print( f"{100 * (index_time + 1) / num_time_points}% finished, " f"elapsed time = {(current_time - start_time)} seconds, " f"ETA = {estimate_time_arrival} seconds ", flush=True, end="\r", ) def _get_hamiltonian( index_time: float, operators_coefficients: tuple[ list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], np.ndarray, np.ndarray, np.ndarray, scipy.sparse.csr_matrix, ], ) -> scipy.sparse.csr_matrix: """Get the Hamiltonian at a given time point Args: index_time (float): The index of the current time point operators_coefficients (tuple[ list[csr_matrix], list[csr_matrix], list[csr_matrix], ndarray, ndarray, ndarray, csr_matrix ]): A tuple containing the list of Rabi operators, the list of detuing operators, the list of local detuing operators, the list of Rabi frequencies, the list of global detuings, the list of local detunings and the interaction operator. Returns: (scipy.sparse.csr_matrix): The Hamiltonian at the given time point as a sparse matrix """ ( rabi_ops, detuning_ops, local_detuning_ops, rabi_coefs, detuning_coefs, local_detuing_coefs, interaction_op, ) = operators_coefficients index_time = int(index_time) if len(rabi_coefs) > 0: # If there is driving field, the maximum of index_time is the maximum time index # for the driving field. # Note that, if there is more than one driving field, we assume that they have the # same number of coefficients max_index_time = len(rabi_coefs[0]) - 1 else: # If there is no driving field, then the maxium of index_time is the maxium time # index for local detuning. # Note that, if there is more than one local detuning, we assume that they have the # same number of coefficients # Note that, if there is no driving field nor local detuning, the initial state will # be returned, and the simulation would not reach here. max_index_time = len(local_detuing_coefs[0]) - 1 # If the integrator uses intermediate time value that is larger than the maximum # time value specified, the final time value is used as an approximation. if index_time > max_index_time: index_time = max_index_time warnings.warn( "The solver uses intermediate time value that is " "larger than the maximum time value specified. " "The final time value of the specified range " "is used as an approximation." ) # If the integrator uses intermediate time value that is larger than the minimum # time value specified, the final time value is used as an approximation. if index_time < 0: index_time = 0 warnings.warn( "The solver uses intermediate time value that is " "smaller than the minimum time value specified. " "The first time value of the specified range " "is used as an approximation." ) hamiltonian = interaction_op # Add the driving fields for rabi_op, rabi_coef, detuning_op, detuning_coef in zip( rabi_ops, rabi_coefs, detuning_ops, detuning_coefs ): hamiltonian += ( rabi_op * rabi_coef[index_time] / 2 + (rabi_op.T.conj() * np.conj(rabi_coef[index_time]) / 2) - detuning_op * detuning_coef[index_time] ) # Add local detuning for local_detuning_op, local_detuning_coef in zip(local_detuning_ops, local_detuing_coefs): hamiltonian -= local_detuning_op * local_detuning_coef[index_time] return hamiltonian def _apply_hamiltonian( index_time: float, operators_coefficients: tuple[ list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], list[scipy.sparse.csr_matrix], np.ndarray, np.ndarray, scipy.sparse.csr_matrix, ], input_register: np.ndarray, ) -> scipy.sparse.csr_matrix: """Applies the Hamiltonian at a given time point on a state. Args: index_time (float): The index of the current time point operators_coefficients (tuple[ list[csr_matrix], list[csr_matrix], list[csr_matrix], list[csr_matrix], ndarray, ndarray, csr_matrix ]): A tuple containing the list of Rabi operators, the list of detuing operators, the list of local detuing operators, the list of Rabi frequencies, the list of global detuings, the list of local detunings and the interaction operator. input_register (ndarray): The input state which we apply the Hamiltonian to. Returns: (ndarray): The result """ ( rabi_ops, detuning_ops, local_detuning_ops, rabi_coefs, detuning_coefs, local_detuing_coefs, interaction_op, ) = operators_coefficients index_time = int(index_time) if len(rabi_coefs): # If there is driving field, the maximum of index_time is the maximum time index # for the driving field. # Note that, if there is more than one driving field, we assume that they have the # same number of coefficients max_index_time = len(rabi_coefs[0]) - 1 else: # If there is no driving field, then the maxium of index_time is the maxium time # index for local detuning. # Note that, if there is more than one local detuning, we assume that they have the # same number of coefficients # Note that, if there is no driving field nor local detuning, the initial state will # be returned, and the simulation would not reach here. max_index_time = len(local_detuing_coefs[0]) - 1 # If the integrator uses intermediate time value that is larger than the maximum # time value specified, the final time value is used as an approximation. if index_time > max_index_time: index_time = max_index_time warnings.warn( "The solver uses intermediate time value that is " "larger than the maximum time value specified. " "The final time value of the specified range " "is used as an approximation." ) # If the integrator uses intermediate time value that is larger than the minimum # time value specified, the final time value is used as an approximation. if index_time < 0: index_time = 0 warnings.warn( "The solver uses intermediate time value that is " "smaller than the minimum time value specified. " "The first time value of the specified range " "is used as an approximation." ) output_register = interaction_op.dot(input_register) # Add the driving fields for rabi_op, rabi_coef, detuning_op, detuning_coef in zip( rabi_ops, rabi_coefs, detuning_ops, detuning_coefs ): output_register += (rabi_coef[index_time] / 2) * rabi_op.dot(input_register) output_register += (np.conj(rabi_coef[index_time]) / 2) * rabi_op.H.dot(input_register) output_register -= detuning_coef[index_time] * detuning_op.dot(input_register) # Add local detuning for local_detuning_op, local_detuning_coef in zip(local_detuning_ops, local_detuing_coefs): output_register -= local_detuning_coef[index_time] * local_detuning_op.dot(input_register) return output_register