accml.core.utils.ophyd_async package

Submodules

accml.core.utils.ophyd_async.diff_channel module

Todo

Check whether this would be better reimplemented on top of a derived signal.

class accml.core.utils.ophyd_async.diff_channel.DiffChannel(*, name, parent)[source]

Bases: StandardReadable, Movable

Parameters:

parent (Device | None)

set(diff_value)[source]

Return a Status that is marked done when the device is done moving.

Return type:

Status

stage()[source]

Set up the device for acquisition.

Returns:

An AsyncStatus that is marked done when the device is done staging.

unstage()[source]

Clean up the device after acquisition.

Returns:

An AsyncStatus that is marked done when the device is done unstaging.

exception accml.core.utils.ophyd_async.diff_channel.NotInitalisedReferenceValue[source]

Bases: AssertionError

Raised when the reference value has not been installed.
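The page does not say how DiffChannel turns diff_value into a target, so the following is only a sketch of one plausible reading: stage() records the current setpoint as the reference, set() applies the offset relative to it, and unstage() restores the reference. Every name here (DiffChannelSketch, ReferenceNotSet, the read and write callables) is hypothetical and plain synchronous Python stands in for the ophyd_async device.

```python
from typing import Callable, Optional


class ReferenceNotSet(AssertionError):
    """Hypothetical stand-in for NotInitalisedReferenceValue."""


class DiffChannelSketch:
    """Toy model of a channel that is set relative to a stored reference."""

    def __init__(
        self,
        read_setpoint: Callable[[], float],
        write_setpoint: Callable[[float], None],
    ) -> None:
        self._read = read_setpoint
        self._write = write_setpoint
        self._reference: Optional[float] = None

    def stage(self) -> None:
        # Assumption: staging captures the current setpoint as the reference.
        self._reference = self._read()

    def set(self, diff_value: float) -> None:
        # Without a reference there is nothing to offset from.
        if self._reference is None:
            raise ReferenceNotSet("stage() must run before set()")
        self._write(self._reference + diff_value)

    def unstage(self) -> None:
        # Assumption: unstaging restores the reference and forgets it.
        if self._reference is not None:
            self._write(self._reference)
        self._reference = None


# A dict plays the role of the hardware setpoint.
hardware = {"value": 10.0}
channel = DiffChannelSketch(
    lambda: hardware["value"],
    lambda v: hardware.__setitem__("value", v),
)
channel.stage()
channel.set(0.5)      # setpoint is now reference + 0.5
channel.unstage()     # setpoint is back at the reference
```

In this reading, calling set() before stage() fails loudly instead of applying an offset to an unknown baseline, which matches the exception deriving from AssertionError.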

accml.core.utils.ophyd_async.multiplexer_for_settable_devices module

A collection of power converters, exposed as if they were multiplexed.

Todo

Review whether the multiplexer itself should also be contained in this file, or whether a helper function should generate it …

class accml.core.utils.ophyd_async.multiplexer_for_settable_devices.MultiplexerProxy(*args, name='', settable_devices, default_name='', ItemProxy=<class 'accml.core.utils.ophyd_async.multiplexer_for_settable_devices._MultiplexerItemProxy'>, **kwargs)[source]

Bases: StandardReadable, Stageable

Todo

Check which other functions need to be overloaded.

Is functionality required that lets the user request that all settable devices be added to the output?

Or would it be better to instruct users to take care of it themselves, by adding the devices to the list of detectors passed to the RunEngine?

Parameters:
  • name (str)

  • settable_devices (Dict[str, Movable | StandardReadable | Stageable])

  • default_name (str)

async connect(*args, **kwargs)[source]

Return type:

None

Todo

Can all statuses be returned directly? Find out how this is done correctly.

async stop(success=False)[source]

accml.core.utils.ophyd_async.new_value module

async accml.core.utils.ophyd_async.new_value.aenumerate(aiterable)[source]

Async version of enumerate.

To be replaced by the standard version as soon as one is available.
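For orientation, an async counterpart of enumerate can be written as a small async generator. This is a minimal sketch and not necessarily how the module implements it; aenumerate_sketch and the letters helper are names chosen here for illustration.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Tuple, TypeVar

T = TypeVar("T")


async def aenumerate_sketch(
    aiterable: AsyncIterable[T],
) -> AsyncIterator[Tuple[int, T]]:
    """Yield (index, item) pairs from an async iterable, counting from 0."""
    index = 0
    async for item in aiterable:
        yield index, item
        index += 1


async def letters() -> AsyncIterator[str]:
    """Illustrative async source that yields three letters."""
    for ch in "abc":
        await asyncio.sleep(0)  # hand control back to the event loop
        yield ch


async def collect() -> list:
    # Gather all pairs so they can be inspected outside the event loop.
    return [pair async for pair in aenumerate_sketch(letters())]


pairs = asyncio.run(collect())
```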

async accml.core.utils.ophyd_async.new_value.wait_for_new_value(signal, timeout=None)[source]

Ensure that new data has arrived.

This is a bit hacky: the first value is ignored, as it is assumed to be the one already held in the cache.

What is needed is something like SubscriptionStatus(sig, cb, run=False).

Parameters:

signal (SignalR)
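The "ignore the first value" idea described above can be sketched with the standard library alone. This is an illustration under assumptions, not the module's code: a stand-in async generator replaces the real SignalR, and wait_for_new_value_sketch and fake_updates are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Optional, TypeVar

T = TypeVar("T")


async def wait_for_new_value_sketch(
    updates: AsyncIterator[T], timeout: Optional[float] = None
) -> T:
    """Return the first value that arrives after the initial (cached) one."""

    async def second_value() -> T:
        seen = 0
        async for value in updates:
            if seen >= 1:
                return value  # first update after the cached value
            seen += 1  # skip the value assumed to come from the cache
        raise RuntimeError("update stream ended before a new value arrived")

    # timeout=None waits indefinitely; otherwise a timeout error is raised.
    return await asyncio.wait_for(second_value(), timeout)


async def fake_updates() -> AsyncIterator[float]:
    """Stand-in for a signal subscription (an assumption of this sketch)."""
    yield 1.0  # cached value, delivered immediately on subscription
    await asyncio.sleep(0.01)
    yield 1.5  # genuinely new reading


async def demo() -> float:
    return await wait_for_new_value_sketch(fake_updates(), timeout=1.0)


new_value = asyncio.run(demo())
```

The weakness the docstring calls hacky is visible here: if the subscription does not deliver a cached value first, the sketch discards a genuinely new reading and waits for the next one.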

accml.core.utils.ophyd_async.pv_positioner_like_utils module

Module contents