Source code for tkwant.manybody

# -*- coding: utf-8 -*-
# Copyright 2016-2023 tkwant authors.
# This file is part of tkwant.  It is subject to the license terms in the file
# LICENSE.rst found in the top-level directory of this distribution and at
# A list of tkwant authors can be found in
# the file AUTHORS.rst at the top-level directory of this distribution and at
"""Tools for calculating time-dependent, manybody quantities."""

import functools as ft
import itertools
import copy
import numpy as np

import kwant
import kwantspectrum

from . import onebody, leads, integration, mpi, _common, _logging, greenfunctions
from .system import add_time_to_params, ExtendedSystem

__all__ = ['Occupation', 'Interval', 'lead_occupation',
           'calc_intervals', 'split_intervals', 'combine_intervals',
           'calc_tasks', 'calc_initial_state', 'calc_energy_cutoffs',
           'WaveFunction', 'State', 'make_boundstates_time_dependent',
           'add_boundstates', 'boundstates_present', 'ManybodyIntegrand']

GreenFunction = greenfunctions.GreenFunction

# set module logger
logger = _logging.make_logger(name=__name__)
log_func = _logging.log_func(logger)

# TODO: reintroduce dataclasses at some point when python 3.7 is stable

[docs]class Occupation: """Data format for the lead occupation, see `tkwant.manybody.lead_occupation`. Attributes ---------- distribution : callable distribution function energy_range : sequence, optional Energy cutoffs. bands : sequence, optional Selected bands. multivariate` : bool, optional multivariate distribution function """ def __init__(self, distribution, energy_range=None, bands=None, multivariate=False): self.distribution = distribution self.energy_range = energy_range self.bands = bands self.multivariate = multivariate def __str__(self): string = "lead occupation: " \ "distribution={distribution}, energy_range={energy_range} , " \ "bands={bands}, multivariate={multivariate}".format(**self.__dict__) return string
[docs]class Interval: """Data format for a quadrature interval, see `tkwant.manybody.calc_intervals`. Attributes ---------- lead : int lead index band : int band index (*n*) kmin : float lower momentum (:math:`k`) bound kmax : float upper momentum (:math:`k`) bound order : int, optional Integration rule order to be applied on the interval. See `tkwant.integration.calc_abscissas_and_weights`. quadrature : string, optional Integration quadrature to be applied on the interval. See `tkwant.integration.calc_abscissas_and_weights`. integration_variable : string, optional The variable of integration (in general energy or momentum). See `tkwant.manybody.calc_tasks`. """ def __init__(self, lead: int, band: int, kmin: float, kmax: float, order: int = 10, quadrature: str = "kronrod", integration_variable: str = "momentum"): self.lead = lead = band self.kmin = kmin self.kmax = kmax self.order = order self.quadrature = quadrature self.integration_variable = integration_variable def __eq__(self, other) -> bool: if not isinstance(other, Interval): return NotImplemented return ( (self.lead,, self.kmin, self.kmax, self.order, self.quadrature, self.integration_variable) == (other.lead,, other.kmin, other.kmax, other.order, other.quadrature, other.integration_variable)) def __hash__(self) -> int: return hash((self.lead,, self.kmin, self.kmax, self.order, self.quadrature, self.integration_variable)) def __str__(self): string = "quadrature interval: " \ "lead={lead}, band={band}, kmin={kmin:.6f}, kmax={kmax:.6f}, " \ "order={order}, quadrature={quadrature}, "\ "integration_variable={integration_variable}".format(**self.__dict__) return string
# occupation def _fermi_function_below_epsilon(temperature, epsilon=1E-10): """Return the energy *E* above which 1/(1 + exp(E/T)) < epsilon""" assert _common.is_type(temperature, 'real_number') assert _common.is_type(epsilon, 'real_number') assert temperature > 0 assert 0 < epsilon < 1 return temperature * np.log((1 - epsilon) / epsilon)
[docs]@log_func def lead_occupation(chemical_potential=0, temperature=0, bands=None, occupation_type=Occupation): r"""Set the occupation (:math:`T, \mu, f(E)`) for one lead. Routine to obtain the fermi function f(E) and the upper energy cutoff for a single lead, in order to calculate integrals of the form :math:`I = \int d E \, f(E) \ldots`. Energy cutoffs are estimated from the temperature and the chemical potential only, without knowledge of the band structure. * Fermi dirac distribution :math:`f(E) = (1 + \exp((E - \mu)/T))^{-1}` For *T = 0*: :math:`f(E) = \theta(\mu - E)` the chemical potential (Fermi energy) is a sharp upper cutoff, :math:`I = \int_{-\infty}^\mu d E \, \ldots`. For *T > 0* we estimate an effective upper cutoff *Eeff*, such that :math:`f(E) \leq \varepsilon` for :math:`E \geq Eeff`, :math:`I = \int_{-\infty}^{Eeff} d E \, f(E) \ldots`. Parameters ---------- chemical_potential : float or sequence of floats, optional chemical potential :math:`\mu`, zero by default If a sequence, it must have the same number of elements as bands per lead and acts as a cutoff per band. Setting ``chemical_potential`` to None is equivalent to + infinity. temperature : float, optional temperature :math:`T`, default: zero temperature bands : int or list of int, optional If present, only bands with band indices (*n*) present in the list are taken into account. By default, all bands are considered. occupation_type : `~dataclasses.dataclass`, optional Data format of the returnded ``occupation`` object to store the lead occupation. Default: `tkwant.manybody.Occupation`. Returns ------- occupation : ``occupation_type`` Lead occupation. Attributes of ``occupation`` that are modified by this routine: - `distribution` : callable, distribution function *f(E)*, calling signature `(energy)` - `energy_range` : sequence, energy cutoffs - `bands` : list of int, bands to be considered - `multivariate` : bool, multivariate distribution function """ def fermi_dirac_distribution(energy, mu): """Fermi diract distribution function""" if mu is None: return 1. / (1 + np.exp(energy / temperature)) return 1. / (1 + np.exp((energy - mu) / temperature)) def zero_temperature_fermi_dirac_distribution(energy, mu): """Fermi diract distribution function""" if mu is None: return 1 return np.where(energy <= mu, 1, 0) multivariate = False if not isinstance(chemical_potential, if not (_common.is_type(chemical_potential, 'real_number') or (chemical_potential is None)): raise TypeError('chemical potential must be a real number or "None".') else: multivariate = True for mu in chemical_potential: if not (_common.is_type(mu, 'real_number') or (chemical_potential is None)): raise TypeError('chemical potential must be a sequence of real numbers or "None".') if not _common.is_type(temperature, 'real_number'): raise TypeError('temperature must be a real number.') if temperature < 0: raise ValueError('temperature={} is negative.'.format(temperature)) if np.isclose(temperature, 0):'distribution function: zero-temperature fermi-dirac') # chemical potential is upper energy cutoff for zero-temp. fermi dirac if not multivariate: distribution = ft.partial(zero_temperature_fermi_dirac_distribution, mu=chemical_potential) energy_range = [(None, chemical_potential)] else: distribution = [ft.partial(zero_temperature_fermi_dirac_distribution, mu=mu) for mu in chemical_potential] energy_range = [[(None, mu)] for mu in chemical_potential] else:'distribution function: finite-temperature fermi-dirac') edt = _fermi_function_below_epsilon(temperature) if not multivariate: distribution = ft.partial(fermi_dirac_distribution, mu=chemical_potential) if chemical_potential is None: energy_range = [(None, None)] else: effective_emax = chemical_potential + edt energy_range = [(None, effective_emax)] else: distribution = [ft.partial(fermi_dirac_distribution, mu=mu) for mu in chemical_potential] energy_range = [] for mu in chemical_potential: if my is None: energy_range.append([(None, None)]) else: energy_range.append([(None, mu + edt)]) _bands = copy.deepcopy(bands) if _bands is not None: try: _bands = sorted(set(list(_bands))) except TypeError: # if bands is just a single integer _bands = [_bands] return occupation_type(distribution, energy_range, _bands, multivariate)
# selection of the energy/momentum integration region def _calc_momentum_intervals(spectrum, emin=None, emax=None, bands=None, tol=1e-5): r"""Get the momentum intervals for one lead. Parameters ---------- spectrum : `~kwantspectrum.spectrum` Energy dispersion :math:`E_n(k)` of one lead. emin : int or float, optional Minimum band energy. If present, select intervals such that :math:`E_n(k) \geq` `emin`. Default: no lower energy cutoff. emax : int or float, optional Maximum band energy. If present, select intervals such that :math:`E_n(k) \leq` `emax`. Default: no upper energy cutoff, only `chemical_potential` will serve as a cutoff for the default fermi dirac distribution. bands : list of int, optional If present, only bands with band indices (*n*) present in the list are taken into account. (`emin` and `emax` still limit the energy range of these bands.) tol : intervals below width k < tol might be dropped, optional Returns ------- intervals : list List of all momentum intervals in the form `(band, (kmin, kmax))` where `band` is the band index (*n*), and `kmin` respectively `kmax` are the minimum, respectively maximum values of the momentum (*k*) interval. The length of the list depends on the lead spectrum and is not known a priori. """ def positive_velocity_intervals(band): """Momentum intervals with emin < energy < emax and positive velocity""" intervals_valid_energy = spectrum.intervals(band, lower=emin, upper=emax) intervals_positve_velocity = spectrum.intervals(band, lower=0, derivative_order=1, tol=tol) return kwantspectrum.intersect_intervals( intervals_valid_energy, intervals_positve_velocity) if bands is None: bands = range(spectrum.nbands) assert _common.is_type_array(bands, 'integer').all() return [(band, interval) for band in bands for interval in positive_velocity_intervals(band) if _common.is_not_empty(interval)]
[docs]@log_func def calc_intervals(spectra, occupations, interval_type=Interval, tol=1e-5): r"""Return a list of momentum intervals to perform the manybody integration. Parameters ---------- spectra : `kwantspectrum.spectrum` or sequence thereof Energy dispersion :math:`E_n(k)` of one lead or sequence thereof in case of several leads. occupations : `tkwant.manybody.Occupation` or sequence thereof Each sequence element represents the occupation of a lead. If ``occupations`` consistst of only one element or if the sequence has only one element, the occupation is assumed to be identical in each lead. If ``occupations`` is a sequence with more than one element, such that the occupation for each lead is different, ``occupations`` must have the same length as ``spectra``. If a lead is occupied, the corresponding ``occupations`` element must have the following two attributes: - `energy_range` : sequence of energy bounds in form `(emin, emax)` (`emin` :math:`\leq E_n(k) \leq` `emax`), no bound if `emin` and/or `emax` is set to `None`. - `bands` : list of int, bands to be considered, all bands considered if `None` If a lead is not occupied, the corresponding element of ``occupations`` must be `None` or `False`. interval_type : `~dataclasses.dataclass`, optional Data format to store an interval in the returned ``intervals`` list. Default: `tkwant.manybody.Interval` tol : intervals below width k < tol might be dropped, optional Returns ------- intervals : list of ``interval_type`` All momentum intervals to perform the statistical average. Attributes of each ``intervals`` element that are modified by this routine: - `lead` : int, lead index - `band` : int, band index (*n*) - `kmin` : float, lower momentum (:math:`k`) bound - `kmax` : float, upper momentum (:math:`k`) bound """ def calc_intervals_per_lead(lead, occupation, spectrum): """Intervals and quadrature rule for a single lead.""" bands = occupation.bands if occupation.energy_range is None: emin = None emax = None lead_intervals = [interval_type(lead, band, kmin, kmax) for band, (kmin, kmax) in _calc_momentum_intervals(spectrum, emin, emax, bands, tol)] else: lead_intervals = [] if occupation.multivariate: # energy range given for each band if len(occupation.energy_range) != spectrum.nbands: raise ValueError('Multivariate occupation does not match nb of bands.') for band, energy_range in enumerate(occupation.energy_range): for emin, emax in energy_range: for _, (kmin, kmax) in _calc_momentum_intervals(spectrum, emin, emax, [band], tol): lead_intervals.append(interval_type(lead, band, kmin, kmax)) else: # energy range given for all band for (emin, emax) in occupation.energy_range: lead_intervals += [interval_type(lead, band, kmin, kmax) for band, (kmin, kmax) in _calc_momentum_intervals(spectrum, emin, emax, bands, tol)] return lead_intervals if not isinstance(spectra, spectra = [spectra] if not isinstance(occupations, occupations = [occupations] uniform_occupation = False if len(occupations) == 1: logger.debug('occupation in all leads assumed identical for interval calculation') uniform_occupation = True if not uniform_occupation and len(occupations) != len(spectra): raise ValueError('Occupation and spectra must have the same length.') intervals = [] for lead, spectrum in enumerate(spectra): if uniform_occupation: occupation = occupations[0] else: occupation = occupations[lead] if occupation: intervals_per_lead = calc_intervals_per_lead(lead, occupation, spectrum) if _common.is_not_empty(intervals_per_lead): intervals += intervals_per_lead return intervals
def _split_interval(interval, number_subintervals): """Divide an interval into `number_subintervals` equidistant intervals. Parameters ---------- interval : `tkwant.manybody.Interval` A momentum interval. An ``interval`` must have at least the following two attributes: - `kmin` : float, lower momentum bound - `kmax` : float, upper momentum bound number_subintervals : int number of subintervals in which ``interval`` will be divided. `number_subintervals` must be larger zero. Returns ------- intervals : list List of `tkwant.manybody.Interval`. All attributes are copied, only the values for the `kmin` and `kmax` attribute are changed. The number of list elements is `number_subintervals`. """ if number_subintervals <= 0: raise ValueError("split number={} must be positive and larger zero." .format(number_subintervals)) assert _common.is_type(number_subintervals, 'integer') interval = copy.deepcopy(interval) if number_subintervals == 1: return [interval] dkint = (interval.kmax - interval.kmin) / number_subintervals interval.kmax = interval.kmin + dkint def make_new_interval(dk): new_interval = copy.deepcopy(interval) new_interval.kmin += dk new_interval.kmax += dk return new_interval return [make_new_interval(i * dkint) for i in range(number_subintervals)]
[docs]def split_intervals(intervals, number_subintervals): """Divide each interval in `number_subintervals` equidistant intervals. Parameters ---------- intervals : sequence of `tkwant.manybody.Interval` Momentum intervals to perform the statistical average. Each ``intervals`` sequence element must have at least the following two attributes: - `kmin` : float, lower momentum bound - `kmax` : float, upper momentum bound number_subintervals : int number of subintervals in which each element of ``intervals`` in will be divided Returns ------- splitted_intervals : list List of subdivided momentum intervals to perform the statistical average. All attributes are copied, only the values for the `kmin` and `kmax` attribute are changed. Number of list elements is ``len(intervals) * number_subintervals``. """ splitted_intervals = [] for interval in copy.deepcopy(intervals): splitted_intervals += _split_interval(interval, number_subintervals) return splitted_intervals
def _attributes_similar(obj0, obj1, attributes, atol=1E-10, rtol=1E-10): """Return True if attributes are either equal or numerically close Parameters ---------- obj0 : obj Some object. obj1 : obj Some object. attributes : sequence of str Attributes which are used to check similarity. atol : float, optional Absolute accuracy. rtol : float, optional Relative accuracy. Returns ------- similar : bool Returns True if ``obj0`` and ``obj1`` are similar. Let us define `a = obj0.attribute` and `b = obj1.attribute` where attribute is an element of the ``attributes`` list. If `a` and `b` are numerical values (float or numpy arrays) we require that they are approximatly similar: `abs(a - b) < (atol + rtol * abs(b))`. For all other attributes `a == b` is required stricly. If either `a` of `b` is None, we return False. """ assert _common.is_type(atol, 'real_number') assert atol >= 0 assert _common.is_type(rtol, 'real_number') assert rtol >= 0 for attrib in attributes: a = getattr(obj0, attrib, None) b = getattr(obj1, attrib, None) if isinstance(a, float) or isinstance(b, float): try: similar = abs(a - b) < (atol + rtol * abs(b)) except Exception: similar = False if not similar: return False elif isinstance(a, np.ndarray) or isinstance(b, np.ndarray): try: similar = np.allclose(a, b, atol, rtol) except Exception: similar = False if not similar: return False else: try: similar = a == b except Exception: similar = False if not similar: return False return True
[docs]@log_func def combine_intervals(intervals, atol=1E-10, rtol=1E-10): """Group leads for similar intervals together. As an example ``int_1 = (lead=0, kmin=0, kmax=1..)`` ``int_2 = (lead=1, kmin=0, kmax=1..)`` are two intervals, which are similar except the lead index. If the sequence ``intervals = [int_1, int_2]`` is passed to this routine the lead indices are combined and the returnd result is ``[(lead=(0, 1), kmin=0, kmax=1...)]``. To test for similarity between the intervals, we compare the attributes, (exept the ones with two leading underscores). For attributes with numerical values `a` and `b`, we require `abs(a - b) < (atol + rtol * abs(b))` for similarity. Other attributes must match exactly: `a == b if a, b != lead`. Parameters ---------- intervals : sequence of `tkwant.manybody.Interval` Quadrature intervals. atol : float, optional Absolute accuracy. rtol : float, optional Relative accuracy. Returns ------- new_intervals : list of `tkwant.manybody.Interval` Combined intervals. Lead indices in the combined intervals are grouped in a tuple. """ intervals_ = copy.deepcopy(intervals) new_intervals = [] while True: try: i0 = intervals_.pop(0) except: break new_intervals.append(i0) attributes = [a for a in dir(i0) if not a.startswith('__')] try: attributes.remove('lead') except Exception: raise AttributeError('attribute "lead" missing in interval={}'.format(i0)) leads = set() tmp = [] for i1 in intervals_: if _attributes_similar(i0, i1, attributes, atol, rtol): if isinstance(i0.lead, int): leads.add(i0.lead) else: leads.update(list(i0.lead)) if isinstance(i1.lead, int): leads.add(i1.lead) else: leads.update(list(i1.lead)) else: tmp.append(i1) intervals_ = tmp if leads: new_intervals[-1].lead = tuple(leads) return new_intervals
[docs]@log_func def calc_tasks(intervals, spectra, occupations, keys=None, task_type=onebody.Task, tol=1E-10): r"""Return all tasks (set of quantum numbers that uniquely identify a onebody state) that will form the manybody state. Parameters ---------- intervals : `tkwant.manybody.Interval` or sequence thereof Momentum intervals with quadrature rules for each interval. Each element of ``intervals`` must have at least the following attributes: - `lead` : int or tuple of int, lead index - `band` : int, band index (*n*) - `kmin` : float, lower momentum bound - `kmax` : float, upper momentum bound, must be larger than `kmin` - `order` : int, quadrature order. See `tkwant.integration.calc_abscissas_and_weights` - `quadrature` : string, quadrature rule to use. See `tkwant.integration.calc_abscissas_and_weights` - `integration_variable` : string, variable of integration. Possible values: `"energy"` : integrate over energy, `"momentum"`: integrate over momentum spectra : `kwantspectrum.spectrum` or sequence thereof Energy dispersion :math:`E_n(k)` of one lead or list thereof in case of several leads. occupations : `tkwant.manybody.Occupation` or sequence thereof Each sequence element represents the occupation of a lead. If ``occupations`` consistst of only one element or if the sequence has only one element, the occupation is assumed to be identical in each lead. If ``occupations`` is a sequence with more than one element, such that the occupation for each lead is different, ``occupations`` must have the same length as ``spectra``. Each element of ``occupations`` must have at least the following attribute: - `distribution` : callable, thermal distribution function with calling signature `(energy)` If a lead is not occupied, the corresponding element of ``occupations`` must be `None` or `False`. keys : iterable, optional Iterable to generate keys for the ``tasks`` dict. Default: Ascending integer sequence (0, 1, 2..), starting at zero. task_type : `~dataclasses.dataclass`, optional Data format to store a task in the returned ``tasks`` dict. Default: `tkwant.onebody.Task` tol : float, optional Numerical tolerance to remove tasks when their weights are almost zero. Condition to remove a task is \|weights| < tol. Returns ------- tasks : dict of ``task_type`` Dict of all tasks (set of quantum numbers that uniquely identify a onebody state) to set up the manybody wavefunction. Attributes of each ``tasks`` element that are modified by this routine: - `lead` : int, lead index - `mode` : int, scattering mode index - `energy` : float, energy of the onebody state - `momentum` : float, momentum of the onebody state; None if unknown - `weight` : float, weighting factor of the one-body state in the manybody sum (weight = math_weight * phys_weight) - `math_weight` : float, mathematical weighting factor - `phys_weight` : float, physical weighting factor Tasks are ordered as ``intervals`` and for each interval by increasing energy. Notes ----- Momentum values in ``tasks`` are only present if integration is performed over momentum (``intervals.integration_variable`` == 'momentum'). """ if not isinstance(intervals, intervals = [intervals] if not isinstance(spectra, spectra = [spectra] if not isinstance(occupations, occupations = [occupations] if keys is None: keys = itertools.count() uniform_occupation = False if len(occupations) == 1: logger.debug('occupation in all leads assumed identical for task calculation') uniform_occupation = True if not uniform_occupation and len(occupations) != len(spectra): raise ValueError('Occupation and spectra must have the same length.') assert _common.is_type(tol, 'real_number') assert tol > 0 tasks = {} for interval in intervals: if not isinstance(interval.lead, leads = [interval.lead] else: leads = interval.lead for lead in leads: assert _common.is_type(lead, 'integer') spectrum = spectra[lead] if uniform_occupation: occupation = occupations[0] else: occupation = occupations[lead] if occupation: logger.debug('calc quadrature weights for lead={}'.format(lead)) modes_and_weights = _calc_modes_and_weights(interval, occupation.distribution, spectrum) for (mode, energy, momentum, weight, math_weight, phys_weight) in zip(*modes_and_weights): weight_zero = _common.is_zero(weight, tol).all() mode_not_open = (mode == -1) if mode_not_open and not weight_zero: msg = ('no open modes were found for energy={}, lead={}, ' 'but the modes weight is {}, ' 'which is larger than numerical tolerance={}.' .format(energy, lead, weight, tol)) logger.warning(msg) if not (weight_zero or mode_not_open): tasks[next(keys)] = task_type(lead=lead, mode=mode, energy=energy, momentum=momentum, weight=weight, math_weight=math_weight, phys_weight=phys_weight) logger.debug('number of tasks={}'.format(len(tasks))) for task in tasks.items(): logger.debug('key={}, {}'.format(*task)) return tasks
def absolute_error(result): r"""Absolute error for the solver result with `quadrature="kronrod"` Parameters ---------- result : `~numpy.ndarray` Output from the manybody solver with Gauss-Kronrod quadrature Returns ------- error : float error estimate :math:`\delta = |I_n - I_{2 n+1}|`, where :math:`I_n` is the integral estimate with order *n*. """ low_order, high_order = result return np.abs(low_order - high_order) def _calc_modes_and_weights(interval, distribution, spectrum): """Return mode indices, energies and weights on an integration interval. Parameters ---------- interval : `tkwant.manybody.Interval` momentum interval and quadrature rule on that interval. Each ``interval`` must have at least the following attributes: - `band` : int, band index (*n*) - `kmin` : float, lower momentum bound - `kmax` : float, upper momentum bound, must be larger `kmin` - `order` : int, quadrature order - `quadrature` : string, quadrature rule to use. - `integration_variable` : string, energy vs. momentum integration distribution : callable distribution function of one lead. Calling signature: `(energy)`. spectrum : `kwantspectrum.spectrum` energy dispersion :math:`E_n(k)` of one lead. Returns ------- modes : `~numpy.ndarray` of int, shape `(n, )` mode number (kwant convention) energies : `~numpy.ndarray` of float, shape `(n, )` mode energies momenta : `~numpy.ndarray` of float or `None`, shape `(n, )` momenta of the nodes. only present if `integration_variable` is 'momentum', otherwise all values are `None`. weights : `~numpy.ndarray` of float shape `(n, )` for simple quadratures or shape `(n, 2)` for Gauss-Kronrod quadrature weight of the corresponding onebody-state in the manybody average ``weights`` = ``math_weights`` * ``phys_weights`` math_weights : `~numpy.ndarray` of float mathematical quadrature weights shape `(n, )` for simple quadratures or shape `(n, 2)` for Gauss-Kronrod quadrature phys_weights : `~numpy.ndarray` of float, shape `(n, )` physical weighting factor (fermi function, pi factors..) Notes ----- The size `n` in above arrays depends on the quadrature: For Gauss-Legendre: n = interval.order. For Gauss-Kronrod: n = 2 * interval.order + 1. """ if interval.integration_variable == "momentum": logger.debug('momentum integration: kmin={0.kmin}, kmax={0.kmax}, ' 'band={}, order={0.order}, quadrature={0.quadrature}' .format(interval)) momenta, math_weights = integration.calc_abscissas_and_weights( a=interval.kmin, b=interval.kmax, n=interval.order, quadrature=interval.quadrature) energies = spectrum(momenta, velocities = spectrum(momenta,, derivative_order=1) modes = np.array([spectrum.momentum_to_scattering_mode(k, for k in momenta]) jacobian = velocities elif interval.integration_variable == "energy": emin = spectrum(interval.kmin, emax = spectrum(interval.kmax, logger.debug('energy integration: emin={0}, emax={1}, kmin={2.kmin},' 'kmax={2.kmax}, band={}, order={2.order}, ' 'quadrature={2.quadrature}'.format(emin, emax, interval)) energies, math_weights = integration.calc_abscissas_and_weights( a=emin, b=emax, n=interval.order, quadrature=interval.quadrature) modes = np.array([spectrum.energy_to_scattering_mode(energy,, interval.kmin, interval.kmax) for energy in energies]) jacobian = 1 momenta = [None] * len(energies) else: raise NotImplementedError('integration_variable= {} not implemented'. format(interval.integration_variable)) if isinstance(distribution, logger.debug('found multivariate distribution function') if len(distribution) != spectrum.nbands: raise ValueError('Multivariate distribution does not match nb of bands.') dist_func = distribution[] else: dist_func = distribution try: phys_weights = jacobian * dist_func(energies) / (2 * np.pi) except TypeError: distribution_ = np.array([dist_func(energy) for energy in energies]) phys_weights = jacobian * distribution_ / (2 * np.pi) weights = math_weights * phys_weights assert _common.is_type_array(modes, 'integer').all() assert _common.is_type_array(energies, 'real_number').all() assert _common.is_type_array(weights, 'real_number').all() assert _common.is_type_array(math_weights, 'real_number').all() assert _common.is_type_array(phys_weights, 'real_number').all() return modes, energies, momenta, weights.T, math_weights.T, phys_weights # manybody solvers
[docs]@log_func def calc_initial_state(syst, tasks, boundaries=None, params=None, scattering_state_type=onebody.ScatteringStates, mpi_distribute=mpi.round_robin, comm=None): """Calculate the initial manybody scattering wave function using MPI. Parameters ---------- syst : `kwant.builder.FiniteSystem` or `tkwant.system.ExtendedSystem` The low level system for which the wave functions are to be calculated. boundaries : sequence of `~tkwant.leads.BoundaryBase`, optional The boundary conditions for each lead attached to ``syst``. Must be only provided if ``syst`` is a `kwant.builder.FiniteSystem`. tasks : sequence of `tkwant.onebody.Task` Each element in the sequence represents a one-body state that composes the manybody state. An element of ``tasks`` must have at least the following three attributes: - `lead` : int, lead index - `mode` : int, scattering mode index - `energy` : float, energy params : dict, optional Extra arguments to pass to the Hamiltonian of ``syst``, excluding time. scattering_state_type : `tkwant.onebody.ScatteringStates`, optional Class to calculate time-dependent onebody wavefunctions starting in an equilibrium scattering state. mpi_distribute : callable, optional Function to distribute the tasks dict keys over all MPI ranks. By default, keys must be integer and are distributed round-robin like. comm : `mpi4py.MPI.Intracomm`, optional The MPI communicator over which to parallelize the computation. By default, use the tkwant global MPI communicator. Returns ------- psi_init : dict of `tkwant.onebody.WaveFunction` Ensemble of all one-body scattering states that form the initial manybody state. Each one-body state stored in the ``psi_init`` dictionary corresponds to an element in the ``tasks`` list. The list index of ``tasks`` serves as dict key. """ def scattering_state(energy, lead): """Calculate a one-body scattering state that can be evolved in time""" logger.debug('calc scattering state: energy={}, lead={}'.format(energy, lead)) try: return scattering_state_type(extended_syst, energy, lead, params=params) except Exception: raise RuntimeError('scattering state calculation failed for ' 'energy={}, lead={}'.format(energy, lead)) if isinstance(syst, kwant.system.System): time_name, time_start, onebody_wavefunction_type = \ onebody._get_time_name_from_wavefunction(derived_type=scattering_state_type) tparams = add_time_to_params(params, time_name=time_name, time=time_start, check_numeric_type=True) solver_type = _common.get_default_function_argument(onebody_wavefunction_type, 'solver_type') extended_syst = onebody.make_extended_system(syst, solver_type, boundaries, tparams) elif isinstance(syst, ExtendedSystem): extended_syst = syst else: raise TypeError('"syst" must be a finalized kwant system ' 'or a "tkwant.system.ExtendedSystem"') comm = mpi.get_communicator(comm) tasks_per_rank = mpi.distribute_dict(tasks, mpi_distribute, comm) return {key: scattering_state(, task.lead)[task.mode] for key, task in tasks_per_rank.items()}
[docs]class WaveFunction: """Evolve a many-particle wavefunction in time.""" def __init__(self, psi_init, tasks, comm=None): """ Initialize the manybody state. Parameters ---------- psi_init : dict of `tkwant.onebody.WaveFunction` Dictionary with all initial one-body states. For load balancing the dictionary should be distributed over all MPI ranks. tasks : dict of `tkwant.onebody.Task` Dictionary containing the weighting factor for each one-body state. Each item must have at least the following attribute: - `weight` : `~numpy.ndarray`, weighting factor ``tasks`` must include all one-body states stored in `psi_init` and must be the same on all MPI ranks. comm : `~mpi4py.MPI.Intracomm`, optional The MPI communicator over which to parallelize the computation. By default, use the tkwant global MPI communicator. """ comm = mpi.get_communicator(comm) self.psi = mpi.DistributedDict(psi_init, comm) self.tasks = tasks self.comm = comm self.time = 0
[docs] def evolve(self, time): """ Evolve all wavefunctions up to ``time``. Parameters ---------- time : int or float time argument up to which the solver should be evolved """ for onebody_psi in self.psi.local_data().values(): onebody_psi.evolve(time) self.time = time
[docs] def evaluate(self, observable, root=0): """Evaluate the expectation value of an operator at the current time. Parameters ---------- observable : callable or `kwant.operator` An operator to evaluate the expectation value. Must have the calling signature of `kwant.operator`. root : int or None, optional MPI return rank on which to return the result. If ``root`` is an integer, it must be in the range of valid MPI ranks ``0 <= root < self.comm.size``. In that case, the calculated result is returned only on that specific MPI rank where ``rank == root``, whereas the result is `None` on all other MPI ranks with ``rank != root``. Alternatively, if ``root`` is `None`, the calculated result is returned on all MPI ranks. By default, the result is returned on MPI rank zero only. Returns ------- result : `~numpy.ndarray` The expectation value of ``observable``, integrated over all occupied bands. The result might not be returned on all MPI ranks; note the explanation above for input parameter ``root``. Notes ----- Returning the result on all MPI ranks (by setting ``root=None``), might be slower, as an additional broadcast step is needed. """ return self._calc_expectation_value(observable, root=root)
[docs] def get_onebody_state(self, key, root=0): """ Get the onebody wavefunction corresponding to its key. Parameters ---------- key : dict key Idendentifier key of the state. root : int or None, optional MPI return rank on which to return the state. If ``root`` is an integer, it must be in the range of valid MPI ranks ``0 <= root < self.comm.size``. In that case, the state is returned only on that specific MPI rank where ``rank == root``, whereas the result is `None` on all other MPI ranks with ``rank != root``. Alternatively, if ``root`` is `None`, the calculated state is returned on all MPI ranks. By default, the state is returned on MPI rank zero only. Returns ------- state : `tkwant.onebody.WaveFunction` onebody wavefunction """ return, to_rank=root)
[docs] def add_onebody_state(self, state, task, key=None, rank=None): """Add a onebody wavefunction to the manybody state. Parameters ---------- state : `tkwant.onebody.WaveFunction` onebody wavefunction task : `tkwant.onebody.Task` Task info of the state. ``task`` must contain at least the weight factors of the onebody states in the manybody average as attribute: - `weight` : `~numpy.ndarray`, weighting factor key : dict key, optional Idendentifier of the state. A free key is attributed if not present. rank : int, optional MPI rank where the state should be added. Returns ------- key : dict key Idendentifier of the added onebody state. Notes ----- The state must be present on all MPI ranks. """ if key is None: key = self.get_free_key() self.psi.add(key, state, rank) self.tasks[key] = task return key
[docs] def add_distributed_onebody_states(self, states, tasks): """Add several onebody wavefunctions to the manybody state. Parameters ---------- state : dict of `tkwant.onebody.WaveFunction` Dictionary of onebody wavefunctions to be added task : dict of `tkwant.onebody.Task` Task info of the states. Each ``tasks`` item must contain at least the weight factors of the onebody states in the manybody average as attribute: - `weight` : `~numpy.ndarray`, weighting factor Notes ----- The states present in the states dictionary can be distributed over all MPI ranks. Each single onebody state must be unique and not dublicated on any other MPI rank. """ for key, psi in states.items(): self.psi.add(key, psi, rank=self.comm.rank, check=False) assert self.psi.keys_are_unique() self.tasks.update(tasks)
[docs] def delete_onebody_state(self, key): """Delete a onebody wavefunction corresponding to its key. Parameters ---------- key : dict key Idendentifier of the onebody state that should be deleted. """ self.psi.delete(key) del self.tasks[key]
[docs] def get_keys(self): """Get the keys of all onebody wavefunctions forming the manybody state. Returns ------- keys : list List of all state identifier keys present in the solver. """ return list(self.tasks.keys())
[docs] def get_free_key(self): """Get a new free key. Returns ------- key : int New unused key. All numbers larger then the returned key are also valid empty keys. """ try: next_key = max((i for i in self.get_keys() if _common.is_type(i, 'integer'))) + 1 except ValueError: next_key = 0 return next_key
[docs] def weight_shape(self): """Get the shape of the weighting factor. Returns ------- shape : tuple Shape of the weight factor array. """ key = self.get_keys()[0] weight = self.tasks[key].weight return weight.shape
[docs] def add_perturbation(self, qt): """ set perturbation """ for onebody_psi in self.psi.local_data().values(): onebody_psi.add_perturbation(qt)
def _check_consistency(self): """Check if keys for the wave functions consistent in itself and with the tasks. Returns ------- consistent : bool Returns `True` if all keys are unique and if the keys in task list are also present in the state dictionary. """ keys_unique = self.psi.keys_are_unique() keys_from_psi = list(self.psi.keys()) keys_from_tasks = list(self.tasks.keys()) return keys_unique and set(keys_from_psi) == set(keys_from_tasks) def _calc_expectation_value(self, observable, keys=None, root=0, return_integrand=False): """Calculate the expectation value of an observable at a given time. Parameters ---------- observable : callable or `kwant.operator` An operator to evaluate the expectation value. Must have the calling signature of `kwant.operator`. keys : list of dict keys, optional If present, the manybody sum is calculated only over these states. root : int or None, optional MPI return rank on which to return ``result``. If ``root`` is an integer, it must be in the range of valid MPI ranks ``0 <= root < self.comm.size``. In that case, the calculated ``result`` is returned only on that specific MPI rank where ``rank == root``, whereas the ``result`` is `None` on all other MPI ranks with ``rank != root``. Alternatively, if ``root`` is `None`, the calculated result is returned on all MPI ranks. By default, ``result`` is returned on MPI rank zero only. return_integrand : bool, optional If `True` return ``integrand`` in addition. Returns ------- result : `~numpy.ndarray` The expectation value of ``observable``. integrand : dict, optional The integrand of ``result``. Similar keys as the states. The integrand is distributed over all MPI ranks, ordering similar to the corresponding states. Only returned if ``return_integrand`` is `True`. Notes ----- Returning the result on all MPI ranks (by setting ``root=None``), might be slower, as an additional broadcast step is needed. """ if keys is None: keys = self.psi.local_keys() else: # intersection between given keys and keys on MPI rank keys = list(set(keys) & set(self.psi.local_keys())) integral = 0 # local integral part on mpi process # calculate weighted sum (partly on every rank) if return_integrand: integrand = {} for key in keys: expect = self.psi.local_data()[key].evaluate(observable) weight = self.tasks[key].weight phys_weight = self.tasks[key].phys_weight integral += np.outer(weight, expect) integrand[key] = phys_weight * expect else: for key in keys: expect = self.psi.local_data()[key].evaluate(observable) weight = self.tasks[key].weight integral += np.outer(weight, expect) if root is None: result = self.comm.allreduce(integral) else: result = self.comm.reduce(integral, root=root) result = np.squeeze(result) if return_integrand: return result, integrand return result
def _find_min_max_range(x): """Obtain min/max from a sequence of ranges. Each range is a tuple (x0, x1), with x0 <= x1, a value of None for x0 is interpreted as -inf, for x1 as +inf. """ lower, upper = zip(*x) if None in lower: lower = None else: lower = min(lower) if None in upper: upper = None else: upper = max(upper) return lower, upper
[docs]@log_func def calc_energy_cutoffs(occupations): """Extract upper and lower energy cutoffs from the lead occupations. We give an example how this routine works: Let be `energy_range` = [(None, 1), (2, 3)] for one lead, where each tuple has a meaning of an energy interval. For this lead, the largest energy interval, including all these intervals is (None, 3). Note that None as the first tuple elements is interpreted as - infinity. The same is done now for all elements of the ``occupations`` sequence, which represent the leads. The ``(emin, emax)`` values returned by this routine is the largest interval, that contain all energy intervals of the leads. If a lead is not present, the corresponding element of the ``occupations`` sequence must be None, such that it does contribute. Pay attention that None has double meaning: As an energy interval (None, 0) or (0, None) it is interpreted as - infinity, respectively + infinity. For the ``occupations`` sequence, None means absence, such that emin/emax are not changed by the corresponding lead. If all lead elements are None, ``(None, None)`` is returned. Parameters ---------- occupation : `tkwant.manybody.Occupation` or sequence thereof Lead occupation, see `lead_occupation` for details. If a lead is not occupied, the corresponding element must be set to None. Otherwise, each element of the ``occupations`` sequence must have at least the following attribute: - `energy_range` : energy integration range (see `lead_occupation`) Returns ------- emin : float or None Lower energy cutoff, None means a cutoff of - infinity. emin : float or None Upper energy cutoff, None means a cutoff of + infinity. """ lower_energy_cutoff_lead = [] upper_energy_cutoff_lead = [] if not isinstance(occupations, occupations = [occupations] for occup in occupations: # loop over all occupied leads if occup is not None: if occup.multivariate: # energy range given for each band for energy_range in occup.energy_range: lower, upper = _find_min_max_range(energy_range) lower_energy_cutoff_lead.append(lower) upper_energy_cutoff_lead.append(upper) else: # global energy range given for all bands lower, upper = _find_min_max_range(occup.energy_range) lower_energy_cutoff_lead.append(lower) upper_energy_cutoff_lead.append(upper) # only occupied leads contribute to the cutoff if None in lower_energy_cutoff_lead or not lower_energy_cutoff_lead: lower = None else: lower = min(lower_energy_cutoff_lead) if None in upper_energy_cutoff_lead or not upper_energy_cutoff_lead: upper = None else: upper = max(upper_energy_cutoff_lead) return lower, upper
[docs]class State: """Solve the time-dependent many-particle Schrödinger equation.""" def __init__(self, syst, tmax=None, occupations=None, params=None, spectra=None, boundaries=None, intervals=Interval, refine=True, combine=False, error_op=None, scattering_state_type=onebody.ScatteringStates, manybody_wavefunction_type=WaveFunction, mpi_distribute=mpi.round_robin, comm=None): r""" Parameters ---------- syst : `kwant.builder.FiniteSystem` or `tkwant.system.ExtendedSystem` The low level system for which the wave functions are to be calculated. tmax : float, optional The maximum time up to which to simulate. Sets the boundary conditions such that they are accurate up to ``tmax``. Must be set if ``boundaries`` are not provided. Mutually exclusive with `boundaries`. occupations : `tkwant.manybody.Occupation` or sequence thereof, optional Lead occupation. By default (or if ``occupations`` is set to `False`), all leads are taken into account and are considered as equally occupied with: chemical potential :math:`\mu = 0`, temperature :math:`T = 0` and the non-interacting Fermi-Dirac distribution as distribution function :math:`f(E)`. To change the default values (:math:`\mu, T, f(E)`), a `tkwant.manybody.Occupation` instance is precalculated with `tkwant.manybody.lead_occupation` and passed as ``occupations`` argument. If ``occupations`` is only one element, respectively a sequence with only one element, (:math:`\mu, T, f(E)`) will be identical in each lead. In the most general case, if (:math:`\mu, T, f(E)`) is different for each lead, ``occupations`` must be a sequence of `tkwant.manybody.Occupation` instances with an ordering similar to ``syst.leads``. In that case, ``occupations`` must have the same length as ``syst.leads``, respectively ``spectra``. A lead is not considered, if the corresponding ``occupations`` element is set to `False`. Otherwise, for a lead to be considered, an element of the ``occupations`` sequence must have at least the following attributes: - `energy_range` : energy integration range - `bands` : int or list of int, bands (*n*) to be considered, all bands considered if `None` - `distribution` : callable, distribution function. Calling signature: `(energy)`. params : dict, optional Extra arguments to pass to the Hamiltonian of ``syst``, excluding time. spectra : sequence of `~kwantspectrum.spectrum`, optional Energy dispersion :math:`E_n(k)` for the leads. Must have the same length as ``syst.leads``. If needed but not present, it will be calculated on the fly from `syst.leads`. boundaries : sequence of `~tkwant.leads.BoundaryBase`, optional The boundary conditions for each lead attached to ``syst``. Must have the same length as ``syst.leads``. Mutually exclusive with ``tmax``. intervals : `tkwant.manybody.Interval` sequence or class, optional Momentum intervals and quadrature rules on these intervals. If ``intervals`` is a sequence, it represents the momentum intervals. In that case, initial integration intervals are not calculated from ``occupations`` but ``intervals`` is used instead. Each element of the ``intervals`` sequence must have at least the following attributes: - `lead` : int, lead index - `band` : int, band index (*n*) - `kmin` : float, lower momentum bound - `kmax` : float, upper momentum bound, must be larger `kmin` - `integration_variable` : string, energy vs. momentum integration - `order` : int, quadrature order - `quadrature` : string, quadrature rule to use. See `tkwant.integration.calc_abscissas_and_weights` If ``intervals`` is a (data) class, it is passed to the `tkwant.manybody.calc_intervals` routine as ``Interval`` argument. By default, intervals are calculated from `tkwant.manybody.calc_intervals` and `tkwant.manybody.Interval`. refine : bool, optional If `True`, intervals are refined at the initial time. combine : bool, optional If `True`, intervals are grouped by lead indices. error_op : callable or `kwant.operator`, optional Observable used for the quadrature error estimate. Must have the calling signature of `kwant.operator`. Default: Error estimate with density expectation value. scattering_state_type : `tkwant.onebody.ScatteringStates`, optional Class to calculate time-dependent onebody wavefunctions starting in an equilibrium scattering state. Name of the time argument and initial time are taken from this class. If this is not possible, default values are used as a fallback. manybody_wavefunction_type : `tkwant.manybody.WaveFunction`, optional Class to evolve a many-particle wavefunction in time. mpi_distribute : callable, optional Function to distribute the tasks dict keys over all MPI ranks. By default, keys must be integer and are distributed round-robin like. comm : `mpi4py.MPI.Intracomm`, optional The MPI communicator over which to parallelize the computation. Notes ----- The name of the time argument (`time_name`) and the initial time of the evolution (`time_start`) are taken from the default values of the `scattering_state_type.__init__` method. Changing the default values by partial prebind (e.g. via functools) is possible. """'initialize manybody.State') if isinstance(syst, kwant.system.System): extended_syst = None if tmax is None and boundaries is None: raise ValueError("'boundaries' or 'tmax' must be provided") elif isinstance(syst, ExtendedSystem): if boundaries is not None: logger.warning('boundaries will not be used for extended system') extended_syst = syst syst = extended_syst.syst else: raise TypeError('"syst" must be a finalized kwant system ' 'or a "tkwant.system.ExtendedSystem"') if tmax is not None and boundaries is not None: raise ValueError("'boundaries' and 'tmax' are mutually exclusive.") # get initial time and time argument name from the onebody wavefunction time_name, time_start, onebody_wavefunction_type = \ onebody._get_time_name_from_wavefunction(derived_type=scattering_state_type) # add initial time to the params dict tparams = add_time_to_params(params, time_name=time_name, time=time_start, check_numeric_type=True) if spectra is None: spectra = kwantspectrum.spectra(syst.leads, params=tparams) if occupations is None: occupations = [lead_occupation()] if not isinstance(occupations, logger.debug('occupation in all leads is {}'.format(occupations)) occupations = [occupations] else: for i, occ in enumerate(occupations): logger.debug('occupation in lead={} is {}'.format(i, occ)) if intervals is None: intervals = calc_intervals(spectra, occupations) if combine: intervals = combine_intervals(intervals) else: if not (isinstance(intervals, or isinstance(intervals, Interval)): intervals = calc_intervals(spectra, occupations, interval_type=intervals) if combine: intervals = combine_intervals(intervals) try: len_int = len(intervals) except TypeError: len_int = 0 if len_int == 0: logger.warning('no occupied states found, the chemical potential ' 'is probably wrong.') else:'initial number of intervals={}'.format(len_int)) for interval in intervals: logger.debug(interval) if extended_syst is None: if boundaries is None: emin, emax = calc_energy_cutoffs(occupations) boundaries = leads.automatic_boundary(spectra, tmax, emin=emin, emax=emax) solver_type = _common.get_default_function_argument(onebody_wavefunction_type, 'solver_type') extended_syst = onebody.make_extended_system(syst, solver_type, boundaries, tparams) self.time = time_start self.extended_syst = extended_syst self.spectra = spectra self.occupations = occupations self.mpi_distribute = mpi_distribute self.onebody_wavefunction_type = onebody_wavefunction_type self.scattering_state_type = scattering_state_type # no public params attribute exists for the manybody state. # each individual one-body state holds its own parameters # the private params should be only used to create new inital # onebody states. self._params = params self._tasks_from_interval = {} tasks = {} offset = 0 for interval in intervals: tasks_ = self._calc_tasks(interval, offset) if tasks_: offset += len(tasks_) tasks.update(tasks_) self.comm = mpi.get_communicator(comm) psi_init = self._calc_initial_state(tasks, self.comm) self.manybody_wavefunction = manybody_wavefunction_type(psi_init, tasks, self.comm) # by default, return the result of the higher-order rule, which in # our convention has to be the last element of the weight array self.return_element = -1 if error_op is None:'set default error estimate based on density') error_op = kwant.operator.Density(syst) else:'set error estimate based on user given operator') self.error_op = error_op if refine: self.refine_intervals()'manybody.State initialization done') def _calc_tasks(self, interval, offset=0, tol=1E-10): """Evaluate all tasks corresponding to an interval and store them. Parameters ---------- interval : `tkwant.manybody.Interval` Interval offset : int, optional First key value of the dict. tol : float, optional Numerical tolerance to remove tasks. Returns ------- tasks : dict Dict with all tasks (states) for the interval. Dict keys are incremented by one starting at `offset`. """ tasks = calc_tasks(interval, self.spectra, self.occupations, keys=itertools.count(offset), tol=tol) if tasks: self._tasks_from_interval[interval] = tasks else: logger.warning('no tasks for given interval={}'.format(interval)) return tasks def _calc_initial_state(self, tasks, comm): """Calculate the initial manybody state for all tasks. Parameters ---------- tasks : dict Dict with all tasks. Each item represents a one-body state that composes the manybody state. comm : `mpi4py.MPI.Intracomm` Returns ------- psi_init : dict Ensemble of all one-body scattering states that form the initial manybody state. """ return calc_initial_state(self.extended_syst, tasks, params=self._params, scattering_state_type=self.scattering_state_type, mpi_distribute=self.mpi_distribute, comm=comm) def _get_keys_from_interval(self, interval): """Return a list of all keys corresponding to an interval. Parameters ---------- interval : `tkwant.manybody.Interval` Interval Returns ------- keys : list List of all keys representing the states of the interval """ tasks = self._tasks_from_interval[interval] return list(tasks.keys()) def _add_interval(self, interval, tasks=None, tol=1E-10): """Add all one-body states and tasks corresponding to an interval. Parameters ---------- interval : `tkwant.manybody.Interval` Integration interval that should be added to the solver tasks : dict, optional Dict with all tasks corresponding to the interval. tol : float, optional Numerical tolerance to remove tasks. """ if tasks is None: next_free_key = self.manybody_wavefunction.get_free_key() tasks = self._calc_tasks(interval, offset=next_free_key, tol=tol) else: self._tasks_from_interval[interval] = tasks psi_init = self._calc_initial_state(tasks, comm=self.comm) self.manybody_wavefunction.add_distributed_onebody_states(psi_init, tasks) def _remove_interval(self, interval): """Remove all one-body states and tasks corresponding to an interval. Parameters ---------- interval : `tkwant.manybody.Interval` Interval that should be removed from the solver """ for key in self._get_keys_from_interval(interval): self.manybody_wavefunction.delete_onebody_state(key) del self._tasks_from_interval[interval] def _evaluate_interval(self, interval, operator, root=None, return_integrand=False): """Evaluate the statistical average over the given interval. Parameters ---------- interval : `tkwant.manybody.Interval` Interval over which the manybody sum should be calculated. operator : callable or `kwant.operator` Observable to be calculated root : int or None, optional root receive the result, other rank receive None. If root is None all ranks receive the result. return_integrand : bool, optional If true, return also the integrand of the integral. Returns ------- result : `~numpy.ndarray` The expectation value of ``operator``, integrated over the interval. integrand : dict, optional The integrand to the integral. Only returned if ``return_integrand`` it true. """ keys = self._get_keys_from_interval(interval) return self.manybody_wavefunction._calc_expectation_value(operator, keys=keys, root=root, return_integrand=return_integrand)
[docs] def calc_integrand(self, operator, interval, root=0): """Return the integrand and the corresponding abscissa values on an interval. This routine is mainly for debugging purpose. Parameters ---------- operator : callable or `kwant.operator` Observable to be calculated interval : `tkwant.manybody.Interval` Interval over which the manybody sum should be calculated. Must be present in the solver. root : int or None, optional root receive the result, other rank receive None. If root is None all ranks receive the result. Returns ------- abscissa : `~numpy.ndarray` The integration points (abcissa) of the quadrature. Corresponds to energy values if integration is performed over energy (``interval.integration_variable`` == 'energy') or momentum (if ``interval.integration_variable`` == 'momentum'). integrand : `~numpy.ndarray` The expectation value of ``operator`` on all abcissa points. """ self.evolve(time=self.time) # make sure interval is at current time keys = self._get_keys_from_interval(interval) tasks = self._tasks_from_interval[interval] abscissa = np.array([getattr(tasks[key], interval.integration_variable) for key in keys]) _, integrand_per_rank = self.manybody_wavefunction._calc_expectation_value(operator, keys=keys, root=root, return_integrand=True) integrand_ = mpi.DistributedDict(integrand_per_rank, self.comm) integrand = np.array([ for key in keys]) return abscissa, integrand
[docs] def get_intervals(self): """Return a list of all intervals stored in the solver. Returns ------- intervals : list List of all momentum intervals stored in the solver. """ # without copy we may modify the state of the saved intervals intervals = copy.deepcopy(list(self._tasks_from_interval.keys())) return sorted(intervals, key=lambda x: (x.lead,, x.kmin))
[docs] def refine_intervals(self, atol=1E-5, rtol=1E-5, limit=2000, error_op=None, intervals=None): r"""Refine intervals until the quadrature error is below tolerance. Parameters ---------- atol : float, optional Absolute accuracy requested. rtol : float, optional Relative accuracy requested. limit : integer, optional Maximum number of intervals stored in the solver. A warning is raised and the refinement stops if limit is reached. error_op : callable or `kwant.operator`, optional Observable used for the quadrature error estimate. Must have the calling signature of `kwant.operator`. Default: ``error_op`` from initialization. intervals : sequence of `tkwant.manybody.Interval`, optional Apply the refinement process only to the (Gauss-Kronrod) intervals given in the sequence. Note that in this case, all intervals must be present in the solver already. By default, the refinement is done on all Gauss-Kronrod intervals stored in the solver. Returns ------- abserr : float Estimate of the modulus of the absolute error, which should equal or exceed abs(i-result), where i is the exact integral value. If ``error_op`` has an array-like output, we report the maximal value of the error. (sum ``errors`` over all intervals and take the maximum element). intervals : list of `tkwant.manybody.Interval` All subintervals *J* taken into accound. Intervals are ordered according to `errors`. errors : `~numpy.ndarray` of floats Error estimates *E(J)* on the intervals in descending order. If ``error_op`` has an array-like output, the error is returned on all array points. The shape of ``errors`` is like ``error_op`` (its expectation value) with an additional first dimension for the interval index. Notes ----- This routine implements a globally adaptive strategy based on quadpacks QAG algorithm [1]_. It attemps to reduce the absolute error *abserr* by subdividing (bisecting) the interval with the largest error estimate. *result* corresponds to the quadrature estimate of the integral :math:`\int_a^b f(x) dx`: :math:`result = \sum_{J \in \mathcal{P}[a, b]} Q_f(J)`. Here *J* is a subinterval of the *[a, b]* interval and :math:`Q_f(J)` is a quadrature rule applied on function *f(x)* on interval *J*. In this algorithm, a *(2*n + 1)* Kronrod estimate is used to calculate *Q*. *abserr* corresponds to the sum of errors: :math:`abserr = \sum_{J \in \mathcal{P}[a, b]} E_f(J)`. We use the quadpack error estimate, described in method `_error_estimate_quadpack`, for :math:`E_f(J)`. If the refinement procedure is successful, the following inequality holds: :math:`abserr \leq \max \left\{ atol, rtol \cdot result \right\}` In the physical sense, the integral we estimate is the manybody integral :math:`\langle \hat{A}_{ij}(t) \rangle = \sum_{\alpha} \int_{- \pi}^{\pi} \frac{dk}{2 \pi} v_{\alpha}(k) \theta(v_{\alpha}(k)) f_\alpha(k) [\psi_{\alpha, k}(t)]_i \hat{A} [\psi^\dagger_{\alpha, k}(t)]_j` where :math:`\hat{A}` corresponds to the ``error_op`` and the error is estimated for the expectation value :math:`\langle \hat{A}_{ij}(t) \rangle`. Note that above inequality condition must be fulfilled on each site *i* and *j* individually. This is the case if ``error_op`` generates an array-like output. However, the inequality condition must be fulfilled only at the current time *t* of the solver. Moreover, note that only intervals stored in the solver are refined. One-body states, that are not part of an interval, are not altered by this method. .. [1] Piessens, R., de Doncker-Kapenga, E., Ueberhuber, C. W., and Kahaner, D. K., QUADPACK A Subroutine Package for Automatic Integration, Springer-Verlag, Berlin, (1983). """ assert _common.is_type(atol, 'real_number') if atol < 0: raise ValueError('atol={} is negative.'.format(atol)) assert _common.is_type(rtol, 'real_number') if rtol < 0: raise ValueError('rtol={} is negative.'.format(rtol)) assert _common.is_type(limit, 'integer') if limit <= 1: raise ValueError('limit={} must be > 1.'.format(limit)) tol = min(1E-14, 0.5 * (atol + rtol)) if error_op is None: error_op = self.error_op def observable_with_error(_intervals): observable = [] errors = [] for interval in _intervals: error, kronrod = self._error_estimate_quadpack(interval, error_op, return_estimate=True, tol=0.5 * (atol + rtol)) observable.append(kronrod) errors.append(error) return np.array(observable), np.array(errors) if intervals is None: intervals = self.get_intervals() else: intervals = copy.deepcopy(intervals) if not isinstance(intervals, intervals = [intervals] # refine only intervals with Gauss-Kronrod quadrature intervals[:] = [interval for interval in intervals if interval.quadrature == 'kronrod'] results, errors = observable_with_error(intervals) result = np.sum(results, axis=0) # sum of the integrals over the subintervals errsum = np.sum(errors, axis=0) # sum of the errors over the subintervals i = 0 while True: # loop until converged errbnd = np.maximum(atol, rtol * np.abs(result)) # requested accuracy # order all intervals by error, decreasing order try: max_error_per_interval = np.max(errors - errbnd[np.newaxis, :], axis=1) except: max_error_per_interval = errors - errbnd error_idx = np.argsort(max_error_per_interval)[::-1] intervals = [intervals[i] for i in error_idx] results = results[error_idx] errors = errors[error_idx] max_error_per_interval = max_error_per_interval[error_idx] errmax = errors[0] resultmax = results[0]'refinement step={}, time={}, max errsum={}, ' 'min errbnd={}, total number of intervals={}'. format(i, self.time, np.max(errsum), np.min(errbnd), len(intervals))) for j, (inte, err) in enumerate(zip(intervals, errors)): logger.debug("interval_num={}, {}, max error={}".format(j, inte, np.max(err))) if (errsum <= errbnd).all(): # converged'refinement converged') break if len(intervals) >= limit: logger.warning('maximum number of intervals reached') break # bisect the interval with the largest error interval_largest_error = intervals.pop(0) new_intervals = _split_interval(interval_largest_error, 2) intervals += new_intervals # update the intervals stored in the solver, evolve to current time self._remove_interval(interval_largest_error) for interval in new_intervals: self._add_interval(interval, tol=tol) self.evolve(time=self.time) # recalculate the error and observable estimate new_results, new_errors = observable_with_error(new_intervals) results = np.append(results, new_results, axis=0) errors = np.append(errors, new_errors, axis=0) results = np.delete(results, 0, axis=0) errors = np.delete(errors, 0, axis=0) result = result + new_results[0] + new_results[1] - resultmax errsum = errsum + new_errors[0] + new_errors[1] - errmax i += 1 return np.max(errsum), intervals, errors
def _error_estimate_quadpack(self, interval, error_op, return_estimate=False, tol=1e-5): r"""Error estimate for an integration quadrature. Parameters ---------- interval : `tkwant.manybody.Interval` Integration interval with momentum boundaries *[a,b]*. The attribute `interval.quadrature` attribute must be "kronrod". error_op : callable or `kwant.operator` Observable used for the quadrature error estimate. Must have the calling signature of `kwant.operator`. return_estimate : bool, optional If true, return also the expectation value calculated with ``error_op``. It corresponds to the Kronrod :math:`K_{2n + 1}` estimate of the integral. tol : float, optional Value to use instead of the |error_op| value when the later one becomes exactly zero to avoid a divide by zero error. This happens for instance when a site is completely deconnected from the rest and only switched on with time. Returns ------- error : `~numpy.ndarray` of floats The quadrature error :math:`\varepsilon` estimated for the expectation value of the error_op. The output array has the same shape as evaluating ``error_op``. kronrod : `~numpy.ndarray` of floats, optional The expectation value of the error operator. Only returned if ``return_estimate`` is true. Notes ----- An interval that is passed as argument via ``intervals`` must already be present in the solver, othewise a ``KeyError`` is raised. The error estimate is based on Quadpacks algorithm QAG. It is :math:`\varepsilon = \tilde{I} \text{min} \bigl\{1, (200 \frac{G_n[a, b] - K_{2n + 1}[a, b]}{\tilde{I}})^{3/2} \bigr\}`, with :math:`\tilde{I} = \int_a^b |f(x) - \frac{K_{2n + 1}[a, b]}{b - a} |dx` where :math:`G_n[a, b]` is the Gauss and :math:`K_{2n + 1}[a, b]` the corresponding Kronrod estimate of the integral of *f(x)* over interval *[a, b]*. In our case, *f(x)* corresponds to the expectation value of the ``error_op`` at the current time of the solver. If the expectation value is an array, above expression for the error is evaluated point-wise on that array. .. [1] Piessens, R., de Doncker-Kapenga, E., Ueberhuber, C. W., and Kahaner, D. K., QUADPACK A Subroutine Package for Automatic Integration, Springer-Verlag, Berlin, (1983). .. [2] Gonnet, P., A Review of Error Estimation in Adaptive Quadrature, ACM Computing Surveys, Vol. 44, No. 4, Article 22, (2012). """ if not interval.quadrature == 'kronrod': raise ValueError('quadpack error estimate works only on ' 'Gauss-Kronrod quadrature intervals') (gauss, kronrod), func = self._evaluate_interval(interval, error_op, return_integrand=True) dk = interval.kmax - interval.kmin assert dk > 0 assert tol > 0 kronrod_scaled = kronrod / dk integral = 0 for key, f in func.items(): kronrod_weight = self.manybody_wavefunction.tasks[key].math_weight[1] integral += kronrod_weight * np.abs(f - kronrod_scaled) ik = self.comm.allreduce(integral) ik = np.where(ik > 0, ik, tol) # ik can become exactly zero assert (ik > 0).all() tmp = 200 * np.abs(gauss - kronrod) / ik error = np.squeeze(ik * np.minimum(1, tmp * np.sqrt(tmp))) if return_estimate: return error, kronrod return error
[docs] def estimate_error(self, intervals=None, error_op=None, estimate=None, full_output=False): r"""Estimate the numerical error of the integration quadrature. Parameters ---------- intervals : `tkwant.manybody.Interval` or sequence thereof, optional If present, the error is estimated on given intervals :math:`I_n`, otherwise the total error over all integrals is evaluated. error_op : callable or `kwant.operator`, optional Observable used for the quadrature error estimate. Must have the calling signature of `kwant.operator`. Default: ``error_op`` from initialization. estimate : callable, optional Function to estimate an error on an interval :math:`I_n`. Default: `_error_estimate_quadpack`, which estimates *abserr* of QUADPACK (see ``refine_intervals()``) full_output : bool, optional If the expectation value of ``error_op`` is an array and if ``full_output`` is true, then the error is estimated on each point of the array. By default, we only return the maximum value of the array error. Returns ------- error : `~numpy.ndarray` of floats Error estimate of the momentum (energy) quadrature. - Evaluating this function without an argument for ``interval`` is a measure of the total error and evaluates ``error`` = :math:`max(\sum_n^N I_n)`, where the sum runs over all *N* interval errors :math:`I_n` stored in the solver. - Evaluating this function on a sequence of intervals passed as argument via ``interval``, ``error`` is a sequence and the elements are :math:`max(I_n)`. (first index for intervals) The individual interval errors do NOT sum up to the total error mentioned above. This is due to the triangle inequality :math:`max(\sum_n^N I_n) \leq \sum_n^N max(I_n)`. - If ``full_output`` is true, no *max()* element is taken but the full array-like error of the observable is returned. - By default, the error :math:`I_n` is identical to *abserr* of QUADPACK, see ``refine_intervals()``. Notes ----- An interval that is passed as argument via ``intervals`` must already be present in the solver, othewise a ``KeyError`` is raised. """ if estimate is None: estimate = self._error_estimate_quadpack if error_op is None: error_op = self.error_op sum_up = False if intervals is None: intervals = self.get_intervals() sum_up = True if isinstance(intervals, errors_per_interval = np.array([estimate(interval, error_op) for interval in intervals]) if sum_up: errors = np.sum(errors_per_interval, axis=0) if not full_output: errors = np.max(errors) else: if full_output: errors = errors_per_interval else: errors = np.array([np.max(err) for err in errors_per_interval]) else: if full_output: errors = estimate(intervals, error_op) else: errors = np.max(estimate(intervals, error_op)) return errors
[docs] def evaluate(self, observable, root=0): """Evaluate the expectation value of an operator at the current time. Parameters ---------- observable : callable or `kwant.operator` An operator to evaluate the expectation value. Must have the calling signature of `kwant.operator`. root : int or None, optional MPI return rank on which to return the result. If ``root`` is an integer, it must be in the range of valid MPI ranks ``0 <= root < self.comm.size``. In that case, the calculated result is returned only on that specific MPI rank where ``rank == root``, whereas the result is `None` on all other MPI ranks with ``rank != root``. Alternatively, if ``root`` is `None`, the calculated result is returned on all MPI ranks. Setting ``root`` to `None` is generally slower. By default, the result is returned on MPI rank zero only. Returns ------- result : `~numpy.ndarray` The expectation value of ``observable``, integrated over all occupied bands. For Kronrod-like quadratures, the expectation value corresponding to the higher order rule is returned by default. Set the instance variable `return_element` to `None` or call the `evaluate` method of the base class directly, in order to get both, the higher and the lower order result. The result might not be returned on all MPI ranks; note the explanation above for input parameter ``root``. Notes ----- Returning the result on all MPI ranks (by setting ``root=None``), might be slower, as an additional broadcast step is needed. """ result = self.manybody_wavefunction.evaluate(observable, root) if self.return_element is None: return result try: return result[self.return_element] except Exception: # catch `None` result on all non-MPI root ranks return result
[docs] def evolve(self, time): """ Evolve all wavefunctions up to ``time``. Parameters ---------- time : int or float time argument up to which the solver should be evolved """ self.manybody_wavefunction.evolve(time) self.time = time
[docs] def add_boundstate(self, psi, energy, weight=1): """Add a boundstate to the manybody solver Parameters ---------- psi : `~numpy.ndarray` Projection of the boundstate wavefunction to the central scattering region energy : float Energy of the boundstate weight : float, optional Weight of the boundstate in the discrete sum. Set to one by default. """ key = self.manybody_wavefunction.get_free_key() task = {key: onebody.Task(lead=None, mode=None, energy=energy, weight=weight)} # make psi available on only one MPI rank psi_bs = mpi.distribute_dict({key: psi}, self.mpi_distribute, self.comm) self._add_boundstates(psi_bs, task)
def _add_boundstates(self, boundstate_psi, boundstate_tasks): """Add several boundstates to the manybody solver Parameters ---------- boundstate_psi : dict Dictionary with all boundstate wavefunctions. If a wavefunction is of type `tkwant.onebody.WaveFunction` and has an `evolve()` method, it is not changed. Otherwise, the wavefunction must be a `~numpy.ndarray` array and is taken as the initial value to construct a time dependent boundstate wave function from it. A dict key must be unique and present only on one MPI rank. For load balancing the dictionary should be distributed over all MPI ranks. boundstate_tasks : dict of `tkwant.onebody.Task` Each element in the dict represents a single boundstate state. An element must have at least the following attribute: - `weight` : float, weighting factor - `energy` : float, the energy of the bound state. ``boundstate_tasks`` must include all boundstates stored in ``boundstate_psi`` (all keys must be identical) and must be the same on all MPI ranks. """ # if onebody_wavefunction_type cannot be extracted from # scattering_state_type, this routine fails, do at least some warning if self.onebody_wavefunction_type is None: logger.warning("wavefunction type is None, ", "provided boundstates must be time dependent") make_boundstates_time_dependent(boundstate_psi, boundstate_tasks, self.extended_syst, boundaries=None, params=self._params, onebody_wavefunction=self.onebody_wavefunction_type) add_boundstates(self.manybody_wavefunction, boundstate_psi, boundstate_tasks)
[docs]@log_func def make_boundstates_time_dependent(boundstate_psi, boundstate_tasks, syst, boundaries, params=None, onebody_wavefunction=onebody.WaveFunction.from_kwant): """Make boundstates time dependent, such that they can evolve in time. Parameters ---------- boundstate_psi : dict Dictionary with all boundstate wavefunctions. If a wavefunction is of type `tkwant.onebody.WaveFunction` and has an `evolve()` method, it is not changed. Otherwise, the wavefunction must be a `~numpy.ndarray` array and is taken as the initial value to construct a time dependent boundstate wave function from it. The the original wavefunction is then replaced by the time dependent one in place. For load balancing the dictionary should be distributed over all MPI ranks. boundstate_tasks : dict of `tkwant.onebody.Task` Each element in the dict represents a single boundstate state. An element must have at least the following attribute: - `weight` : float, weighting factor - `energy` : float, the energy of the bound state. ``boundstate_tasks`` must include all boundstates stored in ``boundstate_psi`` (all keys must be identical) and must be the same on all MPI ranks. syst : `kwant.builder.FiniteSystem` The low level system. boundaries : sequence of `~tkwant.leads.BoundaryBase` The boundary conditions for each lead attached to ``syst``. Must have the same length as ``syst.leads``. params : dict, optional Extra arguments to pass to the Hamiltonian of ``syst``, excluding time. onebody_wavefunction : `tkwant.onebody.WaveFunction`, optional Class to evolve a single-particle wavefunction in time. Notes ----- This routine changes ``boundstate_psi`` in place. """ for key, psi in boundstate_psi.items(): if not hasattr(psi, 'evolve'): energy = boundstate_tasks[key].energy boundstate_psi[key] = onebody_wavefunction(psi_init=psi, syst=syst, boundaries=boundaries, energy=energy, params=params)
[docs]@log_func def add_boundstates(solver, boundstate_psi, boundstate_tasks): """Add a sequence of boundstates to the manybody solver Parameters ---------- solver : `tkwant.manybody.WaveFunction` Manybody solver to which the boundstates should be added. boundstate_psi : dict of `tkwant.onebody.WaveFunction` Dictionary with all boundstate wavefunctions. A dict key must be unique and present only on one MPI rank. For load balancing the dictionary should be distributed over all MPI ranks. boundstate_tasks : dict of `tkwant.onebody.Task` Each element in the dict represents a single boundstate state. An element must have at least the following attribute: - `weight` : float, weighting factor ``boundstate_tasks`` must include all boundstates stored in ``boundstate_psi`` (all keys must be identical) and must be the same on all MPI ranks. Moreover, the keys of the boundstates dicts (``boundstate_tasks`` and ``boundstate_psi``) must not be present in ``solver``. Notes ----- ``solver`` is modified in place. Only when the error "boundstate key=.. already present in solver" occurs, ``solver`` has not been modified. If any other exception is raised in this function however, there is no guarantee that the ``solver`` has not been modified. """ solver_keys = solver.get_keys() for b_key in boundstate_tasks.keys(): if b_key in solver_keys: raise KeyError('boundstate key={} already present in solver.' .format(b_key)) weight_shape = np.ones(shape=solver.weight_shape()) tasks = copy.deepcopy(boundstate_tasks) for key, task in tasks.items(): tasks[key].weight = task.weight * weight_shape solver.add_distributed_onebody_states(boundstate_psi, tasks)
@log_func def boundstates_present(syst, tol=1e-4, params=None): """Check if boundstates are present in the system Parameters ---------- syst : `kwant.builder.FiniteSystem` or `tkwant.system.ExtendedSystem` The low level system for which the wave functions are to be calculated. tol : foat, optional Tolerance to check density deviation from one. params : dict, optional Extra arguments to pass to the Hamiltonian of ``syst``, excluding time. """ # fill up all bands up to the highest energy occupations = lead_occupation(chemical_potential=None) state = State(syst, 1, occupations, params=params) # calculate the density at time t=0, should be one everywhere if no boundstates are present density_operator = kwant.operator.Density(syst) density = state.evaluate(density_operator, root=None) density_deviation = density - 1'max density deviation= {}'.format(np.max(np.abs(density_deviation)))) logger.debug('density deviation per site= {}'.format(density_deviation)) return np.any(np.abs(density_deviation) > tol) class ManybodyIntegrand: def __init__(self, syst, interval, operator, time=0, params=None, element=None): """Diagnostic routine to evaluate the integrand in the manybody integral For simplicity, this routine evaluates the integrand only on a single lead and band Parameters ---------- syst : `kwant.builder.FiniteSystem` The low level system for which the wave functions are to be calculated. interval : `tkwant.manybody.Interval` the interval specifying integration region, lead and band operator : callable or `kwant.operator`, optional operator representing the physical observable time : float, optional The time at which the integrand will be calculated. params : dict, optional Extra arguments to pass to the Hamiltonian of ``syst``, excluding time. element: int, optional if the operator has a vector like output, elemnt can be set to select only a specific element. """ tparams = add_time_to_params(params, time_name='time', time=0, check_numeric_type=True) spectra = kwantspectrum.spectra(syst.leads, params=tparams) self.boundaries = leads.automatic_boundary(spectra, tmax=time + 1) self.syst = syst = self.lead = interval.lead self.kmin = interval.kmin self.kmax = interval.kmax self.integration_variable = interval.integration_variable self.time = time self.operator = operator self.params = params self.spectrum = spectra[self.lead] self.element = element self.neval = 0 def func(self, x): """integrand for the manybody expectation value at zero temperature Parameters ---------- x : float integration varable, if ``interval.integration_variable`` == 'energy' x corresponds to the energy, else if ``interval.integration_variable`` == 'momentum' it is interpreted as momentum k. Returns ------- result : `~numpy.ndarray` The evaluated integrand """ self.neval += 1 if self.integration_variable == 'energy': energy = x velocity = 1 # for jacoby determinant mode = self.spectrum.energy_to_scattering_mode(energy,, self.kmin, self.kmax) elif self.integration_variable == 'momentum': energy = self.spectrum(x, velocity = self.spectrum(x,, derivative_order=1) if velocity < 0: msg = ('negative velocity, probably unphysical region: ' 'energy={}, k={}, velocity={}'.format(energy, x, velocity)) logger.warning(msg) mode = self.spectrum.momentum_to_scattering_mode(x, else: raise ValueError('integration_variable = {} not known.' .format(self.integration_variable)) if mode == - 1: # mode is closed msg = ('no open mode for energy={}, k={}, lead={}, band={}, kmin={}, kmax={}' .format(energy, x, self.lead,, self.kmin, self.kmax)) logger.warning(msg) return psi = onebody.ScatteringStates(self.syst, energy=energy, lead=self.lead, params=self.params, boundaries=self.boundaries)[mode] psi.evolve(self.time) result = velocity * psi.evaluate(self.operator) / (2 * np.pi) if result.shape == (1,): # if result is an array with only one element result = result[0] if self.element is None: return result return result[self.element] def vecfunc(self, k): """same as func, but for vector like arguments""" return np.array([self.func(kk) for kk in k])