#!/usr/bin/env python3
## Copyright (C) 2020 David Miguel Susano Pinto <carandraug@gmail.com>
## Copyright (C) 2020 Mick Phillips <mick.phillips@gmail.com>
##
## This file is part of Microscope.
##
## Microscope is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Microscope is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Microscope. If not, see <http://www.gnu.org/licenses/>.
"""Abstract Base Classes for the different device types.
"""
import abc
import functools
import itertools
import logging
import queue
import threading
import time
import typing
from ast import literal_eval
from enum import EnumMeta
from threading import Thread
import numpy
import Pyro4
import microscope
_logger = logging.getLogger(__name__)

# Integer identifiers for the trigger types.
TRIGGER_AFTER, TRIGGER_BEFORE, TRIGGER_DURATION, TRIGGER_SOFT = range(4)

# For each setting dtype descriptor, the Python types accepted as the
# description of that setting's allowed values.
#
# The key is a descriptor string rather than an actual type because a
# dtype does not always correspond to one unique type; "enum" is the
# obvious example.
DTYPES = {
    "int": (tuple,),
    "float": (tuple,),
    "bool": (type(None),),
    "enum": (list, EnumMeta, dict, tuple),
    "str": (int,),
    "tuple": (type(None),),
}
def _call_if_callable(f):
    """Return ``f()`` when `f` is callable, otherwise return `f` itself."""
    if callable(f):
        return f()
    return f
class _Setting:
    """A named, typed device setting with optional getter and setter.

    Args:
        name: the setting's name.
        dtype: a data type descriptor, one of the keys of `DTYPES`
            (`"int"`, `"float"`, `"bool"`, `"enum"`, `"str"`, or
            `"tuple"`).
        get_func: a function to get the current value, or `None` for
            a write-only setting.
        set_func: a function to set the value, or `None` for a
            setting that can not be written.
        values: a description of allowed values dependent on dtype, or
            function that returns a description.
        readonly: an optional function to indicate if the setting is
            readonly.  A setting may be readonly temporarily, so this
            function will return `True` or `False` to indicate its
            current state.  If set to `None` (default), the setting
            is readonly exactly when it has no `set_func`.

    Raises:
        ValueError: if `dtype` is not a key of `DTYPES`, or if
            `readonly` is given for a setting without `set_func`.
        TypeError: if `values` is neither callable nor an instance of
            one of the types in ``DTYPES[dtype]``.

    A client needs some way of knowing a setting name and data type,
    retrieving the current value and, if settable, a way to retrieve
    allowable values, and set the value.

    Setters and getters accept or return:

    * the setting value for int, float, bool and str;
    * the setting index into a list, dict or Enum type for enum.

    .. todo::

        refactor into subclasses to avoid if isinstance .. elif
        .. else.  Settings classes should be private: devices should
        use a factory method rather than instantiate settings
        directly; most already use add_setting for this.

    """

    def __init__(
        self,
        name: str,
        dtype: str,
        get_func: typing.Optional[typing.Callable[[], typing.Any]],
        set_func: typing.Optional[typing.Callable[[typing.Any], None]] = None,
        values: typing.Any = None,
        readonly: typing.Optional[typing.Callable[[], bool]] = None,
    ) -> None:
        self.name = name
        if dtype not in DTYPES:
            raise ValueError("Unsupported dtype.")
        elif not (isinstance(values, DTYPES[dtype]) or callable(values)):
            raise TypeError(
                "Invalid values type for %s '%s': expected function or %s"
                % (dtype, name, DTYPES[dtype])
            )
        self.dtype = dtype
        self._get = get_func
        self._values = values
        # Last value written through `set`; only maintained for
        # write-only settings (those without `get_func`).
        self._last_written = None

        if get_func is None and set_func is not None:
            # Write-only setting: remember what was written so that
            # `get` has something to report.
            def cache_and_set(value):
                set_func(value)
                # Only cache once the write went through, so that a
                # failed write is not reported as the current value.
                self._last_written = value

            self._set = cache_and_set
        else:
            # Either the setting is readable, or it has no setter at
            # all.  In the latter case `_set` must stay `None` so the
            # setting is reported as readonly and `set` raises
            # `NotImplementedError` instead of calling `None`.
            self._set = set_func

        if readonly is None:
            if self._set is None:
                self._readonly = lambda: True
            else:
                self._readonly = lambda: False
        else:
            if self._set is None:
                raise ValueError(
                    "`readonly` is not `None` but `set_func` is `None`"
                )
            else:
                self._readonly = readonly

    def describe(self):
        """Return a dict with the type, values, readonly and cached state."""
        return {
            "type": self.dtype,
            "values": self.values(),
            "readonly": self.readonly(),
            "cached": self._last_written is not None,
        }

    def get(self):
        """Return the current value of the setting.

        Write-only settings return the last written value, or `None`
        if nothing has been written yet.  If the allowed values are
        an `Enum` class, the value of the enum member is returned.
        """
        if self._get is not None:
            value = self._get()
        else:
            value = self._last_written
            if value is None:
                # Nothing written yet; there is no enum member to map.
                return None
        if isinstance(self._values, EnumMeta):
            return self._values(value).value
        else:
            return value

    def readonly(self) -> bool:
        """Return whether the setting is currently readonly."""
        return self._readonly()

    def set(self, value) -> None:
        """Set a setting.

        Raises:
            NotImplementedError: if the setting has no setter.
        """
        if self._set is None:
            raise NotImplementedError()
        # TODO further validation.
        if isinstance(self._values, EnumMeta):
            # Convert to the enum member; fails for invalid values.
            value = self._values(value)
        self._set(value)

    def values(self):
        """Return the description of the allowed values.

        For enum settings this is a list of ``(index, description)``
        pairs.  For other dtypes it is whatever `values` is or, if it
        is a function, whatever it returns (possibly `None`).
        """
        if isinstance(self._values, EnumMeta):
            return [(v.value, v.name) for v in self._values]
        values = _call_if_callable(self._values)
        if values is None:
            return None
        if self.dtype == "enum":
            if isinstance(values, dict):
                return list(values.items())
            # values is a list or tuple
            return list(enumerate(values))
        return values
class FloatingDeviceMixin(metaclass=abc.ABCMeta):
    """Mixin that marks a device as 'floating'.

    With some SDKs that drive several devices it is not possible to
    pick one specific device up front.  The device has to be
    initialized first and is then asked for its ID.  Subclassing this
    mixin identifies a device as floating and forces it to implement
    a `get_id` method.

    """

    @abc.abstractmethod
    def get_id(self) -> str:
        """Return a unique hardware identifier such as a serial number."""
class TriggerTargetMixin(metaclass=abc.ABCMeta):
    """Mixin for devices that can be the target of a hardware trigger.

    .. todo::

        There should be a way to query which trigger types and modes
        are supported.  Two independent lists (types and modes) are
        not enough, because some modes are only usable with certain
        types and vice-versa.

    """

    @property
    @abc.abstractmethod
    def trigger_mode(self) -> microscope.TriggerMode:
        """The trigger mode currently set on the device."""
        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def trigger_type(self) -> microscope.TriggerType:
        """The trigger type currently set on the device."""
        raise NotImplementedError()

    @abc.abstractmethod
    def set_trigger(
        self, ttype: microscope.TriggerType, tmode: microscope.TriggerMode
    ) -> None:
        """Configure the device for the given trigger type and mode."""
        raise NotImplementedError()

    @abc.abstractmethod
    def _do_trigger(self) -> None:
        """Perform the actual trigger of the device.

        Implementations of this interface provide this method and
        leave `trigger` untouched.

        """
        raise NotImplementedError()

    def trigger(self) -> None:
        """Trigger the device by software.

        What happens on trigger depends on the type of device: a
        `Camera` acquires an image, a `DeformableMirror` applies a
        queued pattern.  The documentation of the concrete devices
        has the details.

        Raises:
            microscope.IncompatibleStateError: if trigger type is not
                set to `TriggerType.SOFTWARE`.

        """
        if self.trigger_type is microscope.TriggerType.SOFTWARE:
            _logger.debug("trigger by software")
            self._do_trigger()
        else:
            raise microscope.IncompatibleStateError(
                "trigger type is not software"
            )
class Device(metaclass=abc.ABCMeta):
    """A base device class.  All devices should subclass this class.

    Args:
        index: the index of the device on a shared library.  This
            argument is added by the deviceserver.

    """

    def __init__(self, index: typing.Optional[int] = None) -> None:
        self.enabled = False
        # Setting definitions keyed by setting name, in insertion order.
        self._settings: typing.Dict[str, _Setting] = {}
        self._index = index

    def __del__(self) -> None:
        self.shutdown()

    def get_is_enabled(self) -> bool:
        """Return whether the device is currently enabled."""
        return self.enabled

    def _do_disable(self):
        """Do any device-specific work on disable.

        Subclasses should override this method rather than modify
        `disable`.

        """
        return True

    def disable(self) -> None:
        """Disable the device for a short period for inactivity."""
        self._do_disable()
        self.enabled = False

    def _do_enable(self):
        """Do any device specific work on enable.

        Subclasses should override this method, rather than modify
        `enable`.  The return value becomes the new enabled state.

        """
        return True

    def enable(self) -> None:
        """Enable the device.

        Errors raised by `_do_enable` are logged and not propagated;
        in that case the enabled state is left unchanged.

        """
        try:
            self.enabled = self._do_enable()
        except Exception as err:
            _logger.debug("Error in _do_enable:", exc_info=err)

    @abc.abstractmethod
    def _do_shutdown(self) -> None:
        """Private method - actual shutdown of the device.

        Users should be calling :meth:`shutdown` and not this method.
        Concrete implementations should implement this method instead
        of `shutdown`.

        """
        raise NotImplementedError()

    def initialize(self) -> None:
        """Initialize the device.

        If devices have this method (not required, and many don't),
        then they should call it as part of the initialisation, i.e.,
        they should call it on their `__init__` method.

        """
        pass

    def shutdown(self) -> None:
        """Shutdown the device.

        Disable and disconnect the device.  This method should be
        called before destructing the device object, to ensure that
        the device is actually shutdown.

        After `shutdown`, the device object is no longer usable and
        calling any other method is undefined behaviour.  The only
        exception is `shutdown` itself which can be called
        consecutively, and after the first time will have no effect.

        A device object that has been shutdown can't be reinitialised.
        Instead of reusing the object, a new one should be created
        instead.  This means that `shutdown` will leave the device in
        a state that it can be reconnected.

        .. code-block:: python

            device = SomeDevice()
            device.shutdown()

            # Multiple calls to shutdown are OK
            device.shutdown()
            device.shutdown()

            # After shutdown, everything else is undefined behaviour.
            device.enable()  # undefined behaviour
            device.get_setting("speed")  # undefined behaviour

            # To reinitialise the device, construct a new instance.
            device = SomeDevice()

        .. note::

            While `__del__` calls `shutdown`, one should not rely on
            it.  Python does not guarantee that `__del__` will be
            called when the interpreter exits so if `shutdown` is not
            called explicitly, the devices might not be shutdown.

        """
        try:
            self.disable()
        except Exception as e:
            # A failure to disable must not prevent the shutdown.
            _logger.warning("Exception in disable() during shutdown: %s", e)
        _logger.info("Shutting down ... ... ...")
        self._do_shutdown()
        _logger.info("... ... ... ... shut down completed.")

    def add_setting(
        self,
        name,
        dtype,
        get_func,
        set_func,
        values,
        readonly: typing.Optional[typing.Callable[[], bool]] = None,
    ) -> None:
        """Add a setting definition.

        Args:
            name: the setting's name.
            dtype: a data type from `"int"`, `"float"`, `"bool"`,
                `"enum"`, `"str"`, or `"tuple"` (see `DTYPES`).
            get_func: a function to get the current value.
            set_func: a function to set the value.
            values: a description of allowed values dependent on
                dtype, or function that returns a description.
            readonly: an optional function to indicate if the setting
                is readonly.  A setting may be readonly temporarily,
                so this function will return `True` or `False` to
                indicate its current state.  If set to `None`
                (default), then its value will be dependent on the
                value of `set_func`.

        Raises:
            ValueError: if `dtype` is not supported.
            TypeError: if `values` has the wrong type for `dtype`.

        A client needs some way of knowing a setting name and data
        type, retrieving the current value and, if settable, a way to
        retrieve allowable values, and set the value.  We store this
        info in a dictionary.  I considered having a `Setting` class
        with getter, setter, etc., and adding `Setting` instances as
        device attributes, but Pyro does not support dot notation to
        access the functions we need (e.g. `Device.some_setting.set`),
        so I'd have to write access functions, anyway.

        """
        # The `_Setting` constructor validates `dtype` and `values`
        # and raises the errors documented above, so nothing is
        # stored if the definition is invalid.
        self._settings[name] = _Setting(
            name, dtype, get_func, set_func, values, readonly
        )

    def get_setting(self, name: str):
        """Return the current value of a setting.

        Any error (including an unknown setting name) is logged and
        then re-raised.
        """
        try:
            return self._settings[name].get()
        except Exception as err:
            _logger.error("in get_setting(%s):", name, exc_info=err)
            raise

    def get_all_settings(self):
        """Return a dict mapping each setting name to its current value.

        Fetching some settings may fail depending on device state.
        Such failures are logged and the value of the setting is
        reported as `None` so the other settings are still fetched.
        """
        current = {}
        for key, setting in self._settings.items():
            try:
                current[key] = setting.get()
            except Exception as err:
                _logger.error("getting %s: %s", setting.name, err)
                current[key] = None
        return current

    def set_setting(self, name: str, value) -> None:
        """Set a setting.

        Any error (including an unknown setting name) is logged and
        then re-raised.
        """
        try:
            self._settings[name].set(value)
        except Exception as err:
            _logger.error("in set_setting(%s):", name, exc_info=err)
            raise

    def describe_setting(self, name: str):
        """Return the description of a single setting as a dict."""
        return self._settings[name].describe()

    def describe_settings(self):
        """Return a list of ``(name, description)`` pairs, in order."""
        return [(k, v.describe()) for (k, v) in self._settings.items()]

    def update_settings(self, incoming, init: bool = False):
        """Update settings based on dict of settings and values.

        Args:
            incoming: map of setting names to the new values.  Names
                that are not settings of this device are ignored.
            init: if true, assume nothing about the device state and
                write every setting, in which case `incoming` must
                have a value for every setting of this device.  If
                false, write only the settings whose current value
                differs from the incoming one.

        Returns:
            A dict mapping the name of each setting that was
            considered for update to its value read back from the
            device.  Settings that are currently readonly are not
            written, but their current value is still reported.

        Raises:
            Exception: if `init` is true and `incoming` is missing
                some of the settings.
        """
        my_keys = set(self._settings.keys())
        their_keys = set(incoming.keys())
        if init:
            # Assume nothing about state: set everything.
            update_keys = my_keys & their_keys
            if update_keys != my_keys:
                missing = ", ".join(sorted(my_keys - their_keys))
                msg = (
                    "update_settings init=True but missing keys: %s." % missing
                )
                _logger.debug(msg)
                raise Exception(msg)
        else:
            # Only update changed values.
            update_keys = set(
                key
                for key in my_keys & their_keys
                if self.get_setting(key) != incoming[key]
            )

        # Update values.  `update_keys` is a subset of our own
        # settings, and settings without a setter report themselves
        # as readonly, so checking `readonly` is all that is needed.
        # The set is never modified while it is being iterated.
        for key in update_keys:
            setting = self._settings[key]
            if setting.readonly():
                continue
            setting.set(incoming[key])

        # Read back values in a second loop, after all were written.
        return {key: self._settings[key].get() for key in update_keys}
def keep_acquiring(func):
    """Decorator to preserve the acquiring state of data capture devices.

    If the device is acquiring when the decorated method is called,
    acquisition is aborted first, the method is called, and then
    `_do_enable` is called to resume acquisition.  Otherwise the
    method is simply called.

    The wrapper keeps the name and docstring of the decorated method.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self._acquiring:
            return func(self, *args, **kwargs)
        self.abort()
        result = func(self, *args, **kwargs)
        self._do_enable()
        return result

    return wrapper
class DataDevice(Device, metaclass=abc.ABCMeta):
    """A data capture device.

    This class handles a thread to fetch data from a device and dispatch
    it to a client.  The client is set using set_client(uri) or (legacy)
    receiveClient(uri).

    Derived classes should implement::

      * :meth:`abort` (required)
      * :meth:`_fetch_data` (required)
      * :meth:`_process_data` (optional)

    Derived classes may override `__init__`, `enable` and `disable`,
    but must ensure to call this class's implementations as indicated
    in the docstrings.

    """

    def __init__(self, buffer_length: int = 0, **kwargs) -> None:
        """Derived.__init__ must call this at some point.

        Args:
            buffer_length: maximum size of the dispatch queue; zero
                means unbounded.
            **kwargs: passed on to the `Device` constructor.
        """
        super().__init__(**kwargs)
        # A thread to fetch and dispatch data.
        self._fetch_thread = None
        # A flag to control the _fetch_thread.
        self._fetch_thread_run = False
        # A flag to indicate that this class uses a fetch callback.
        self._using_callback = False
        # Clients to which we send data.
        self._clientStack = []
        # A set of live clients to avoid repeated dispatch to disconnected client.
        self._liveClients = set()
        # A thread to dispatch data.
        self._dispatch_thread = None
        # A buffer for data dispatch.
        self._dispatch_buffer = queue.Queue(maxsize=buffer_length)
        # A flag to indicate if device is ready to acquire.
        self._acquiring = False
        # A condition to signal arrival of a new data and unblock grab_next_data
        self._new_data_condition = threading.Condition()
        # The last (data, timestamp) pair received via receiveData.
        self._new_data = None

    def __del__(self):
        self.disable()
        super().__del__()

    # Wrap set_setting to pause and resume acquisition.
    set_setting = keep_acquiring(Device.set_setting)

    @abc.abstractmethod
    def abort(self) -> None:
        """Stop acquisition as soon as possible."""
        self._acquiring = False

    def enable(self) -> None:
        """Enable the data capture device.

        Ensures that the data handling threads are running.
        Implement device specific code in `_do_enable`.  Unlike
        `Device.enable`, errors from `_do_enable` are re-raised after
        being logged.

        """
        _logger.debug("Enabling ...")
        # Call device-specific code.
        try:
            result = self._do_enable()
        except Exception as err:
            _logger.debug("Error in _do_enable:", exc_info=err)
            self.enabled = False
            raise
        if not result:
            self.enabled = False
        else:
            self.enabled = True
            # Set up data fetching
            if self._using_callback:
                # Data arrives via callback: stop any polling thread.
                if self._fetch_thread:
                    self._fetch_thread_run = False
            else:
                if not self._fetch_thread or not self._fetch_thread.is_alive():
                    self._fetch_thread = Thread(target=self._fetch_loop)
                    self._fetch_thread.daemon = True
                    self._fetch_thread.start()
            if (
                not self._dispatch_thread
                or not self._dispatch_thread.is_alive()
            ):
                self._dispatch_thread = Thread(target=self._dispatch_loop)
                self._dispatch_thread.daemon = True
                self._dispatch_thread.start()
        _logger.debug("... enabled.")

    def disable(self) -> None:
        """Disable the data capture device.

        Stops the fetch thread, if it is running, and waits for it to
        finish.  Implement device-specific code in `_do_disable`.

        """
        self.enabled = False
        if self._fetch_thread:
            if self._fetch_thread.is_alive():
                self._fetch_thread_run = False
                self._fetch_thread.join()
        super().disable()

    @abc.abstractmethod
    def _fetch_data(self) -> None:
        """Poll for data and return it, with minimal processing.

        If the device uses buffering in software, this function should
        copy the data from the buffer, release or recycle the buffer,
        then return a reference to the copy.  Otherwise, if the SDK
        returns a data object that will not be written to again, this
        function can just return a reference to the object.  If no
        data is available, return `None`.

        """
        return None

    def _process_data(self, data):
        """Do any data processing and return data."""
        return data

    def _send_data(self, client, data, timestamp):
        """Dispatch data to the client.

        A client that can no longer be reached is removed from the
        client stack and from the set of live clients.
        """
        try:
            # Cockpit will send a client with receiveData and expects
            # two arguments (data and timestamp).  But we really want
            # to use Python's Queue and instead of just the timestamp
            # we should be sending some proper metadata object.  We
            # don't have that proper metadata class yet so just send
            # the image data as a numpy ndarray for now.
            if hasattr(client, "put"):
                client.put(data)
            else:
                client.receiveData(data, timestamp)
        except (
            Pyro4.errors.ConnectionClosedError,
            Pyro4.errors.CommunicationError,
        ):
            # Client not listening
            _logger.info(
                "Removing %s from client stack: disconnected.", client._pyroUri
            )
            self._clientStack = list(filter(client.__ne__, self._clientStack))
            self._liveClients = self._liveClients.difference([client])

    def _dispatch_loop(self) -> None:
        """Process data and send results to any client.

        Runs forever.  Errors while processing or sending an item are
        logged and do not stop the loop.
        """
        while True:
            client, data, timestamp = self._dispatch_buffer.get(block=True)
            try:
                if client not in self._liveClients:
                    continue
                if isinstance(data, Exception):
                    # Send a plain exception holding only the ASCII
                    # message.  Characters that can't be encoded are
                    # replaced instead of raising, which would
                    # otherwise kill this loop.
                    payload = Exception(
                        str(data).encode("ascii", errors="replace")
                    )
                else:
                    payload = self._process_data(data)
                self._send_data(client, payload, timestamp)
            except Exception as err:
                # Raising an exception will kill the dispatch loop.  We need
                # another way to notify the client that there was a problem.
                _logger.error("in _dispatch_loop:", exc_info=err)
            finally:
                # Every item taken from the queue must be marked as
                # done, including those for clients no longer live.
                self._dispatch_buffer.task_done()

    def _fetch_loop(self) -> None:
        """Poll source for data and put it into dispatch buffer."""
        self._fetch_thread_run = True

        while self._fetch_thread_run:
            try:
                data = self._fetch_data()
            except Exception as e:
                _logger.error("in _fetch_loop:", exc_info=e)
                # Raising an exception will kill the fetch loop.  We need
                # another way to notify the client that there was a problem.
                timestamp = time.time()
                self._put(e, timestamp)
                data = None
            if data is not None:
                # TODO Add support for timestamp from hardware.
                timestamp = time.time()
                self._put(data, timestamp)
            else:
                # Nothing available: avoid a busy loop.
                time.sleep(0.001)

    @property
    def _client(self):
        """A getter for the current client."""
        return (self._clientStack or [None])[-1]

    @_client.setter
    def _client(self, val):
        """Push or pop a client from the _clientStack.

        Assigning `None` pops the top client; it does nothing if the
        stack is already empty.
        """
        if val is None:
            if self._clientStack:
                self._clientStack.pop()
        else:
            self._clientStack.append(val)
        self._liveClients = set(self._clientStack)

    def _put(self, data, timestamp) -> None:
        """Put data and timestamp into dispatch buffer with target dispatch client."""
        self._dispatch_buffer.put((self._client, data, timestamp))

    def set_client(self, new_client) -> None:
        """Set up a connection to our client.

        Clients now sit in a stack so that a single device may send
        different data to multiple clients in a single experiment.
        The usage is currently::

            device.set_client(client) # Add client to top of stack
            # do stuff, send triggers, receive data
            device.set_client(None)   # Pop top client off stack.

        There is a risk that some other client calls ``None`` before
        the current client is finished.  Avoiding this will require
        rework here to identify the caller and remove only that caller
        from the client stack.

        Args:
            new_client: the client object, a Pyro URI (or its string
                form) from which a proxy is created, or `None` to
                pop the current client.

        """
        if new_client is not None:
            if isinstance(new_client, (str, Pyro4.core.URI)):
                self._client = Pyro4.Proxy(new_client)
            else:
                self._client = new_client
        else:
            self._client = None
        # _client uses a setter.  Log the result of assignment.
        if self._client is None:
            _logger.info("Current client is None.")
        else:
            _logger.info("Current client is %s.", str(self._client))

    @keep_acquiring
    def update_settings(self, settings, init: bool = False) -> None:
        """Update settings, toggling acquisition if necessary."""
        super().update_settings(settings, init)

    # noinspection PyPep8Naming
    def receiveClient(self, client_uri: str) -> None:
        """A passthrough for compatibility."""
        self.set_client(client_uri)

    def grab_next_data(self, soft_trigger: bool = True):
        """Returns results from next trigger via a direct call.

        Args:
            soft_trigger: calls :meth:`trigger` if `True`, waits for
                hardware trigger if `False`.

        Returns:
            The ``(data, timestamp)`` pair passed to `receiveData`.

        Raises:
            microscope.DisabledDeviceError: if the device is not
                enabled.

        """
        if not self.enabled:
            raise microscope.DisabledDeviceError("Camera not enabled.")
        # Hold the condition from before the trigger until the wait,
        # so that the notification can't happen before we wait for
        # it.  The `with` block releases it again when done or on
        # error, so later calls from other threads are not blocked.
        with self._new_data_condition:
            # Push self onto client stack.
            self.set_client(self)
            try:
                # Wait for data from next trigger.
                if soft_trigger:
                    self.trigger()
                self._new_data_condition.wait()
                new_data = self._new_data
            finally:
                # Pop self from client stack, even if trigger failed.
                self.set_client(None)
        # Return the data.
        return new_data

    # noinspection PyPep8Naming
    def receiveData(self, data, timestamp) -> None:
        """Unblocks grab_next_data so it can return."""
        with self._new_data_condition:
            self._new_data = (data, timestamp)
            self._new_data_condition.notify()
class Camera(TriggerTargetMixin, DataDevice):
    """Adds functionality to :class:`DataDevice` to support cameras.

    Defines the interface for cameras.  Applies a transform to
    acquired data in the processing step.

    """

    # All combinations of the (fliplr, flipud, rot90) flags.
    ALLOWED_TRANSFORMS = list(itertools.product([False, True], repeat=3))

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # A list of readout mode descriptions.
        self._readout_modes = ["default"]
        # The index of the current readout mode.
        self._readout_mode = 0
        # Transforms to apply to data (fliplr, flipud, rot90)
        # Transform to correct for readout order.
        self._readout_transform = (False, False, False)
        # Transform supplied by client to correct for system geometry.
        self._client_transform = (False, False, False)
        # Result of combining client and readout transforms
        self._transform = (False, False, False)
        # A transform provided by the client.
        self.add_setting(
            "transform",
            "enum",
            lambda: Camera.ALLOWED_TRANSFORMS.index(self._transform),
            lambda index: self.set_transform(Camera.ALLOWED_TRANSFORMS[index]),
            Camera.ALLOWED_TRANSFORMS,
        )
        self.add_setting(
            "readout mode",
            "enum",
            lambda: self._readout_mode,
            self.set_readout_mode,
            lambda: self._readout_modes,
        )
        self.add_setting("roi", "tuple", self.get_roi, self.set_roi, None)

    def _process_data(self, data):
        """Apply self._transform to data."""
        flip_lr, flip_ud, rot = self._transform
        # Do rotation
        data = numpy.rot90(data, rot)
        # Flip
        if flip_ud:
            data = numpy.flipud(data)
        if flip_lr:
            data = numpy.fliplr(data)
        return super()._process_data(data)

    def set_readout_mode(self, description):
        """Set the readout mode and _readout_transform."""
        pass

    def set_transform(self, transform) -> None:
        """Combine the provided client transform with the readout transform.

        Args:
            transform: the ``(fliplr, flipud, rot90)`` flags requested
                by the client, or the string representation of such a
                tuple.

        NOTE(review): this method was called by the "transform"
        setting and by `_set_readout_transform` but was not defined
        in this class.  The rule used to combine the two transforms
        (exclusive or of the flags, with both flips inverted when
        both transforms rotate) should be confirmed against the
        concrete camera implementations.
        """
        if isinstance(transform, str):
            transform = literal_eval(transform)
        # Keep tuples of bool so that the result can be looked up in
        # ALLOWED_TRANSFORMS by the "transform" setting getter.
        self._client_transform = tuple(bool(t) for t in transform)
        lr, ud, rot = (
            r ^ c
            for r, c in zip(self._readout_transform, self._client_transform)
        )
        if self._readout_transform[2] and self._client_transform[2]:
            # Two rotations by 90 degrees are a rotation by 180
            # degrees, which is the same as flipping both axes.
            lr = not lr
            ud = not ud
        self._transform = (lr, ud, rot)

    def _set_readout_transform(self, new_transform):
        """Update readout transform and update resultant transform."""
        # Stored as a tuple of bool, like the initial value.
        self._readout_transform = tuple(bool(int(t)) for t in new_transform)
        self.set_transform(self._client_transform)

    @abc.abstractmethod
    def set_exposure_time(self, value: float) -> None:
        """Set the exposure time on the device in seconds."""
        pass

    def get_exposure_time(self) -> float:
        """Return the current exposure time in seconds."""
        pass

    def get_cycle_time(self) -> float:
        """Return the cycle time in seconds."""
        pass

    @abc.abstractmethod
    def _get_sensor_shape(self) -> typing.Tuple[int, int]:
        """Return a tuple of `(width, height)` indicating shape in pixels."""
        pass

    def get_sensor_shape(self) -> typing.Tuple[int, int]:
        """Return a tuple of `(width, height)` corrected for transform."""
        shape = self._get_sensor_shape()
        if self._transform[2]:
            # 90 degree rotation
            shape = (shape[1], shape[0])
        return shape

    @abc.abstractmethod
    def _get_binning(self) -> microscope.Binning:
        """Return the current binning."""
        pass

    def get_binning(self) -> microscope.Binning:
        """Return the current binning corrected for transform."""
        binning = self._get_binning()
        if self._transform[2]:
            # 90 degree rotation
            binning = microscope.Binning(binning[1], binning[0])
        return binning

    @abc.abstractmethod
    def _set_binning(self, binning: microscope.Binning):
        """Set binning along both axes.  Return `True` if successful."""
        pass

    def set_binning(self, binning: microscope.Binning) -> bool:
        """Set binning along both axes, corrected for transform.

        Returns whatever `_set_binning` returns, which is documented
        to be `True` if successful.
        """
        h_bin, v_bin = binning
        if self._transform[2]:
            # 90 degree rotation
            binning = microscope.Binning(v_bin, h_bin)
        else:
            binning = microscope.Binning(h_bin, v_bin)
        return self._set_binning(binning)

    @abc.abstractmethod
    def _get_roi(self) -> microscope.ROI:
        """Return the ROI as it is on hardware."""
        raise NotImplementedError()

    def get_roi(self) -> microscope.ROI:
        """Return current ROI corrected for transform."""
        roi = self._get_roi()
        if self._transform[2]:
            # 90 degree rotation
            roi = microscope.ROI(roi[1], roi[0], roi[3], roi[2])
        return roi

    @abc.abstractmethod
    def _set_roi(self, roi: microscope.ROI):
        """Set the ROI on the hardware.  Return `True` if successful."""
        return False

    def set_roi(self, roi: microscope.ROI) -> bool:
        """Set the ROI according to the provided rectangle.

        A width or height of zero or `None` selects the full sensor
        size, after binning, along that axis.

        Returns whatever `_set_roi` returns, which is documented to
        be `True` if the ROI was set correctly, `False` otherwise.
        """
        maxw, maxh = self.get_sensor_shape()
        binning = self.get_binning()
        left, top, width, height = roi
        if not width:  # 0 or None
            width = maxw // binning.h
        if not height:  # 0 or None
            height = maxh // binning.v
        if self._transform[2]:
            # NOTE(review): `get_roi` swaps left/top as well as
            # width/height under rotation, while only width/height
            # are swapped here.  Confirm whether that is intended.
            roi = microscope.ROI(left, top, height, width)
        else:
            roi = microscope.ROI(left, top, width, height)
        return self._set_roi(roi)

    def get_trigger_type(self):
        """Return the current trigger type.

        One of
            TRIGGER_AFTER,
            TRIGGER_BEFORE or
            TRIGGER_DURATION (bulb exposure.)
        """
        pass
class SerialDeviceMixin(metaclass=abc.ABCMeta):
    """Mixin for devices that are controlled via serial.

    DEPRECATED: this turned out to be a bad idea.  A device *has* a
    serial connection, it *is* not one, and that distinction should
    have told us to use composition rather than subclassing.

    For now it takes care of flushing and locking the comms channel
    until a command has finished, and of passing reads and writes
    through to the serial channel.

    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # TODO: We should probably construct the connection here but
        #       the Serial constructor takes a lot of arguments, and
        #       it becomes tricky to separate those from arguments to
        #       the constructor of other parent classes.
        # The serial.Serial instance; the child class constructs it.
        self.connection = None
        # Reentrant lock guarding the comms channel.
        self._comms_lock = threading.RLock()

    def _readline(self) -> bytes:
        """Read one line from the connection, stripped of surrounding whitespace."""
        line = self.connection.readline()
        return line.strip()

    def _write(self, command: bytes) -> int:
        """Send a command to the device.

        This does more than ``serial.Serial.write``: ``b'\\r\\n'`` is
        appended to the command before it is written.  Devices that
        need a different format should override this method.

        """
        terminated = command + b"\r\n"
        return self.connection.write(terminated)

    @staticmethod
    def lock_comms(func):
        """Decorator to flush input buffer and lock communications.

        The DeepStar lasers have been seen to return junk characters
        after the expected response, so the input buffer is flushed
        before the command and its subsequent readline are run.  The
        comms channel is also locked, so that the decorated function
        finishes all of its communications before another can run.

        """

        @functools.wraps(func)
        def locked(self, *args, **kwargs):
            self._comms_lock.acquire()
            try:
                self.connection.flushInput()
                return func(self, *args, **kwargs)
            finally:
                self._comms_lock.release()

        return locked
class LightSource(TriggerTargetMixin, Device, metaclass=abc.ABCMeta):
    """Light source such as lasers or LEDs.

    Light sources often, possibly always, support only
    `TriggerMode.BULB`.  In this context, the trigger type determines
    what `enable` does.  With `TriggerType.SOFTWARE`, `enable` makes
    the device emit light immediately and `disable` makes it stop
    emitting light.  With `TriggerType.HIGH` or `TriggerType.LOW`,
    `enable` sets up the light source so that it emits light only
    while it receives a high or low TTL, or digital, input signal.

    """

    @abc.abstractmethod
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Last power requested via the `power` setter, after clipping.
        self._set_point = 0.0

    @abc.abstractmethod
    def get_status(self) -> typing.List[str]:
        """Query and return the light source status."""
        return []

    @abc.abstractmethod
    def get_is_on(self) -> bool:
        """Return True if the light source is currently able to produce light."""

    @abc.abstractmethod
    def _do_get_power(self) -> float:
        """Internal function that actually returns the light source power."""
        raise NotImplementedError()

    @abc.abstractmethod
    def _do_set_power(self, power: float) -> None:
        """Internal function that actually sets the light source power.

        The `power` attribute setter calls this function with the
        argument already clipped to the [0, 1] interval.

        """
        raise NotImplementedError()

    @property
    def power(self) -> float:
        """Light source power in the [0, 1] interval."""
        return self._do_get_power()

    @power.setter
    def power(self, power: float) -> None:
        """Light source power in the [0, 1] interval.

        Values outside the [0, 1] interval are clipped to it.

        """
        capped = min(power, 1.0)
        clipped_power = max(capped, 0.0)
        self._do_set_power(clipped_power)
        # Only remember the set point once the hardware accepted it.
        self._set_point = clipped_power

    def get_set_power(self) -> float:
        """Return the power set point."""
        return self._set_point
class FilterWheel(Device, metaclass=abc.ABCMeta):
    """ABC for filter wheels, cube turrets, and filter sliders.

    A FilterWheel device has a fixed number of positions that hold
    different filters.  Implementations allow moving to any of those
    positions, including positions that may not hold a filter.

    Args:
        positions: total number of filter positions on this device.

    """

    def __init__(self, positions: int, **kwargs) -> None:
        super().__init__(**kwargs)
        if positions < 1:
            raise ValueError(
                "positions must be a positive number (was %d)" % positions
            )

        self._positions = positions

        # The position as an integer.
        # Deprecated: clients should call get_position and set_position;
        # still exposed as a setting until cockpit uses set_position.
        #
        # NOTE(review): the `position` setter only accepts values up
        # to n_positions - 1, but the upper limit advertised here is
        # n_positions.  Confirm whether clients treat it as exclusive.
        self.add_setting(
            "position",
            "int",
            self.get_position,
            self.set_position,
            lambda: (0, self.get_num_positions()),
        )

    @property
    def n_positions(self) -> int:
        """Number of wheel positions."""
        return self._positions

    @property
    def position(self) -> int:
        """Current wheel position (zero-based).

        Raises:
            ValueError: when set to a value outside of
                ``[0, n_positions - 1]``.
        """
        return self._do_get_position()

    @position.setter
    def position(self, new_position: int) -> None:
        if not 0 <= new_position < self.n_positions:
            raise ValueError(
                "can't move to position %d, limits are [0 %d]"
                % (new_position, self.n_positions - 1)
            )
        return self._do_set_position(new_position)

    @abc.abstractmethod
    def _do_get_position(self) -> int:
        """Return the current position as reported by the device."""
        raise NotImplementedError()

    @abc.abstractmethod
    def _do_set_position(self, position: int) -> None:
        """Move the device to `position`, which was already validated."""
        raise NotImplementedError()

    # Deprecated and kept for backwards compatibility.
    def get_num_positions(self) -> int:
        """Deprecated, use the `n_positions` property."""
        return self.n_positions

    def get_position(self) -> int:
        """Return the value of the `position` property."""
        return self.position

    def set_position(self, position: int) -> None:
        """Assign `position` to the `position` property."""
        self.position = position
class Controller(Device, metaclass=abc.ABCMeta):
    """Device that controls multiple devices.

    A controller typically fronts several other devices, for example
    a XY stage and a Z stage, a filterwheel, and a light source.
    Multi light source engines are also controller devices.

    Every controlled device is identified by a name.  Choosing and
    documenting those names is the responsibility of the concrete
    class.

    Shutting down the controller must shut down the devices it
    controls.  Concrete classes should make sure that shutting down
    one controlled device does not, in turn, shut down the controller
    or the other controlled devices.  This may mean that controlled
    devices do nothing as part of their own shutdown.
    """

    @property
    @abc.abstractmethod
    def devices(self) -> typing.Mapping[str, Device]:
        """Map of names to the controlled devices."""
        raise NotImplementedError

    def _do_shutdown(self) -> None:
        """Shut down each of the controlled devices in turn."""
        controlled = self.devices
        for device in controlled.values():
            device.shutdown()
class StageAxis(metaclass=abc.ABCMeta):
    """One axis of a :class:`Stage`.

    A `StageAxis` models a single dimension of a stage.  It is not a
    :class:`Device` in its own right: even a stage with only one axis,
    such as a Z-axis piezo, is implemented as a stage device that is
    composed of a single `StageAxis`.

    The `StageAxis` interface mirrors the one of the stage device, so
    see the documentation there for details.
    """

    @abc.abstractmethod
    def move_by(self, delta: float) -> None:
        """Move the axis by the amount `delta`."""
        raise NotImplementedError

    @abc.abstractmethod
    def move_to(self, pos: float) -> None:
        """Move the axis to the position `pos`."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def position(self) -> float:
        """Current position of the axis."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def limits(self) -> microscope.AxisLimits:
        """Lower and upper limits for the axis position."""
        raise NotImplementedError
class Stage(Device, metaclass=abc.ABCMeta):
    """A stage device, made up of :class:`StageAxis` instances.

    A stage can have any number of axes and dimensions.  Within one
    `Stage` instance every axis has a name that uniquely identifies
    it.  The axis names depend on the hardware and are documented by
    the concrete class.  They are typically strings such as `"x"` or
    `"y"`.

    .. code-block:: python

        stage = SomeStageDevice()
        stage.enable()  # may trigger a stage move

        # move operations
        stage.move_to({'x': 42.0, 'y': -5.1})
        stage.move_by({'x': -5.3, 'y': 14.6})

        # Individual StageAxis can be controlled directly.
        x_axis = stage.axes['x']
        y_axis = stage.axes['y']
        x_axis.move_to(42.0)
        y_axis.move_by(-5.3)

    Not every stage supports moving several axes at the same time, so
    a move operation that names multiple axes is not guaranteed to
    move them simultaneously.  See the concrete class documentation
    for hardware specific details.

    When a move involves multiple axes and simultaneous moves are not
    supported, the order in which the axes move is undefined.  If a
    specific order is needed, either call the move functions once per
    axis in the wanted order, or go through the individual axes:

    .. code-block:: python

        # Move the x axis first, then move the y axis:
        stage.move_by({'x': 10})
        stage.move_by({'y': 4})

        # The same thing but via the individual axes:
        stage.axes['x'].move_by(10)
        stage.axes['y'].move_by(4)

    Move operations never attempt to take the stage beyond its
    limits.  A move that would go past a limit is clipped to the axis
    limit instead, and no exception is raised.

    .. code-block:: python

        # Moves the x axis to its upper limit:
        x_axis.move_to(x_axis.limits.upper)

        # The same as above since the move operations are clipped to
        # the axes limits automatically.
        import math
        x_axis.move_to(math.inf)
        x_axis.move_by(math.inf)

    Some stages have to find a reference position (home) before they
    can be moved.  When that is required, it happens automatically
    during :func:`enable`.
    """

    @property
    @abc.abstractmethod
    def axes(self) -> typing.Mapping[str, StageAxis]:
        """Map of axis names to the corresponding :class:`StageAxis`.

        .. code-block:: python

            for name, axis in stage.axes.items():
                print(f'moving axis named {name}')
                axis.move_by(1)

        Axes that are not available are left out of the map.  That
        is, for a stage with optional axes, a missing axis does *not*
        show up in the returned map with a value of `None` or with
        some other special `StageAxis` instance.
        """
        raise NotImplementedError

    @property
    def position(self) -> typing.Mapping[str, float]:
        """Map of axis name to the current position of that axis.

        .. code-block:: python

            for name, position in stage.position.items():
                print(f'{name} axis is at position {position}')

        Positions are in the same units as the ones currently used
        for absolute move (:func:`move_to`) operations.
        """
        positions = {}
        for axis_name, axis in self.axes.items():
            positions[axis_name] = axis.position
        return positions

    @property
    def limits(self) -> typing.Mapping[str, microscope.AxisLimits]:
        """Map of axis name to the lower and upper limits of that axis.

        .. code-block:: python

            for name, limits in stage.limits.items():
                print(f'{name} axis lower limit is {limits.lower}')
                print(f'{name} axis upper limit is {limits.upper}')

        These are the limits currently imposed by the device or the
        underlying software, and they may change during the lifetime
        of the `Stage` object.

        Limits are in the same units as the ones currently used for
        the move operations.
        """
        axes_limits = {}
        for axis_name, axis in self.axes.items():
            axes_limits[axis_name] = axis.limits
        return axes_limits

    @abc.abstractmethod
    def move_by(self, delta: typing.Mapping[str, float]) -> None:
        """Move axes by the corresponding amounts.

        Args:
            delta: map of axis name to the amount to be moved.

        .. code-block:: python

            # Move 'x' axis by 10.2 units and the y axis by -5 units:
            stage.move_by({'x': 10.2, 'y': -5})

            # The above is equivalent, but possibly faster than:
            stage.axes['x'].move_by(10.2)
            stage.axes['y'].move_by(-5)

        Axes are never moved beyond :func:`limits`.  If `delta` would
        take an axis past its limit, no exception is raised; the
        stage moves up to the axis limit instead.
        """
        # TODO: implement a software fallback that moves the
        # individual axes, for stages that don't provide simultaneous
        # move of multiple axes.
        raise NotImplementedError

    @abc.abstractmethod
    def move_to(self, position: typing.Mapping[str, float]) -> None:
        """Move axes to the corresponding positions.

        Args:
            position: map of axis name to the position to move to.

        .. code-block:: python

            # Move 'x' axis to position 8 and the y axis to position -5.3
            stage.move_to({'x': 8, 'y': -5.3})

            # The above is equivalent to
            stage.axes['x'].move_to(8)
            stage.axes['y'].move_to(-5.3)

        Axes are never moved beyond :func:`limits`.  If `position` is
        past the limits, no exception is raised; the stage moves up
        to the axis limit instead.
        """
        raise NotImplementedError