Source code for pytac.lattice

"""Representation of a lattice object which contains all the elements of the
    machine.
"""
import logging

import numpy

import pytac
from pytac.data_source import DataSourceManager
from pytac.exceptions import (DataSourceException, FieldException,
                              UnitsException, HandleException)


[docs]class Lattice(object): """Representation of a lattice. Represents a lattice object that contains all elements of the ring. It has a name and a control system to be used for unit conversion. **Attributes:** Attributes: name (str): The name of the lattice. symmetry (int): The symmetry of the lattice (the number of cells). .. Private Attributes: _elements (list): The list of all the element objects in the lattice _data_source_manager (DataSourceManager): A class that manages the data sources associated with this lattice. """ def __init__(self, name, symmetry=None): """Args: name (str): The name of the lattice. symmetry (int): The symmetry of the lattice (the number of cells). **Methods:** """ self.name = name self.symmetry = symmetry self._elements = [] self._data_source_manager = DataSourceManager() @property def cell_length(self): """float: The average length of a cell in the lattice. """ if (self.symmetry is None) or (self.get_length() == 0): return None else: return self.get_length() / self.symmetry @property def cell_bounds(self): """list (str): The indexes of elements in which a cell boundary occurs. Examples: A lattice of 5 equal length elements with 2 fold symmetry would return [1, 4, 5] 1 - because it is the start of the first cell. 4 - because it is the first element in the second cell as the boundary between the first and second cells occurs halfway into the length of element 3. 5 - (len(lattice)) because it is the end of the second (last) cell. """ if (self.symmetry is None) or (len(self._elements) == 0): return None else: bounds = [1] for cell in range(2, self.symmetry + 1, 1): for elem in self._elements[bounds[-1]:]: if elem.cell == cell: bounds.append(elem.index) break bounds.append(len(self._elements)) return bounds def __getitem__(self, n): """Get the (n + 1)th element of the lattice. i.e. index 0 represents the first element in the lattice. Args: n (int): index. Returns: Element: indexed element. """ return self._elements[n] def __len__(self): """The number of elements in the lattice. Returns: int: The number of elements in the lattice. """ return len(self._elements)
[docs] def set_data_source(self, data_source, data_source_type): """Add a data source to the lattice. Args: data_source (DataSource): the data source to be set. data_source_type (str): the type of the data source being set pytac.LIVE or pytac.SIM. """ self._data_source_manager.set_data_source(data_source, data_source_type)
[docs] def get_fields(self): """Get the fields defined on the lattice. Includes all fields defined by all data sources. Returns: dict: A dictionary of all the fields defined on the lattice, separated by data source(key). """ return self._data_source_manager.get_fields()
[docs] def add_device(self, field, device, uc): """Add device and unit conversion objects to a given field. A DeviceDataSource must be set before calling this method, this defaults to pytac.LIVE as that is the only data source that currently uses devices. Args: field (str): The key to store the unit conversion and device objects. device (Device): The device object used for this field. uc (UnitConv): The unit conversion object used for this field. Raises: DataSourceException: if no DeviceDataSource is set. """ try: self._data_source_manager.add_device(field, device, uc) except DataSourceException: raise DataSourceException("No device data source on lattice {0}." .format(self))
[docs] def get_device(self, field): """Get the device for the given field. A DeviceDataSource must be set before calling this method, this defaults to pytac.LIVE as that is the only data source that currently uses devices. Args: field (str): The lookup key to find the device on the lattice. Returns: Device: The device on the given field. Raises: DataSourceException: if no DeviceDataSource is set. """ try: return self._data_source_manager.get_device(field) except DataSourceException: raise DataSourceException("No device data source on lattice {0}." .format(self))
[docs] def get_unitconv(self, field): """Get the unit conversion option for the specified field. Args: field (str): The field associated with this conversion. Returns: UnitConv: The object associated with the specified field. Raises: FieldException: if no unit conversion object is present. """ try: return self._data_source_manager.get_unitconv(field) except FieldException: raise FieldException("No unit conversion option for field {0} on " "lattice {1}.".format(field, self))
[docs] def set_unitconv(self, field, uc): """Set the unit conversion option for the specified field. Args: field (str): The field associated with this conversion. uc (UnitConv): The unit conversion object to be set. """ self._data_source_manager.set_unitconv(field, uc)
[docs] def get_value(self, field, handle=pytac.RB, units=pytac.DEFAULT, data_source=pytac.DEFAULT, throw=True): """Get the value for a field on the lattice. Returns the value of a field on the lattice. This value is uniquely identified by a field and a handle. The returned value is either in engineering or physics units. The data_source flag returns either real or simulated values. Args: field (str): The requested field. handle (str): pytac.SP or pytac.RB. units (str): pytac.ENG or pytac.PHYS returned. data_source (str): pytac.LIVE or pytac.SIM. throw (bool): On failure, if True raise ControlSystemException, if False None will be returned for any PV that fails and log a warning. Returns: float: The value of the requested field Raises: DataSourceException: if there is no data source on the given field. FieldException: if the lattice does not have the specified field. """ try: return self._data_source_manager.get_value(field, handle, units, data_source, throw) except DataSourceException: raise DataSourceException("No data source {0} on lattice {1}." .format(data_source, self)) except FieldException: raise FieldException("Lattice {0} does not have field {1} on data " "source {2}".format(self, field, data_source))
[docs] def set_value(self, field, value, handle=pytac.SP, units=pytac.DEFAULT, data_source=pytac.DEFAULT, throw=True): """Set the value for a field. This value can be set on the machine or the simulation. Args: field (str): The requested field. value (float): The value to set. handle (str): pytac.SP or pytac.RB. units (str): pytac.ENG or pytac.PHYS. data_source (str): pytac.LIVE or pytac.SIM. throw (bool): On failure, if True raise ControlSystemException, if False log a warning. Raises: DataSourceException: if arguments are incorrect. FieldException: if the lattice does not have the specified field. """ try: self._data_source_manager.set_value(field, value, handle, units, data_source, throw) except DataSourceException: raise DataSourceException("No data source {0} on lattice {1}." .format(data_source, self)) except FieldException: raise FieldException("Lattice {0} does not have field {1} on data " "source {2}".format(self, field, data_source))
[docs] def get_length(self): """Returns the length of the lattice, in meters. Returns: float: The length of the lattice (m). """ total_length = 0.0 for e in self._elements: total_length += e.length return total_length
[docs] def add_element(self, element): """Append an element to the lattice and update its lattice reference. Args: element (Element): element to append. """ element.set_lattice(self) self._elements.append(element)
[docs] def get_elements(self, family=None, cell=None): """Get the elements of a family from the lattice. If no family is specified it returns all elements. Elements are returned in the order they exist in the ring. Args: family (str): requested family. cell (int): restrict elements to those in the specified cell. Returns: list: list containing all elements of the specified family. Raises: ValueError: if there are no elements in the specified cell or family. """ if family is None: elements = self._elements[:] if len(elements) == 0: raise ValueError("No elements in lattice {0}.".format(self)) else: elements = [e for e in self._elements if family in e.families] if len(elements) == 0: raise ValueError("No elements in family {0}.".format(family)) if cell is not None: elements = [e for e in elements if e.cell == cell] if len(elements) == 0: raise ValueError("No elements in cell {0}.".format(cell)) return elements
[docs] def get_all_families(self): """Get all families of elements in the lattice. Returns: set: all defined families. """ families = set() for element in self._elements: families.update(element.families) return families
[docs] def get_family_s(self, family): """Get s positions for all elements from the same family. Args: family (str): requested family. Returns: list: list of s positions for each element. """ elements = self.get_elements(family) s_positions = [] for element in elements: s_positions.append(element.s) return s_positions
[docs] def get_element_devices(self, family, field): """Get devices for a specific field for elements in the specfied family. Typically all elements of a family will have devices associated with the same fields - for example, BPMs each have a device for fields 'x' and 'y'. Args: family (str): family of elements. field (str): field specifying the devices. Returns: list: devices for specified family and field. """ elements = self.get_elements(family) devices = [] for element in elements: try: devices.append(element.get_device(field)) except DataSourceException: logging.warning("No device for field {0} on element {1}." .format(field, element)) return devices
[docs] def get_element_device_names(self, family, field): """Get the names for devices attached to a specific field for elements in the specfied family. Typically all elements of a family will have devices associated with the same fields - for example, BPMs each have a device for fields 'x' and 'y'. Args: family (str): family of elements. field (str): field specifying the devices. Returns: list: device names for specified family and field. """ devices = self.get_element_devices(family, field) return [device.name for device in devices]
[docs] def get_element_values(self, family, field, handle=pytac.RB, units=pytac.DEFAULT, data_source=pytac.DEFAULT, dtype=None): """Get the value of the given field for all elements in the given family in the lattice. Args: family (str): family of elements to request the values of. field (str): field to request values for. handle (str): pytac.RB or pytac.SP. units (str): pytac.ENG or pytac.PHYS. data_source (str): pytac.LIVE or pytac.SIM. dtype (numpy.dtype): if None, return a list. If not None, return a numpy array of the specified type. Returns: list or numpy.array: The requested values. """ elements = self.get_elements(family) values = [element.get_value(field, handle, units, data_source) for element in elements] if dtype is not None: values = numpy.array(values, dtype=dtype) return values
[docs] def set_element_values(self, family, field, values, handle=pytac.SP, units=pytac.DEFAULT, data_source=pytac.DEFAULT): """Set the value of the given field for all elements in the given family in the lattice to the given values. Args: family (str): family of elements on which to set values. field (str): field to set values for. values (sequence): A list of values to assign. handle (str): pytac.SP or pytac.RB. units (str): pytac.ENG or pytac.PHYS. data_source (str): pytac.LIVE or pytac.SIM. Raises: IndexError: if the given list of values doesn't match the number of elements in the family. """ if handle != pytac.SP: raise HandleException("Must write using {0}.".format(pytac.SP)) elements = self.get_elements(family) if len(elements) != len(values): raise IndexError("Number of elements in given array({0}) must be " "equal to the number of elements in the " "family({1}).".format(len(values), len(elements))) for element, value in zip(elements, values): element.set_value(field, value, handle=pytac.SP, units=units, data_source=data_source)
[docs] def set_default_units(self, default_units): """Sets the default unit type for the lattice and all its elements. Args: default_units (str): The default unit type to be set across the entire lattice, pytac.ENG or pytac.PHYS. Raises: UnitsException: if specified default unit type is not a valid unit type. """ if default_units == pytac.ENG or default_units == pytac.PHYS: self._data_source_manager.default_units = default_units elems = self.get_elements() for elem in elems: elem._data_source_manager.default_units = default_units elif default_units is not None: raise UnitsException("{0} is not a unit type. Please enter {1} or " "{2}.".format(default_units, pytac.ENG, pytac.PHYS))
[docs] def set_default_data_source(self, default_ds): """Sets the default data source for the lattice and all its elements. Args: default_ds (str): The default data source to be set across the entire lattice, pytac.LIVE or pytac.SIM. Raises: DataSourceException: if specified default data source is not a valid data source. """ if (default_ds == pytac.LIVE) or (default_ds == pytac.SIM): self._data_source_manager.default_data_source = default_ds elems = self.get_elements() for elem in elems: elem._data_source_manager.default_data_source = default_ds elif default_ds is not None: raise DataSourceException("{0} is not a data source. Please enter " "{1} or {2}.".format(default_ds, pytac.LIVE, pytac.SIM))
[docs] def get_default_units(self): """Get the default unit type, pytac.ENG or pytac.PHYS. Returns: str: the default unit type for the entire lattice. """ return self._data_source_manager.default_units
[docs] def get_default_data_source(self): """Get the default data source, pytac.LIVE or pytac.SIM. Returns: str: the default data source for the entire lattice. """ return self._data_source_manager.default_data_source
[docs] def convert_family_values(self, family, field, values, origin, target): """Convert the given values according to the given origin and target units, using the unit conversion objects for the given field on the elements in the given family. Args: family (str): the family of elements which the values belong to. field (str): the field on the elements which the values are from. values (sequence): values to be converted. origin (str): pytac.ENG or pytac.PHYS. target (str): pytac.ENG or pytac.PHYS. """ elements = self.get_elements(family) if len(elements) != len(values): raise IndexError("Number of elements in given sequence({0}) must " "be equal to the number of elements in the " "family({1}).".format(len(values), len(elements))) converted_values = [] for elem, value in zip(elements, values): uc = elem.get_unitconv(field) converted_values.append(uc.convert(value, origin, target)) return converted_values
[docs]class EpicsLattice(Lattice): """EPICS-aware lattice class. Allows efficient get_values() and set_values() methods, and adds get_pv_names() method. **Attributes:** Attributes: name (str): The name of the lattice. symmetry (int): The symmetry of the lattice (the number of cells). .. Private Attributes: _elements (list): The list of all the element objects in the lattice _cs (ControlSystem): The control system to use for the more efficient batch getting and setting of PVs. _data_source_manager (DataSourceManager): A class that manages the data sources associated with this lattice. """ def __init__(self, name, epics_cs, symmetry=None): """ Args: name (str): The name of the epics lattice. epics_cs (ControlSystem): The control system used to store the values on a PV. symmetry (int): The symmetry of the lattice (the number of cells). **Methods:** """ super(EpicsLattice, self).__init__(name, symmetry) self._cs = epics_cs
[docs] def get_pv_name(self, field, handle): """Get the PV name for a specific field, and handle on this lattice. Args: field (str): The requested field. handle (str): pytac.RB or pytac.SP. Returns: str: The readback or setpoint PV for the specified field. """ try: return (self._data_source_manager._data_sources[pytac.LIVE] .get_device(field).get_pv_name(handle)) except KeyError: raise DataSourceException("Lattice {0} has no device for field " "{1}.".format(self, field)) except AttributeError: raise DataSourceException("Cannot get PV for field {0} on lattice " "{1}, as basic devices do not have " "associated PV's.".format(field, self))
[docs] def get_element_pv_names(self, family, field, handle): """Get the PV names for the given field, and handle, on all elements in the given family in the lattice. Assume that the elements are EpicsElements that have the get_pv_name() method. Args: family (str): The requested family. field (str): The requested field. handle (str): pytac.RB or pytac.SP. Returns: list: A list of PV names, strings. """ elements = self.get_elements(family) pv_names = [] for element in elements: pv_names.append(element.get_pv_name(field, handle)) return pv_names
[docs] def get_element_values(self, family, field, handle=pytac.RB, units=pytac.DEFAULT, data_source=pytac.DEFAULT, dtype=None): """Get the value of the given field for all elements in the given family in the lattice. Args: family (str): family of elements to request the values of. field (str): field to request values for. handle (str): pytac.RB or pytac.SP. units (str): pytac.ENG or pytac.PHYS. data_source (str): pytac.LIVE or pytac.SIM. dtype (numpy.dtype): if None, return a list. If not None, return a numpy array of the specified type. Returns: list or numpy.array: The requested values. """ if data_source == pytac.DEFAULT: data_source = self.get_default_data_source() if units == pytac.DEFAULT: units = self.get_default_units() if data_source == pytac.LIVE: pv_names = self.get_element_pv_names(family, field, handle) values = self._cs.get_multiple(pv_names) if units == pytac.PHYS: values = self.convert_family_values(family, field, values, pytac.ENG, pytac.PHYS) else: values = super(EpicsLattice, self).get_element_values(family, field, handle, units, data_source) if dtype is not None: values = numpy.array(values, dtype=dtype) return values
[docs] def set_element_values(self, family, field, values, handle=pytac.SP, units=pytac.DEFAULT, data_source=pytac.DEFAULT): """Set the value of the given field for all elements in the given family in the lattice to the given values. Args: family (str): family of elements on which to set values. field (str): field to set values for. values (sequence): A list of values to assign. handle (str): pytac.SP or pytac.RB. units (str): pytac.ENG or pytac.PHYS. data_source (str): pytac.LIVE or pytac.SIM. Raises: IndexError: if the given list of values doesn't match the number of elements in the family. """ if data_source == pytac.DEFAULT: data_source = self.get_default_data_source() if units == pytac.DEFAULT: units = self.get_default_units() if handle != pytac.SP: raise HandleException("Must write using {0}.".format(pytac.SP)) if data_source == pytac.LIVE: if units == pytac.PHYS: values = self.convert_family_values(family, field, values, pytac.PHYS, pytac.ENG) pv_names = self.get_element_pv_names(family, field, pytac.SP) if len(pv_names) != len(values): raise IndexError("Number of elements in given sequence({0}) " "must be equal to the number of elements in " "the family({1}).".format(len(values), len(pv_names))) self._cs.set_multiple(pv_names, values) else: super(EpicsLattice, self).set_element_values(family, field, values, pytac.SP, units, data_source)