"""Module to load the elements of the machine from csv files.
The csv files are stored in one directory with specified names:
* elements.csv
* devices.csv
* families.csv
* unitconv.csv
* uc_poly_data.csv
* uc_pchip_data.csv
"""
from __future__ import print_function
import sys
import os
import csv
import pytac
from pytac import lattice, element, device, model, units, utils
import collections
ELEMENTS_FILENAME = 'elements.csv'
DEVICES_FILENAME = 'devices.csv'
FAMILIES_FILENAME = 'families.csv'
UNITCONV_FILENAME = 'unitconv.csv'
POLY_FILENAME = 'uc_poly_data.csv'
PCHIP_FILENAME = 'uc_pchip_data.csv'
[docs]def get_div_rigidity(energy):
rigidity = utils.rigidity(energy)
def div_rigidity(input):
return input / rigidity
return div_rigidity
[docs]def get_mult_rigidity(energy):
rigidity = utils.rigidity(energy)
def mult_rigidity(input):
return input * rigidity
return mult_rigidity
[docs]def load_poly_unitconv(filename):
"""Load polynomial unit conversions from a csv file."""
unitconvs = {}
data = collections.defaultdict(list)
with open(filename) as poly:
csv_reader = csv.DictReader(poly)
for item in csv_reader:
data[(int(item['uc_id']))].append((int(item['coeff']), float(item['val'])))
# Create PolyUnitConv for each item and put in the dict
for uc_id in data:
u = units.PolyUnitConv([x[1] for x in reversed(sorted(data[uc_id]))])
unitconvs[uc_id] = u
return unitconvs
[docs]def load_pchip_unitconv(filename):
"""Load pchip unit conversions from a csv file."""
unitconvs = {}
data = collections.defaultdict(list)
with open(filename) as pchip:
csv_reader = csv.DictReader(pchip)
for item in csv_reader:
data[(int(item['uc_id']))].append((float(item['eng']), float(item['phy'])))
# Create PchipUnitConv for each item and put in the dict
for uc_id in data:
eng = [x[0] for x in sorted(data[uc_id])]
phy = [x[1] for x in sorted(data[uc_id])]
u = units.PchipUnitConv(eng, phy)
unitconvs[uc_id] = u
return unitconvs
[docs]def load_unitconv(directory, mode, lattice):
"""Load the unit conversion objects from a file.
Args:
directory (str): The directory where the data is stored.
mode (str): The name of the mode that is used.
lattice(Lattice): The lattice object that will be used.
"""
unitconvs = {}
# Assemble datasets from the polynomial file
poly_file = os.path.join(directory, mode, POLY_FILENAME)
unitconvs.update(load_poly_unitconv(poly_file))
# Assemble datasets from the pchip file
pchip_file = os.path.join(directory, mode, PCHIP_FILENAME)
unitconvs.update(load_pchip_unitconv(pchip_file))
# Add the unitconv objects to the elements
with open(os.path.join(directory, mode, UNITCONV_FILENAME)) as unitconv:
csv_reader = csv.DictReader(unitconv)
for item in csv_reader:
element = lattice[int(item['el_id']) - 1]
# For certain magnet types, we need an additional rigidity
# conversion factor as well as the raw conversion.
if element.families.intersection(('HSTR', 'VSTR', 'QUAD', 'SEXT')):
unitconvs[int(item['uc_id'])]._post_eng_to_phys = get_div_rigidity(lattice.get_energy())
unitconvs[int(item['uc_id'])]._pre_phys_to_eng = get_mult_rigidity(lattice.get_energy())
element._uc[item['field']] = unitconvs[int(item['uc_id'])]
[docs]def load(mode, control_system=None, directory=None):
"""Load the elements of a lattice from a directory.
Args:
mode (str): The name of the mode to be loaded.
control_system (ControlSystem): The control system to be used. If none is provided
an EpicsControlSystem will be created.
directory (str): Directory where to load the files from. If no directory is given
the data directory at the root of the repository is used.
Returns:
Lattice: The lattice containing all elements.
"""
try:
if control_system is None:
# Don't import epics unless we need it to avoid unnecessary
# installation of cothread
from pytac import epics
control_system = epics.EpicsControlSystem()
except ImportError:
print(('To load a lattice using the default control system,'
' please install cothread.'),
file=sys.stderr)
return None
if directory is None:
directory = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'data')
lat = lattice.Lattice(mode, control_system, 3000)
s = 0
index = 1
with open(os.path.join(directory, mode, ELEMENTS_FILENAME)) as elements:
csv_reader = csv.DictReader(elements)
for item in csv_reader:
length = float(item['length'])
cell = int(item['cell']) if item['cell'] else None
e = element.Element(item['name'], length,
item['type'], s, index, cell)
e.add_to_family(item['type'])
e.set_model(model.DeviceModel(), pytac.LIVE)
lat.add_element(e)
s += length
index += 1
with open(os.path.join(directory, mode, DEVICES_FILENAME)) as devices:
csv_reader = csv.DictReader(devices)
for item in csv_reader:
name = item['name']
get_pv = item['get_pv'] if item['get_pv'] else None
set_pv = item['set_pv'] if item['set_pv'] else None
pve = True
d = device.Device(name, control_system, pve, get_pv, set_pv)
lat[int(item['id']) - 1].add_device(item['field'], d, units.UnitConv())
with open(os.path.join(directory, mode, FAMILIES_FILENAME)) as families:
csv_reader = csv.DictReader(families)
for item in csv_reader:
lat[int(item['id']) - 1].add_to_family(item['family'])
if os.path.exists(os.path.join(directory, mode, UNITCONV_FILENAME)):
load_unitconv(directory, mode, lat)
return lat