API Documentation

Core API

Sessions

The recommended way to work with qalx entities is through the session object. It provides a series of helper properties that talk to the API through entity-specific adapters.

class pyqalx.core.session.QalxSession(profile_name='default', config_class=None, skip_ini=False, rest_api_class=None, project_name=None)

The session that any interaction with the API will use.

Parameters:
  • profile_name (str) – profile name to get from config_class (default=Config.default)

  • config_class (Config) – The class for the config that will be used for this session

  • skip_ini (bool) – Should loading the config from the inifile be skipped?

  • rest_api_class (PyQalxAPI) – The class to use for the rest api

  • project_name (str) – An optional name of a project. This will be used to tag all data that is created/modified with this session

configure_logging()

Configures the log for this session. A log is namespaced for the config type and the profile name to allow a user to have multiple session instances with different configs/profiles logging to different places.

Returns:

An instance of logging.getLogger
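As a rough illustration of the namespacing described above, the sketch below uses only the standard logging module. The helper name namespaced_logger and the "pyqalx.<config type>.<profile>" name layout are assumptions made for this example; the names pyqalx uses internally may differ.

```python
import logging


def namespaced_logger(config_type: str, profile_name: str) -> logging.Logger:
    """Return a logger whose name combines the config type and profile.

    Hypothetical helper: the "pyqalx.<type>.<profile>" layout is an
    assumption for illustration only.
    """
    return logging.getLogger(f"pyqalx.{config_type}.{profile_name}")


# Loggers with different names are independent objects, so each one can
# be given its own level (and its own handlers).
user_log = namespaced_logger("user", "default")
bot_log = namespaced_logger("bot", "default")
user_log.setLevel(logging.DEBUG)
bot_log.setLevel(logging.WARNING)
```

Because logging.getLogger returns the same object for the same name, two sessions sharing a config type and profile would also share a logger under this scheme.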

property log

Returns a logger (as obtained from logging.getLogger) for this QalxSession instance. It cannot be set as an instance attribute because of pickling issues on Windows. What gets logged depends on the LOGGING_LEVEL config value.

Usage:

>>> QalxSession().log.debug("A debug message")
>>> QalxSession().log.info("An info message")
>>> QalxSession().log.warning("A warning message")
>>> QalxSession().log.error("An error message")

Returns:

logging.getLogger

register(cls)

Registers the given QalxEntity subclass with the session. See the entities docs for more information.

Parameters:

cls (pyqalx.core.entities.entity.QalxEntity) – subclass of QalxEntity.

unregister(cls)

Unregisters the given QalxEntity subclass from the session. See the entities docs for more information.

Parameters:

cls (pyqalx.core.entities.entity.QalxEntity) – subclass of QalxEntity.

property item

Returns a QalxItem adapter for this session

Returns:

Instance of QalxItem

property set

Returns a QalxSet adapter for this session

Returns:

Instance of QalxSet

property group

Returns a QalxGroup adapter for this session

Returns:

Instance of QalxGroup

property queue

Returns a QalxQueue adapter for this session

Returns:

Instance of QalxQueue

property bot

Returns a QalxBot adapter for this session

Returns:

Instance of QalxBot

property worker

Returns a QalxWorker adapter for this session

Returns:

Instance of QalxWorker

property factory

Returns a QalxFactory adapter for this session

Returns:

Instance of QalxFactory

property blueprint

Returns a QalxBlueprint adapter for this session

Returns:

Instance of QalxBlueprint

property notification

Returns a QalxNotification adapter for this session

Returns:

Instance of QalxNotification
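The adapter properties above are typically chained straight off a session. The following is a minimal sketch of that pattern; the helper name store_square is hypothetical, and the function only assumes that the object passed in behaves like a QalxSession with the documented item.add and set.add methods.

```python
def store_square(session, size):
    """Create two Items and combine them into a Set via session adapters.

    `session` is expected to behave like a QalxSession. Only the
    documented `item.add` and `set.add` calls are used.
    """
    dims = session.item.add(
        data={"height": size, "width": size},
        meta={"shape": "square"},
    )
    material = session.item.add(
        data={"rho": 8000, "E": 193e9},
        meta={"family": "steel"},
    )
    # A Set takes its Items as a {<key>: Item} mapping.
    return session.set.add(
        items={"shape": dims, "material": material},
        meta={"profile": "square_steel"},
    )
```

With a configured profile this could be called as store_square(QalxSession(), 5).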

Adapters

Adapters are how a QalxSession interfaces with the API. They are not typically accessed directly; use the helper properties on the QalxSession instead.

class pyqalx.core.adapters.blueprint.QalxBlueprint(session, *args, **kwargs)

Provides an interface for accessing the API for Blueprint entities

child_entity_class

alias of Blueprint

add(name, schema, meta=None, **kwargs)

Adds a Blueprint with a valid jsonschema schema. If the schema is invalid then a jsonschema.SchemaError is raised

Parameters:
  • name (str) – The name of this blueprint

  • schema (dict) – The schema that you want to set on the Blueprint.

  • meta (dict) – A dictionary of metadata to store

Returns:

Blueprint

Raises:

jsonschema.SchemaError
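A minimal sketch of adding a Blueprint might look like the following. The schema is ordinary JSON Schema, but its layout (validating a top-level "data" object) and the names SQUARE_SCHEMA and add_square_blueprint are assumptions for illustration; check how your entities are structured before relying on it.

```python
# Hypothetical schema: requires numeric, non-negative height and width
# under a top-level "data" key.
SQUARE_SCHEMA = {
    "type": "object",
    "properties": {
        "data": {
            "type": "object",
            "properties": {
                "height": {"type": "number", "minimum": 0},
                "width": {"type": "number", "minimum": 0},
            },
            "required": ["height", "width"],
        }
    },
    "required": ["data"],
}


def add_square_blueprint(session, name="square"):
    """Register SQUARE_SCHEMA as a Blueprint through the session adapter."""
    return session.blueprint.add(
        name=name,
        schema=SQUARE_SCHEMA,
        meta={"shape": "square"},
    )
```

Per the description above, an invalid schema would surface as jsonschema.SchemaError from the add call.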

save(entity, **kwargs)

Saves any updates to the given Blueprint. Validates that the schema on the entity is a valid jsonschema schema

Parameters:
  • entity (Blueprint) – A valid Blueprint instance

  • kwargs – Any kwargs you want to save against this Blueprint

Returns:

Blueprint

Raises:

jsonschema.SchemaError

get(guid, child_fields=None, unpack=True, *args, **kwargs)

Gets the entity for the given guid and unpacks it if specified. Provide child_fields to restrict the fields returned on unpacked children

Parameters:
  • guid (str) – The guid of the entity to get

  • child_fields (list) – A list of fields that should be returned from child entities

  • unpack (bool) – Should the child entities be unpacked. Defaults to True

Returns:

An unpacked entity

get_by_name(name, **kwargs)

Gets a single entity by name

Parameters:
  • name (str) – name of entity

  • kwargs – Keyword arguments to be used in the find method

Returns:

instance of self.entity_class

Raises:
get_or_create(name, meta=None, **kwargs)

Gets an entity by the given name or creates it if it doesn’t exist

Parameters:
  • name (str) – The name to use when getting or creating the entity

  • meta – metadata about the entity

  • kwargs – Other kwargs on the entity to use when creating

Returns:

instance of self.entity_class

find(query=None, sort=None, skip=0, limit=25, many=True, child_fields=None, include_session_tags=True, *args, **kwargs)

Return multiple packed entities from the API

Parameters:
  • query (dict) – The optional Mongo query to find entities

  • sort (list) – The keys to sort by

  • skip (int) – The number of results to skip (offset) by

  • limit (int) – How many results should the response be limited to

  • many (bool) – Should many entities be returned or just a single one

  • include_session_tags (bool) – Should the tags for the session be included in the query. Default is True

  • child_fields (list) – A list of fields that should be returned from child entities

Returns:

A list of entities
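Since find is paged through skip and limit, walking a large result set means calling it repeatedly. The sketch below shows one way to do that. The helper name iter_entities is hypothetical, and it assumes the response exposes its entities under a "data" key; verify that against the actual response before use.

```python
def iter_entities(find, query=None, page_size=25):
    """Yield every entity matching `query`, fetching one page at a time.

    `find` is any callable with the documented query/skip/limit keywords,
    for example the `find` method of a session adapter.
    """
    skip = 0
    while True:
        page = find(query=query, skip=skip, limit=page_size)
        entities = page["data"]  # assumption: entities live under "data"
        yield from entities
        # A short page means there is nothing left to fetch.
        if len(entities) < page_size:
            break
        skip += page_size
```

Passing the bound method, as in iter_entities(qalx.blueprint.find), keeps the helper independent of any one adapter.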

find_one(query, unpack=True, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:
  • query (dict) – The mongoDB query

  • unpack (bool) – Should any child entities be automatically unpacked?

Returns:

an instance of self.entity_class

Raises:
reload(entity, unpack=True, *args, **kwargs)

Reloads the current entity from the API

Parameters:
  • entity (Blueprint) – An instance of Blueprint

  • unpack (bool) – Should any child entities be automatically unpacked?

Returns:

A refreshed instance of self._entity_class

aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult
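The aggregate argument is a standard MongoDB pipeline, that is, a list of stage dicts. As a sketch, the helper below builds a pipeline that filters first and then counts documents per value of a field. The helper name count_by_field is hypothetical, and any field paths passed to it depend on how your entities are stored.

```python
def count_by_field(field, match=None):
    """Build a Mongo aggregation pipeline counting documents per `field` value."""
    pipeline = []
    if match:
        # Restrict the input documents inside the pipeline itself.
        pipeline.append({"$match": match})
    # "$<field>" tells $group to use that field's value as the group key.
    pipeline.append({"$group": {"_id": f"${field}", "count": {"$sum": 1}}})
    pipeline.append({"$sort": {"count": -1}})
    return pipeline
```

The resulting list would then be passed as the aggregate argument, for example qalx.blueprint.aggregate(aggregate=count_by_field("name")).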

cache_kids(kwargs)

Overrides cache_kids because the children are not stored under the self.kids key. Uses deepcopy because the schema is a mutable object (a dict)

save_kids_on_entity(entity, kids)

Override save_kids_on_entity because the self.kids key is not where the children should be saved.

class pyqalx.core.adapters.bot.QalxBot(session, *args, **kwargs)

Provides an interface for accessing the API for Bot entities

add(name, config, meta=None, **kwargs)

Creates a Bot instance.

Parameters:
  • name (str) – The name that this bot will be given

  • config (dict) – The bot's config

  • meta (dict) – A dictionary of metadata to store

Returns:

The newly created Bot instance

get_by_name(name, **kwargs)

Gets a single entity by name

Parameters:
  • name (str) – name of entity

  • kwargs – Keyword arguments to be used in the find method

Returns:

instance of self.entity_class

Raises:
get_or_create(name, meta=None, **kwargs)

Gets an entity by the given name or creates it if it doesn’t exist

Parameters:
  • name (str) – The name to use when getting or creating the entity

  • meta – metadata about the entity

  • kwargs – Other kwargs on the entity to use when creating

Returns:

instance of self.entity_class

get(guid, *args, **kwargs)

Gets an instance of self.entity_class by the given guid.

Parameters:

guid – The guid of the entity to get

Returns:

An instance of self.entity_class

save(entity, *args, **kwargs)

Saves entity to the database.

Returns:

The updated instance of entity

find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)

Method for listing entities.

Parameters:
  • query – The optional Mongo query to find entities

  • sort – The keys to sort by

  • skip – The number of results to skip (offset) by

  • limit – How many results should the response be limited to

  • many – Should many entities be returned or just a single one

  • include_session_tags – Should the tags for the session be included in the query. Default is True

  • kwargs – kwargs to query by

Returns:

find_one(query, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:

query (dict) – The mongoDB query

Returns:

an instance of self.entity_class

Raises:
reload(entity, *args, **kwargs)

Reloads the current entity from the API

Parameters:

entity – An instance of self.entity_class

Returns:

A refreshed instance of self.entity_class

aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult

replace_workers(bot_entity, workers)

Completely replaces any Workers on the given bot. Will return the replaced workers in an unpacked state

Parameters:
  • bot_entity (Bot) – The Bot instance that is being changed

  • workers (int) – The number of workers that this bot should have

Returns:

A Bot instance with the updated workers

stop(entity, **kwargs)

Stops all the workers on the bot

Parameters:

entity (Bot) – Bot instance

resume(entity, **kwargs)

Resumes all the workers on the bot

Parameters:

entity (Bot) – Bot instance

terminate(entity, *args, **kwargs)

Terminates all the workers on the bot

Parameters:

entity (Bot) – Bot instance

update_status(entity, status, **kwargs)

Updates the status of an entity

Parameters:
  • entity – The entity that is being updated

  • status (str) – The status to update to
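The stop and resume methods pair naturally around maintenance work. The sketch below wraps them in a context manager so the workers are resumed even if the work fails. The name workers_paused is hypothetical, and how quickly a stop takes effect on running workers is not covered here.

```python
from contextlib import contextmanager


@contextmanager
def workers_paused(session, bot):
    """Stop a bot's workers for the duration of a with-block, then resume."""
    session.bot.stop(bot)
    try:
        yield bot
    finally:
        # Resume even if the body raised, so workers are not left stopped.
        session.bot.resume(bot)
```

Usage would look like "with workers_paused(qalx, bot): ..." with the maintenance steps in the body.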

Factory adapter module. Contains the main class that provides factory related interaction with the qalx API: QalxFactory

class pyqalx.core.adapters.factory.QalxFactory(*args, **kwargs)

Provides an interface for accessing the API for Factory entities

validate(plan_path)

Reads a .yaml file that contains the definition of the factory plan and then runs the factory validation endpoint in the API

Parameters:

plan_path (str) – A local path to the .yaml file that contains the factory plan definition

Raises:

QalxFactoryValidationError – If the .yaml file does not exist, if there are any errors parsing the yaml file to a dict, or if the yaml plan structure is invalid

add(name, source=None, file_name='', meta=None, upload=True, **kwargs)

Adds a Factory instance

Parameters:
  • name (str) – The name of the factory

  • source (str or io object) – file path to the factory plan or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)

  • file_name (str) – Name for the factory plan input file. Optional if a file path is given for the source parameter. Required if source is an io object

  • meta (dict) – A dictionary of metadata to store

  • upload (bool) – Whether the file should be automatically uploaded or not

Returns:

A newly created Factory instance
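When the plan is generated in memory rather than read from disk, source can be an io object, in which case file_name must be supplied. A minimal sketch, assuming a session-like object and using the hypothetical helper name add_factory_from_text and default file name "plan.yaml":

```python
import io


def add_factory_from_text(session, name, plan_yaml, file_name="plan.yaml"):
    """Create a Factory from plan text held in memory rather than on disk."""
    source = io.StringIO(plan_yaml)
    # file_name is required here because source is an io object.
    return session.factory.add(name=name, source=source, file_name=file_name)
```

The plan text itself is passed through untouched, so its structure is whatever the factory plan format requires.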

save(entity, source=None, file_name='', upload=True, *args, **kwargs)

Saves an updated existing Factory instance.

Parameters:
  • entity (An instance of Factory) – The entity that we are saving

  • source (str or io object) – file path to the factory plan or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)

  • file_name (str) – Name for the factory plan input file. Optional if a file path is given for the source parameter. Required if source is an io object

  • upload (bool) – Whether the file should be automatically uploaded or not

Returns:

An updated Factory instance

get(guid, *args, **kwargs)

Gets an instance of self.entity_class by the given guid.

Parameters:

guid – The guid of the entity to get

Returns:

An instance of self.entity_class

find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)

Method for listing entities.

Parameters:
  • query – The optional Mongo query to find entities

  • sort – The keys to sort by

  • skip – The number of results to skip (offset) by

  • limit – How many results should the response be limited to

  • many – Should many entities be returned or just a single one

  • include_session_tags – Should the tags for the session be included in the query. Default is True

  • kwargs – kwargs to query by

Returns:

find_one(query, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:

query (dict) – The mongoDB query

Returns:

an instance of self.entity_class

Raises:
reload(entity, *args, **kwargs)

Reloads the current entity from the API

Parameters:

entity – An instance of self.entity_class

Returns:

A refreshed instance of self.entity_class

aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult

class pyqalx.core.adapters.group.QalxGroup(session, *args, **kwargs)

Provides an interface for accessing the API for Group entities

child_entity_class

alias of Set

add(sets, meta=None, **kwargs)

When adding a Group ensure that the sets posted to the api are in the format {<key>: Set}

Parameters:
  • sets (dict) – A dictionary of Sets to create on the group

  • meta (dict) – A dictionary of metadata to store

Returns:

A newly created Group instance

get(guid, child_fields=None, unpack=False, *args, **kwargs)

Gets the entity for the given guid and unpacks it if specified. Provide child_fields to restrict the fields returned on unpacked children

Parameters:
  • guid (str) – The guid of the entity to get

  • child_fields (list) – A list of fields that should be returned from child entities

  • unpack (bool) – Should the child entities be unpacked. Defaults to False

Returns:

An unpacked entity

reload(entity, unpack=False, *args, **kwargs)

Reloads the current entity from the API.

Parameters:
  • entity – An instance of self.entity_class

  • unpack (bool) – Should the child entities be unpacked? Defaults to False

Returns:

A refreshed instance of self.entity_class

find(query=None, sort=None, skip=0, limit=25, many=True, child_fields=None, include_session_tags=True, *args, **kwargs)

Return multiple packed entities from the API

Parameters:
  • query (dict) – The optional Mongo query to find entities

  • sort (list) – The keys to sort by

  • skip (int) – The number of results to skip (offset) by

  • limit (int) – How many results should the response be limited to

  • many (bool) – Should many entities be returned or just a single one

  • include_session_tags (bool) – Should the tags for the session be included in the query. Default is True

  • child_fields (list) – A list of fields that should be returned from child entities

Returns:

A list of entities

find_one(query, unpack=False, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:
  • query (dict) – The mongoDB query

  • unpack (bool) – Should any child entities be automatically unpacked?

Returns:

Group

Raises:
aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult

save(entity, *args, **kwargs)

When saving an unpacked entity the kids need to be packed. To save having to unpack them again just save the original kids and replace them with the packed values after saving

Parameters:

entity – An instance of self.entity_class

Returns:

An updated instance of self.entity_class

class pyqalx.core.adapters.item.QalxItem(*args, **kwargs)

Provides an interface for accessing the API for Item entities

add(data: dict | None = None, source: str | StringIO | BytesIO | None = None, file_name: str = '', meta: dict | None = None, blueprint_name: str | None = None, upload: bool = True, encrypt: bool = True, as_file: bool = False, **kwargs)

Adds an Item instance that can contain either data (as a dict), a file or both.

Parameters:
  • data – Optional data to store against this Item

  • source – file path or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)

  • file_name – input file name. Optional if a file path is given for source. Required if source is an io object

  • meta – A dictionary of metadata to store

  • blueprint_name – An optional blueprint name to use to validate this item against an existing Blueprint

  • upload – Whether the file should be automatically uploaded or not

  • encrypt – Whether the file should be automatically encrypted if the KEYFILE is present

  • as_file – Whether the data dict should be stored in S3 rather than in Mongo

Returns:

Item instance
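An Item can carry data and a file at the same time. The sketch below stores a results dict as data and attaches the same content as a JSON file built in memory. The helper name add_results, the file name "results.json" and the meta value are assumptions for illustration.

```python
import io
import json


def add_results(session, results, blueprint_name=None):
    """Store `results` as Item data and attach it as a JSON file as well."""
    payload = json.dumps(results, sort_keys=True).encode("utf-8")
    return session.item.add(
        data=results,
        source=io.BytesIO(payload),
        file_name="results.json",  # required because source is an io object
        meta={"kind": "results"},
        blueprint_name=blueprint_name,
    )
```

Leaving blueprint_name as None skips Blueprint validation; passing a name asks for validation against that existing Blueprint.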

save(entity, source=None, file_name='', blueprint_name=None, upload=True, encrypt=True, as_file=False, **kwargs)

Saves an updated existing Item instance. To remove a file from an Item, update the entity so that entity['file'] = {}

Parameters:
  • entity (An instance of Item) – The entity that we are saving

  • source (str or io object) – file path or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)

  • file_name (str) – input file name. Optional if a file path is given for source. Required if source is an io object

  • meta (dict) – A dictionary of metadata to store

  • blueprint_name (str) – An optional blueprint name to use if you want to validate this item against an existing Blueprint

  • upload (bool) – Whether the file should be automatically uploaded or not

  • encrypt (bool) – Whether the file should be automatically encrypted if the KEYFILE is present

  • as_file – Whether the data dict should be stored in S3 rather than in Mongo

Returns:

An updated Item instance

add_many(items, encrypt=True, as_file=False, **kwargs)

Adds multiple items at once.

Parameters:
  • items (list) – A list of item data that you want to create. This can be a mixture of non file items and file items. The keys of each item in the list should be the same as if a single item were being created via the add method

  • encrypt (bool) – Whether the file should be automatically encrypted if the KEYFILE is present. All files are encrypted or not encrypted based on this value

Returns:

find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)

Method for listing entities.

Parameters:
  • query – The optional Mongo query to find entities

  • sort – The keys to sort by

  • skip – The number of results to skip (offset) by

  • limit – How many results should the response be limited to

  • many – Should many entities be returned or just a single one

  • include_session_tags – Should the tags for the session be included in the query. Default is True

  • kwargs – kwargs to query by

Returns:

find_one(query, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:

query (dict) – The mongoDB query

Returns:

an instance of self.entity_class

Raises:
reload(entity, *args, **kwargs)

Reloads the current entity from the API

Parameters:

entity – An instance of self.entity_class

Returns:

A refreshed instance of self.entity_class

aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult

archive(entity, *args, **kwargs)

Archives entity

Parameters:

entity – An instance of self.entity_class

Returns:

The archived instance of entity

delete(entity, *args, **kwargs)

Deletes the given entity from the API

Parameters:

entity – An instance of QalxEntity

class pyqalx.core.adapters.notification.QalxNotification(session, *args, **kwargs)

Provides an interface for accessing the API for Notification entities

add(subject, message, to, cc=None, bcc=None, **kwargs)

Posts a notification request to the API, to send an email with the provided subject, message and recipient list. Optionally, lists for CC and BCC recipients can be provided

Parameters:
  • subject (str) – The subject of the message

  • message (str) – The message to send

  • to (list) – A list with the addresses of the recipients of the message

  • cc (list) – An optional list with the addresses of the recipients of a carbon copy (CC) of the message

  • bcc (list) – An optional list with the addresses of the recipients of a blind carbon copy (BCC) of the message
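As a sketch of how the notification adapter might be used, the helper below builds a subject and message for a finished job and posts them. The helper name notify_finished and the wording of the email are assumptions; only the documented subject, message, to and cc arguments are passed.

```python
def notify_finished(session, job_name, recipients, watchers=None):
    """Email `recipients` that a job has finished, optionally CC'ing watchers."""
    subject = f"{job_name} finished"
    message = f"The job '{job_name}' has finished processing."
    return session.notification.add(
        subject=subject,
        message=message,
        to=list(recipients),
        # cc is optional, so pass None when there are no watchers.
        cc=list(watchers) if watchers else None,
    )
```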

class pyqalx.core.adapters.queue.QalxQueue(session, *args, **kwargs)

Provides an interface for accessing the API for Queue entities

add(name, meta=None, **kwargs)

Queues are created with a name. This name is stored in the metadata of the Queue instance

Parameters:
  • name (str) – The name we want to assign the Queue

  • meta (dict) – A dictionary of metadata to store

  • kwargs – Any other kwargs we are setting on the Queue

Returns:

A newly created Queue instance

get_messages(worker)

Gets the messages on the Queue instance

Parameters:

worker (Worker) – The instance of Worker that called this method

Returns:

A list of QueueMessage instances

get_by_name(name, **kwargs)

Gets a single queue by name

Parameters:

name (str) – name of queue

Returns:

Queue

Raises:
get_or_create(name, meta=None, **kwargs)

Gets a Queue by the given name or creates it if it doesn’t exist

Parameters:
  • name (str) – The name of the queue to get or create

  • meta (dict) – metadata about the queue

Returns:

Queue

get(guid, *args, **kwargs)

Gets an instance of self.entity_class by the given guid.

Parameters:

guid – The guid of the entity to get

Returns:

An instance of self.entity_class

save(entity, *args, **kwargs)

Saves entity to the database.

Returns:

The updated instance of entity

find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)

Method for listing entities.

Parameters:
  • query – The optional Mongo query to find entities

  • sort – The keys to sort by

  • skip – The number of results to skip (offset) by

  • limit – How many results should the response be limited to

  • many – Should many entities be returned or just a single one

  • include_session_tags – Should the tags for the session be included in the query. Default is True

  • kwargs – kwargs to query by

Returns:

find_one(query, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:

query (dict) – The mongoDB query

Returns:

an instance of self.entity_class

Raises:
reload(entity, *args, **kwargs)

Reloads the current entity from the API

Parameters:

entity – An instance of self.entity_class

Returns:

A refreshed instance of self.entity_class

aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult

class pyqalx.core.adapters.set.QalxSet(session, *args, **kwargs)

Provides an interface for accessing the API for Set entities

child_entity_class

alias of Item

add(items, meta=None, blueprint_name=None, **kwargs)

When adding a Set ensure that the items posted to the api are in the format {<key>: Item}

Parameters:
  • items (dict) – A dictionary of Items to create on the set

  • meta (dict) – A dictionary of metadata to store on this set

  • blueprint_name (str) – An optional blueprint name to use if you want to validate this set against an existing Blueprint

Returns:

Set

get(guid, child_fields=None, unpack=True, *args, **kwargs)

Gets an instance of self.entity_class by the given guid.

Parameters:

guid – The guid of the entity to get

Returns:

An instance of self.entity_class

save(entity, blueprint_name=None, *args, **kwargs)

When saving entities that inherit from this class, pyqalx will get the blueprint based on the blueprint_name and then ensure that the schema validates correctly against the data that you are updating the entity with

Parameters:
  • entity – The entity instance that you are saving

  • blueprint_name (str, None) – the name of the blueprint

Returns:

A valid self.entity_class instance

Raises:

jsonschema.ValidationError

find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)

Return multiple packed entities from the API

Parameters:
  • query (dict) – The optional Mongo query to find entities

  • sort (list) – The keys to sort by

  • skip (int) – The number of results to skip (offset) by

  • limit (int) – How many results should the response be limited to

  • many (bool) – Should many entities be returned or just a single one

  • include_session_tags (bool) – Should the tags for the session be included in the query. Default is True

  • child_fields (list) – A list of fields that should be returned from child entities

Returns:

A list of entities

find_one(query, unpack=True, **kwargs)

Method for returning a unique entity. Will return the entity that matches the query

Parameters:
  • query (dict) – The mongoDB query

  • unpack (bool) – Should any child entities be automatically unpacked?

Returns:

an instance of self.entity_class

Raises:
reload(entity, unpack=True, *args, **kwargs)

Reloads the current entity from the API.

Parameters:
Returns:

Set

aggregate(aggregate, **kwargs)

Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline

Parameters:

aggregate (list) – The aggregation pipeline

Returns:

AggregationResult

class pyqalx.core.adapters.worker.QalxWorker(session, *args, **kwargs)

Provides an interface for accessing the API for Worker entities

signal_class

alias of QalxWorkerSignal

list_endpoint(*args, bot_entity, **kwargs)

Builds the list_endpoint for workers. This requires the bot_entity which will get passed down from the calling method via kwargs.

Parameters:

bot_entity (Bot) – An instance of Bot

Returns:

The Worker list endpoint

detail_endpoint(guid, *args, bot_entity, **kwargs)

Builds the detail_endpoint for workers. This requires the bot_entity which will get passed down from the calling method via kwargs.

Parameters:
  • guid (uuid) – The guid of the Worker

  • bot_entity (Bot) – An instance of Bot

Returns:

The Worker detail endpoint

get(guid, bot_entity, *args, **kwargs)

Gets an individual worker.

Parameters:
  • guid (uuid) – The guid of the Worker

  • bot_entity (Bot) – An instance of Bot

Returns:

Worker

get_signal(entity, bot_entity, *args, **kwargs)

Gets the signal for a specific entity

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

Returns:

QalxWorkerSignal

terminate(entity, bot_entity, *args, **kwargs)

Terminates a specific entity

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

reload(entity, bot_entity, **kwargs)

Reloads the current entity from the API

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

Returns:

Worker

update_status(entity, bot_entity, status)

Updates the worker's status

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

  • status (str) – The status to update to

stop(entity, bot_entity, **kwargs)

Sends a stop signal to the worker

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

resume(entity, bot_entity, **kwargs)

Sends a resume signal to the worker

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

save(entity, bot_entity, *args, **kwargs)

Saves the Worker instance

Parameters:
  • entity (Worker) – An instance of Worker

  • bot_entity (Bot) – An instance of Bot

Returns:

Worker

Entities

Root entities are subclasses of dict with additional methods. They support both attribute and key lookup

class pyqalx.core.entities.bot.Bot(*args, **kwargs)

QalxEntity with entity_type Bot

class pyqalx.core.entities.entity.QalxAddict(*args, **kwargs)

A qalx subclass of addict.Dict. Enables entities to operate like normal dicts while allowing dot notation for all keys. If a value passed to it is already a QalxEntity instance, its type is preserved rather than being converted to a QalxAddict instance

class pyqalx.core.entities.entity.BaseQalxEntity(*args, **kwargs)

The base class for all QalxEntity instances. Operates like a normal dict but utilises addict.Dict to allow a user to do dot notation lookup of all keys recursively.
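To illustrate what recursive dot-notation lookup on a dict means, here is a deliberately small stand-in. The DotDict class is hypothetical and is not how addict.Dict or pyqalx implement it; it only shows the idea of mapping attribute access onto keys and wrapping nested dicts.

```python
class DotDict(dict):
    """Minimal stand-in for attribute-style access on a dict (not addict)."""

    def __getattr__(self, key):
        # Only called when normal attribute lookup fails, so dict methods
        # such as .keys() keep working.
        try:
            value = self[key]
        except KeyError:
            raise AttributeError(key) from None
        # Wrap nested plain dicts so lookups can be chained recursively.
        if isinstance(value, dict) and not isinstance(value, DotDict):
            value = DotDict(value)
            self[key] = value
        return value

    def __setattr__(self, key, value):
        # Attribute assignment writes straight through to the dict.
        self[key] = value


entity = DotDict({"guid": "123456789", "info": {"some": "info"}})
```

With this, entity.guid and entity["guid"] read the same value, and entity.info.some reaches into the nested dict.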

class pyqalx.core.entities.entity.QalxListEntity(pyqalxapi_list_response_dict, **kwargs)

Simple wrapper around a pyqalxapi_dict so we can keep extra keys on the API list response. Instantiates each entity in data to the correct QalxEntity subclass.

Parameters:

pyqalxapi_list_response_dict (dict) – A dict response from a REST API list endpoint

class pyqalx.core.entities.entity.AggregationResult(pyqalxapi_list_response_dict, **kwargs)

Class for aggregation result responses from the API

Parameters:

pyqalxapi_list_response_dict (dict) – A dict response from a REST API list endpoint

class pyqalx.core.entities.entity.QalxEntity(*args, **kwargs)

Base class for a response from the qalx API

QalxEntity children need to be populated with either a requests.models.Response which is the type returned by the methods on PyQalxAPI or with a dict.

Entities behave like a dict, and attribute lookups can also be used as getters and setters

>>> class AnEntity(QalxEntity):
...     pass
>>> c = AnEntity({"guid":"123456789", "info":{"some":"info"}})
>>> # dict style lookups
>>> c['guid']
'123456789'
>>> # attribute style lookups
>>> c.guid
'123456789'

Parameters:

pyqalxapi_dict (dict) – a ‘dict’ representing a qalx entity object to populate the entity

classmethod pluralise()

Pluralises the entity type

class pyqalx.core.entities.entity.QalxFileEntity(*args, **kwargs)

QalxFileEntity subclasses can be provided with a keyfile kwarg. This gets set on the instance of the entity and will allow for seamless decryption of files

read_file(retry=True)

If this Item contains a file, will read the file data and cache it against the Item.

Parameters:

retry (bool) – Should the download retry if there is a problem getting the file from S3? By default, the download will retry once

Returns:

The content of the URL as a bytes object. Accessible from the _file_bytes attribute

Raises:

QalxError

save_file_to_disk(filepath, filename=None)

If this Item contains a file, will read the file from the URL (or from the cached bytes on the instance) and save the file to disk. Provide an optional filename argument if you don’t want to use the same filename as the one stored on the Item

Parameters:
  • filepath (str) – The path where this file should be saved

  • filename (str) – The optional name of this file. Defaults to the name of the file on the instance

  • key_file (str) – The path to the encryption key to be used for decrypting the file data. If it is not provided and the data was encrypted, the data will be read in the encrypted format

Returns:

Path to file on disk

Raises:

QalxError

class pyqalx.core.entities.entity.QalxQueueableEntity(*args, **kwargs)

A mixin which allows an entity to be submitted to a queue

classmethod add_to_queue(payload, queue, children=False, **message_kwargs)

Submits an entity, entity guid or list of entities/entity guids to the given queue

Example usage for an Item:

>>> Item.add_to_queue(item, queue)
>>> Item.add_to_queue(item.guid, queue)
>>> Item.add_to_queue([item, item.guid], queue)
Parameters:
  • payload – subclassed instance of QalxEntity, a guid, or a list containing a combination of both

  • queue (Queue) – An instance of Queue

  • children (bool) – Whether we are submitting the child entities to the queue rather than the given entity
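The three accepted payload shapes (an entity, a guid, or a mixed list) can be reduced to one form before submission. The sketch below only illustrates that idea; payload_to_guids is a hypothetical helper, and it assumes entities are dict-like with a "guid" key and that guids are plain strings.

```python
def payload_to_guids(payload):
    """Hypothetical helper: reduce an entity, a guid, or a mixed list to a list of guids."""
    # A single entity or guid is treated as a one-element list.
    entries = payload if isinstance(payload, (list, tuple)) else [payload]
    guids = []
    for entry in entries:
        if isinstance(entry, str):
            guids.append(entry)  # already a guid
        else:
            guids.append(entry["guid"])  # dict-like entity carrying a guid key
    return guids


# A plain dict stands in for an Item here.
item = {"guid": "abc-123", "data": {"height": 5}}
single = payload_to_guids(item)
mixed = payload_to_guids([item, "def-456"])
```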

class pyqalx.core.entities.group.Group(*args, **kwargs)

A group is a collection of sets. These are useful for sending a lot of sets to a queue and being able to track when they have all been processed.

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> steel = qalx.item.add(source="path/to/316_datasheet.pdf",
...                       data={"rho":8000, "E":193e9},
...                       meta={"library":"materials", "family":"steel",
...                             "grade":"316"})
>>> steel_squares = {}
>>> for size in range(2, 500):
...     dims = qalx.item.add(data={"height":size, "width":size},
...                          meta={"shape":"square"})
...     steel_square_set = qalx.set.add(items={"shape":dims, "material":steel},
...                                     meta={"profile":"square_steel"})
...     steel_squares[size] = steel_square_set
>>> all_squares = qalx.group.add(sets=steel_squares)
child_entity_class

alias of Set

class pyqalx.core.entities.item.Item(*args, **kwargs)

An item is the core of qalx.

They are structured data or a file or some combination of the two. For example:

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> dims = qalx.item.add(data={"height":5, "width":5}, meta={"shape":"square"})
>>> steel = qalx.item.add(source="path/to/316_datasheet.pdf",
...                       data={"rho":8000, "E":193e9},
...                       meta={"library":"materials",
...                             "family":"steel",
...                             "grade":"316"})

We can then use the find_one and find methods to search for items

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> steel_316_item = qalx.item.find_one(query={"metadata.data.library": "materials",
...                                            "metadata.data.family": "steel",
...                                            "metadata.data.grade": "316"})
>>> steels = qalx.item.find(query={"metadata.data.family": "steel"})
>>> squares = qalx.item.find(query={"metadata.data.shape": "square"})
>>> quads = qalx.item.find(query={"$or": [{"metadata.data.shape": "square"},
...                                       {"metadata.data.shape": "rectangle"}]})

We can edit an item once we have retrieved it and save it back to qalx. You can either use attribute style getters/setters (my_shape.data.height = 10) or key style getters/setters (my_shape['data']['height'] = 10)

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> my_shape = qalx.item.find_one(query={"data.height": 5, "data.width": 5})
>>> my_shape.data.height = 10
>>> my_shape.meta.shape = 'rectangle'
>>> qalx.item.save(my_shape)
>>> # If we think that someone else might have edited my_shape we can reload it:
>>> my_shape = qalx.item.reload(my_shape)

QalxFileEntity subclasses can be provided with a keyfile kwarg. This gets set on the instance of the entity and will allow for seamless decryption of files

class pyqalx.core.entities.item.ItemAddManyEntity(pyqalxapi_list_response_dict, **kwargs)

The entity that gets returned when the add_many() method is called

property items_

Helper to allow easier lookup for items. A user can continue to do resp['items'] should they choose, but using this property leads to neater code.

Usage: resp.items_

Returns:

dict

Raises:

KeyError if items key doesn’t exist on ItemAddManyEntity

class pyqalx.core.entities.queue.QueueResponse(response)

a response from a remote queue

make new response

Parameters:

response – response from queue

class pyqalx.core.entities.queue.QueueMessage(message, worker, visibility, *args, **kwargs)

A message from the queue. Subclasses threading.Thread so that a heartbeat can be started as soon as the message is received from the Queue

Parameters:
  • message (dict) – the full raw message from the queue

  • worker (Worker) – An instance of pyqalx.bot.Worker

  • visibility (int) – How long should the heartbeat “beat” for (in seconds)?

run()

A QalxJob might take a long time to process. Therefore, we use a heartbeat on the message to keep it in flight for as long as possible. This runs in a Thread on QueueMessage instantiation to ensure that the message timeout doesn’t expire before processing even begins (i.e. if a batch of messages was returned)

As per the SQS docs, this specifies the initial visibility timeout (for example, 2 minutes) and then, as long as the consumer is still working on the message, keeps extending the visibility timeout by 2 minutes every minute. visibility is used as the initial visibility timeout, so every visibility / 2 seconds the timeout is increased by visibility * number of heartbeats from the initial visibility time.
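The timing of the "2 minutes, extended every minute" pattern can be worked through with a few lines of arithmetic. This is a sketch of that general pattern only, not of the values pyqalx sends: heartbeat_schedule is a hypothetical function, and it assumes each beat keeps the message invisible for a further visibility seconds counted from the moment of the beat.

```python
def heartbeat_schedule(visibility, beats):
    """Return (beat_time, invisible_until) pairs, in seconds since the message was received."""
    interval = visibility / 2  # a beat is sent every visibility / 2 seconds
    schedule = []
    for n in range(1, beats + 1):
        beat_time = n * interval
        # Assumption: each beat makes the message invisible for another
        # `visibility` seconds counted from the moment of the beat.
        schedule.append((beat_time, beat_time + visibility))
    return schedule


# 2 minute timeout, so a beat every minute.
schedule = heartbeat_schedule(visibility=120, beats=3)
```

Under these assumptions the message never becomes visible while the consumer is alive, because each beat arrives with half of the previous timeout still remaining.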

class pyqalx.core.entities.queue.Queue(*args, **kwargs)

QalxEntity with entity_type Queue

property broker_client

An authenticated client to communicate with the remote message broker.

Windows can’t pickle the client if it gets set on __init__. Therefore, we set it on an internal attribute and cache it for the lifetime of the Queue to avoid getting the client multiple times

get_messages(max_num_msg, visibility, waittime, worker)

get messages from the queue

Parameters:
  • max_num_msg (int) – maximum number to retrieve

  • visibility (int) – time in seconds until the message becomes visible again on the queue

  • waittime (int) – time to wait for a message

  • worker (Worker) – The Worker instance that called this method

Returns:

List of QueueMessage

delete_message(message)

remove message from the queue

Parameters:

message (QueueMessage) – message to delete

purge()

Purge the queue. All messages on the queue will be deleted.

This might take up to 60 seconds and any messages sent to the queue while it is being purged might also be deleted

class pyqalx.core.entities.set.Set(*args, **kwargs)

A set is simply a collection of items.

They are mapped with keys so that you can retrieve specific items from the set later:

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> dims = qalx.item.add(data={"height":5, "width":5}, meta={"shape":"square"})
>>> steel = qalx.item.add(source="path/to/316_datasheet.pdf",
...                       data={"rho":8000, "E":193e9},
...                       meta={"library":"materials",
...                             "family":"steel", "grade":"316"})
>>> steel_square_set = qalx.set.add(items={"shape":dims, "material":steel},
...                                 meta={"profile":"square_steel"})

As with items, we can then use the find_one and find methods to search for sets and easily get the item data. You can use attribute style getters/setters or key style getters/setters:

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> my_set = qalx.set.find_one(query={"metadata.data.profile": "square_steel"})
>>> # Key style
>>> youngs_mod = my_set.get_item_data("material")['E']
>>> # Attribute style
>>> youngs_mod = my_set.get_item_data("material").E

Note

Sets store a reference to an item rather than a copy of the item. Changes to the item will be reflected in all sets containing that item.

child_entity_class

alias of Item

get_item_data(item_key)

helper method to get data from an item in the set

Parameters:

item_key (str) – key on the set of the item to get the data from

Returns:

dict or None if key is missing

property items_

Helper to allow easier lookup for items. A user can continue to do myset['items'] should they choose, but using this property leads to neater code.

Usage: myset.items_

Returns:

dict

Raises:

KeyError if items key doesn’t exist on Set

class pyqalx.core.entities.worker.Worker(*args, **kwargs)

QalxEntity with entity_type Worker

class pyqalx.core.entities.blueprint.Blueprint(*args, **kwargs)

Provides an interface for creating blueprints. Blueprints allow a user to create items or sets in a consistent fashion

classmethod item()

A blueprint that gives the default blueprint structure for an Item entity

classmethod set()

A blueprint that gives the default blueprint structure for a Set entity

check_schema(schema=None)

Checks that the given schema is structured correctly

Parameters:

schema (None or Blueprint) – The schema to validate. Uses self if None

Raises:

jsonschema.SchemaError

validate(entity)

Validate an entity against the blueprint schema locally.

Parameters:

entity – A full schema for an entity that can have a Blueprint

add_schema(schema_value)

Update the entire Blueprint schema at once. Will completely overwrite all properties that are already on this Blueprint instance

Parameters:

schema_value – A full schema for an entity that can have a Blueprint

Returns:

An updated instance of the blueprint

add_data(key, schema_value, required=False)

Adds data to this item blueprint instance.

Parameters:
  • key (str) – The key in the data you want to add

  • schema_value (str) – The schema value you want to add

  • required (bool) – Is this key required?

Returns:

An updated instance of the item blueprint
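One way to picture what adding a data key involves is a JSON-Schema-style dict that gains a property and, optionally, a required entry. The sketch below is illustrative only: add_data_key is a hypothetical helper, and both the schema layout and the dict-valued schema_value are assumptions rather than the structure pyqalx uses.

```python
import copy


def add_data_key(schema, key, schema_value, required=False):
    """Hypothetical helper: return a copy of `schema` with `key` described under data."""
    updated = copy.deepcopy(schema)
    data = updated.setdefault("properties", {}).setdefault(
        "data", {"type": "object", "properties": {}}
    )
    data.setdefault("properties", {})[key] = schema_value
    if required:
        required_keys = data.setdefault("required", [])
        if key not in required_keys:
            required_keys.append(key)
    return updated


# Assumed starting point: an empty object schema.
item_schema = {"type": "object", "properties": {}}
item_schema = add_data_key(item_schema, "height", {"type": "number"}, required=True)
item_schema = add_data_key(item_schema, "label", {"type": "string"})
```

Returning an updated copy mirrors the "returns an updated instance" wording above and keeps the calls chainable.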

add_meta(key, schema_value, required=False)

Adds meta to this item blueprint instance.

Parameters:
  • key (str) – The key in the meta you want to add

  • schema_value (str) – The schema value you want to add

  • required (bool) – Is this key required?

Returns:

An updated instance of the item blueprint

add_item(key, item_blueprint=None, required=False)

Adds an item to this set blueprint instance. If item_blueprint is provided then the set schema will always require that specific item blueprint for key. If item_blueprint is not provided then the set blueprint will accept any data for key when being used

Parameters:
  • key (str) – The key of the item you want the blueprint to validate

  • item_blueprint (str) – If provided will save the name from the blueprint against the key. This means that whenever the set is created you must use the same blueprint.

  • required (bool) – Is this key required?

Returns:

An updated instance of the set blueprint

Factories

Factories provide an automated way to create multiple bots - both locally and on remote servers

Factory module. Contains the main factory class

class pyqalx.factories.factory.Factory(session, user_profile='default', bot_profile='default', aws_profile='default')

Main factory class. Inherits from BaseFactory and contains methods for specific functionality related to the distinct stages of a factory

Parameters:
  • session (QalxSession) – An instance of QalxSession

  • user_profile (str) – The user profile to use

  • bot_profile (str) – The bot profile to use

  • aws_profile (str) – The AWS profile to use

validate(plan)

Calls the validate method on the adapter

Parameters:

plan (str) – Path to a .yaml file containing the definition of the factory plan

Raises:

QalxFactoryValidationError – If there is a problem validating the plan

pack(plan, stage=None, delete=True)

Packs a factory. Calls validate method on the plan provided

Parameters:
  • plan (str) – Path to a .yaml file containing the definition of the factory plan

  • stage (str) – Stage to be packed. This argument is optional. If None is provided, then all stages will be packed

  • delete (bool) – Should the packed code be deleted when this command completes?

Raises:

QalxFactoryPackError – If a stage is provided and it cannot be found on the plan, or if there are any issues creating the build directory

build(plan, stage)

Builds a factory. Calls pack method with the provided plan and stage.

Parameters:
  • plan (str) – Path to a .yaml file containing the definition of the factory plan

  • stage (str) – Stage to be built

Raises:

QalxFactoryBuildError – If there is a problem with the build

demolish(factory_guid)

Demolishes a factory

Parameters:

factory_guid – The guid of the factory entity to demolish

Transport

The transport layer of pyqalx. Links adapters to the REST API

class pyqalx.transport.api.PyQalxAPI(session)

The main interface between adapters and the REST API.

Usage:

>>> from pyqalx import QalxSession
>>> qalx = QalxSession()
>>> qalx.rest_api
get(endpoint, **kwargs)

Performs a GET request to the specified endpoint

Parameters:
  • endpoint (str) – The endpoint you want to query

  • kwargs (dict) – Any arguments you want to pass to the request.

Returns:

tuple of (bool, dict). The first element is whether the response was successful, the second element is the response data

post(endpoint, **kwargs)

Performs a POST request to the specified endpoint

Parameters:
  • endpoint (str) – The endpoint you want to query

  • kwargs (dict) – Any arguments you want to pass to the request.

Returns:

tuple of (bool, dict). The first element is whether the response was successful, the second element is the response data

patch(endpoint, **kwargs)

Performs a PATCH request to the specified endpoint

Parameters:
  • endpoint (str) – The endpoint you want to query

  • kwargs (dict) – Any arguments you want to pass to the request.

Returns:

tuple of (bool, dict). The first element is whether the response was successful, the second element is the response data

put(endpoint, **kwargs)

Performs a PUT request to the specified endpoint

Parameters:
  • endpoint (str) – The endpoint you want to query

  • kwargs (dict) – Any arguments you want to pass to the request.

Returns:

tuple of (bool, dict). The first element is whether the response was successful, the second element is the response data

delete(endpoint, **kwargs)

Performs a DELETE request to the specified endpoint

Parameters:
  • endpoint (str) – The endpoint you want to query

  • kwargs (dict) – Any arguments you want to pass to the request.

Returns:

tuple of (bool, dict). The first element is whether the response was successful, the second element is the response data
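Because every method above returns a (bool, dict) tuple, calling code typically unpacks it and branches on the flag. The following is a minimal sketch of that handling with stand-in tuples: unwrap and RequestFailed are hypothetical names, and the dict contents are invented for illustration.

```python
class RequestFailed(Exception):
    """Hypothetical error raised when the success flag is False."""


def unwrap(result):
    """Return the response data from a (success, data) tuple, or raise."""
    success, data = result
    if not success:
        raise RequestFailed(data)
    return data


# Stand-ins for values a call such as get(endpoint) might return.
ok_result = (True, {"guid": "123456789"})
failed_result = (False, {"detail": "not found"})

entity = unwrap(ok_result)
try:
    unwrap(failed_result)
except RequestFailed as exc:
    error_data = exc.args[0]  # the response data travels with the exception
```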

exception pyqalx.transport.core.PyQalxAPIException

A generic error for PyQalxAPI

class pyqalx.transport.core.BasePyQalxAPI(session)

Core functionality for the transport layer. Handles the building of the request and the response

property token

Returns the current API Token that is in use for this session

Returns:

str

static is_filestream(input_object)

Determines whether the given input object is a file stream or a filepath

Parameters:

input_object (io.IOBase or str) – An instance of IOBase or a filepath

Returns:

bool
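A check of this kind can be written with isinstance against io.IOBase. This is a sketch of the idea as a standalone function and not necessarily how pyqalx implements it.

```python
import io


def is_filestream(input_object):
    """Sketch: True for file-like streams, False for anything else (such as a path)."""
    return isinstance(input_object, io.IOBase)


stream_result = is_filestream(io.BytesIO(b"data"))
path_result = is_filestream("path/to/316_datasheet.pdf")
```

io.BytesIO, io.StringIO and the objects returned by open() all derive from io.IOBase, so they are all reported as streams.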

Config

class pyqalx.config.configs.Config
property defaults

Returns a dict of default values for config options from ~pyqalx.config.defaults.__init__

Returns:

dict of options

classmethod configure(profile_name, config_items=None)

When given a profile name and a dict of config items, will write the config file to disk. If the given profile_name already exists in the config file, that profile will be completely replaced with the values from config_items

Parameters:
  • profile_name (str) – The name of the profile to write

  • config_items (dict) – A dict of items that should be written to the config

class pyqalx.config.configs.BotConfig

Works exactly like a dict but provides ways to fill it from ini files and environment variables. There are two common patterns to populate the config.

Either you can add them to the .bots file:

>>> bot.config.from_botsfile(profile_name=BotConfig.default)

Or alternatively you can define the configuration from environment variables starting with QALX_BOT_. These will be populated automatically but values defined in .bots will overwrite these if bot.config.from_botsfile() is called.

To set environment variables before launching the application you have to set this environment variable to the name and value you want to use. On Linux and OS X use the export statement:

$ export QALX_BOT_LICENCE_FILE='/path/to/licence/file'

On Windows use set instead.
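The precedence described above (environment variables first, then values from the .bots file on top) can be sketched with plain dicts. The helper names are hypothetical, a fake environment dict is used instead of os.environ, and keeping the QALX_BOT_ prefix on the keys is an assumption.

```python
def config_from_environ(environ, prefix="QALX_BOT_"):
    """Collect the variables that start with `prefix`."""
    return {key: value for key, value in environ.items() if key.startswith(prefix)}


def merge_config(env_values, file_values):
    """Values from the file overwrite values that came from the environment."""
    merged = dict(env_values)
    merged.update(file_values)
    return merged


fake_environ = {
    "QALX_BOT_LICENCE_FILE": "/path/to/licence/file",
    "PATH": "/usr/bin",  # ignored: does not start with the prefix
}
from_env = config_from_environ(fake_environ)
config = merge_config(from_env, {"QALX_BOT_LICENCE_FILE": "/from/bots/file"})
```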

property defaults

Loads the default bot config from ~pyqalx.config.defaults.bots. Any config items that exists in ~pyqalx.config.defaults.__init__ will be overwritten by config values that exist in ~pyqalx.config.defaults.bots

Returns:

dict of default config options

class pyqalx.config.configs.UserConfig

Works exactly like a dict but provides ways to fill it from ini files and environment variables. There are two common patterns to populate the config. Either you can add them to the .qalx file:

>>> qalx_bot.config.from_qalxfile(profile_name=UserConfig.default)

Or alternatively you can define the configuration from environment variables starting with QALX_USER_. These will be populated automatically but values defined in .qalx will overwrite these if qalx_bot.config.from_qalxfile() is called.

To set environment variables before launching the application you have to set this environment variable to the name and value you want to use. On Linux and OS X use the export statement:

$ export QALX_USER_EMPLOYEE_NUMBER=1280937

On Windows use set instead.

Logging

class pyqalx.core.log.QalxRotatingFileHandler(*args, **kwargs)

We always force some defaults for a qalx log

Open the specified file and use it as the stream for logging.

By default, the file grows indefinitely. You can specify particular values of maxBytes and backupCount to allow the file to rollover at a predetermined size.

Rollover occurs whenever the current log file is nearly maxBytes in length. If backupCount is >= 1, the system will successively create new files with the same pathname as the base file, but with extensions “.1”, “.2” etc. appended to it. For example, with a backupCount of 5 and a base file name of “app.log”, you would get “app.log”, “app.log.1”, “app.log.2”, … through to “app.log.5”. The file being written to is always “app.log” - when it gets filled up, it is closed and renamed to “app.log.1”, and if files “app.log.1”, “app.log.2” etc. exist, then they are renamed to “app.log.2”, “app.log.3” etc. respectively.

If maxBytes is zero, rollover never occurs.

rotate(*args, **kwargs)

Overridden in order to ignore permission errors. This could happen when multiple processes are accessing the same log file. If this happens, rotation is suppressed until the lock is dropped. The log files will exceed the “maxBytes” value but logging will continue normally.
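The override described above can be sketched by subclassing the standard library's RotatingFileHandler and catching PermissionError in rotate. TolerantRotatingFileHandler is a hypothetical name, and the locked file is simulated with a rotator callable that raises.

```python
import logging.handlers
import os
import tempfile


class TolerantRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Hypothetical handler that skips a rotation when the file is locked."""

    def rotate(self, source, dest):
        try:
            super().rotate(source, dest)
        except PermissionError:
            # Another process holds the file: keep writing to the current log
            # and let a later rollover attempt try again.
            pass


def locked_rotator(source, dest):
    # Simulates the error raised when another process has the file open.
    raise PermissionError("log file is in use")


log_dir = tempfile.mkdtemp()
handler = TolerantRotatingFileHandler(
    os.path.join(log_dir, "app.log"), maxBytes=100, backupCount=1, delay=True
)
handler.rotator = locked_rotator
handler.rotate("app.log", "app.log.1")  # the PermissionError is swallowed
rotation_survived = True
handler.close()
```

The trade-off is the one stated above: while rotation is suppressed the log file can grow past maxBytes.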

class pyqalx.core.log.QalxLogFormatter(style='%', *args, **kwargs)

Sometimes our log messages contain extra data. Use this formatter to allow all handlers to use the same format while also handling the potential extra data

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to use one of %-formatting, str.format() ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

format(record)

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

pyqalx.core.log.configure_logging(name, filename, level)

Configures a log. Loggers are tied directly to an instance of QalxSession or Worker and should be accessed via instance.log.<level>. All pyqalx loggers are prefixed with <LOGGER_NAME_PREFIX>

Parameters:
  • name (str) – The name of the log

  • filename (str) – The full filename for where the log should be saved

  • level (str) – The level of the log

Returns:

logging.getLogger()

pyqalx.core.log.session_logging_suffix(config)

Returns the suffix to use for QalxSession logging instances based on the config

Parameters:

config (Config) – A subclass of Config

Returns:

The key_prefix split. i.e. user for QALX_USER_ and bot for QALX_BOT_
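Deriving the suffix from the prefix comes down to a string split. A minimal sketch, assuming the prefix is available as a plain string (logging_suffix is a hypothetical helper that takes the prefix directly rather than a Config):

```python
def logging_suffix(key_prefix):
    """Hypothetical helper: 'QALX_USER_' -> 'user', 'QALX_BOT_' -> 'bot'."""
    # Split on underscores and take the part after the leading 'QALX'.
    return key_prefix.split("_")[1].lower()


user_suffix = logging_suffix("QALX_USER_")
bot_suffix = logging_suffix("QALX_BOT_")
```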

pyqalx.core.log.get_logger()

From all the configured loggers, tries to get a user session log to log the unhandled exception to. If there are multiple configured user sessions, the first one is used. If there are no user sessions configured (i.e. the error occurred before QalxSession was instantiated) then the root log is returned

pyqalx.core.log.exception_hook(exc_type, exc_value, exc_traceback, extra=None, log=None)

We want to log any unhandled exception that occurs in pyqalx. This includes Qalx* errors (i.e. because a user has misconfigured something) and also if they have made an error in one of their decorated functions (i.e. @bot.process). This catches all errors and logs them and also prints out the traceback
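The general technique of logging unhandled exceptions through a hook can be sketched with the standard library alone. log_unhandled and ListHandler are hypothetical names; the sketch shows the pattern of logging first and then printing the traceback, not the pyqalx function itself.

```python
import logging
import sys


def log_unhandled(exc_type, exc_value, exc_traceback, log=None):
    """Log an unhandled exception, then print the usual traceback."""
    log = log or logging.getLogger()  # fall back to the root logger
    log.error(
        "Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback)
    )
    # Delegate to the default hook so the traceback is still printed.
    sys.__excepthook__(exc_type, exc_value, exc_traceback)


class ListHandler(logging.Handler):
    """Collects records in memory so the behaviour can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


demo_log = logging.getLogger("excepthook_sketch")
demo_log.propagate = False
collector = ListHandler()
demo_log.addHandler(collector)

try:
    raise ValueError("misconfigured")
except ValueError:
    log_unhandled(*sys.exc_info(), log=demo_log)

# To install it for the whole process: sys.excepthook = log_unhandled
```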

Signals

class pyqalx.core.signals.QalxSignal(entity)

A set of helper methods to aid in managing signals on Workers

property terminate

Checks whether the signal was a terminate signal

property terminate_cold

Checks whether the signal was a cold terminate signal

property terminate_warm

Checks whether the signal was a warm terminate signal

property requeue_job

Checks whether the signal wants the job to be requeued

property stop

Checks whether the signal wants to stop the job

class pyqalx.core.signals.QalxWorkerSignal(entity)

Handles signals for individual instances of Worker

Errors

pyqalx.core.errors defines QalxError exception and a load of children.

If pyqalx is going to raise an error you know about then use one of these or create a new one.

exception pyqalx.core.errors.QalxError

Base qalx error. Take responsibility!

exception pyqalx.core.errors.QalxAuthError

qalx did not find a way to authenticate or the authentication didn’t work

exception pyqalx.core.errors.QalxNoGUIDError

A QalxEntity without a guid is like a dog without a bone.

exception pyqalx.core.errors.QalxNoInfoError

A QalxEntity without info is like a dog without a bone.

exception pyqalx.core.errors.QalxConfigProfileNotFound

The profile wasn’t in the file or the file wasn’t properly formed.

exception pyqalx.core.errors.QalxConfigFileNotFound

There should be a file in the users home directory (either a .bots or .qalx)

exception pyqalx.core.errors.QalxQueueError

There wasn’t the correct information to connect to the remote queue.

exception pyqalx.core.errors.QalxBotInitialisationFailed

The bot initialisation function returned something falsey.

exception pyqalx.core.errors.QalxEntityTypeNotFound

We couldn’t find the entity type you were looking for.

exception pyqalx.core.errors.QalxEntityNotFound

We couldn’t find the entity you were looking for.

exception pyqalx.core.errors.QalxMultipleEntityReturned

We found more than one entity, but you just wanted the one hey?

exception pyqalx.core.errors.QalxConfigError

Something about an attempted load of config didn’t work

exception pyqalx.core.errors.QalxAPIResponseError

There was a problem with some kind of API request.

exception pyqalx.core.errors.QalxEntityUnchanged

Saved something which hadn’t actually been changed when we thought it had.

exception pyqalx.core.errors.QalxInvalidSession

The qalx_session argument passed to an adapter isn’t a valid QalxSession instance

exception pyqalx.core.errors.QalxNoEntity

A user tried to access the entity attribute on QalxAdapter when they hadn’t set an entity

exception pyqalx.core.errors.QalxIncorrectEntityType

The entity on the QalxAdapter is of a different type to the type of adapter

exception pyqalx.core.errors.QalxFileError

There is something wrong with the file details passed to the QalxAdapter

exception pyqalx.core.errors.QalxAlreadyRegistered

The entity is already registered with the session

exception pyqalx.core.errors.QalxCannotUnregister

The entity cannot be unregistered

exception pyqalx.core.errors.QalxRegistrationClassNotFound

The registration class for registering a custom class was not found

exception pyqalx.core.errors.QalxInvalidBlueprintError

The blueprint is not valid.

exception pyqalx.core.errors.QalxInvalidTagError

The user does not have access to write to the specific tags

exception pyqalx.core.errors.QalxStepFunctionNotDefined

A specific Bot step function has not been defined by the user

exception pyqalx.core.errors.QalxFileRetrievalError

Error when retrieving a file from qalx

exception pyqalx.core.errors.QalxFactoryError

Generic error with a Qalx factory

exception pyqalx.core.errors.QalxFactoryValidationError

Error when validating a factory

exception pyqalx.core.errors.QalxFactoryPackError

Error when packing a factory

exception pyqalx.core.errors.QalxFactoryBuildError

Error when building a factory

exception pyqalx.core.errors.QalxFactoryDemolishError

Error when demolishing a factory

Bot API

pyqalx.bot.cannot_be_undefined(*args, **kwargs)

Raises an exception if a step function cannot be left undefined. Handled here to avoid a user trying to call the specific step function directly.

class pyqalx.bot.QalxJob(worker_process)

a Job to process

Parameters:

worker_process (Worker) – The instance of the worker that is running this job

property log

An instance of the worker's log to enable logging from step functions

property e

shortcut to QalxJob().entity

property s

shortcut to QalxJob().session

property queue

shortcut to the queue instance on the bot entity

stop_processing()

Stop the worker from processing. Will cause the worker to wait until it receives a resume signal

resume_processing()

Resume the worker.

terminate(warm=True, requeue_job=False)

The job has been told to terminate the worker

Parameters:
  • warm (bool) – If True will completely finish running through the processes for the current job. If False will exit as soon as the current process function is complete

  • requeue_job (bool) – Should the job be requeued if we are terminating with warm=False

publish_status(status)

update the status of the worker

Parameters:

status (str) – short message about the worker status

add_step_result(success=True)

indicate success or failure of the step function

Parameters:

success (bool) – was the step successful or not

property last_step_result

The success flag for the previous step result (or None if no previous steps) for this job

get_entity(entity_type, entity_guid)

get an entity

Parameters:
  • entity_type (str) – the type of entity e.g. item, set, group

  • entity_guid (str) – the guid of the entity

Returns:

a child of QalxEntity if the entity is found on the API, otherwise None

add_item_data(item_key, data, meta=None)

helper method to add item data to the set

Parameters:
  • item_key (str) – key on the set to add the item as

  • data (dict) – data to add

  • meta (dict) – metadata to add

Returns:

The specific Item instance that has been added

save_entity()

Updates the API representation of this entity with the data currently stored in the job.

Returns:

a child of QalxEntity of the correct type based on the type of self.entity

reload_entity()

Reloads self.entity from the API

Returns:

a child of QalxEntity of the correct type based on the type of self.entity

delete_message()

Helper method for deleting a message from a Queue

class pyqalx.bot.Bot(bot_name)

Bots allow automation of tasks. A bot reads messages from a Queue and processes the response through user configured step_functions.

Parameters:

bot_name (str) – The unique name for this bot

start(queue_name, processes=1, qalx_session_class=None, user_config_class=None, bot_config_class=None, entity_classes=None, user_profile_name='default', bot_profile_name='default', skip_ini=False, stop=False, meta=None, workflow=None)

Starts the bot with processes number of Worker instances. Each worker will be able to read a message from the queue in parallel

Parameters:
  • queue_name (str) – The unique name for the queue

  • qalx_session_class (pyqalx.core.session.QalxSession) – The QalxSession class that this Bot and Workers should use

  • user_config_class (pyqalx.config.configs.UserConfig) – The UserConfig class that will be used when creating the Bot

  • bot_config_class (pyqalx.config.configs.BotConfig) – The BotConfig class that will be used when using the bot session

  • entity_classes (list) – A list of QalxEntity subclasses that you want this Bot to use once it is started

  • user_profile_name (str) – The user profile name that will be used when creating the Bot

  • bot_profile_name (str) – The bot profile name that will be used when using the bot session

  • skip_ini (bool) – Should reading from the ini file be skipped when instantiating the user and bot sessions

  • processes (int) – The number of Worker processes this bot should spawn

  • stop (bool) – Should the bot be stopped right after startup. This is essential functionality during factory building

  • meta (dict) – Metadata to be saved on the bot. Optional parameter

  • workflow (list) – An iterable of other bots that defines the interaction of the bot with other bots in the context of a workflow

initialisation(func)

decorator to register the initialisation function

initialisation is executed with a QalxSession instance authenticated against the user_config_class __init__ argument

Parameters:

func – function to be executed

begin(func)

decorator to register the begin function

begin is executed with a QalxJob instance

Parameters:

func – function to be executed

preload(func)

decorator executed before entity is loaded from qalx.

preload is executed with a QalxJob instance

Parameters:

func – function to be executed

onload(func)

decorator to register the onload

onload is executed with a QalxJob instance

Parameters:

func – function to be executed

preprocess(func)

decorator to register preprocess function

preprocess is executed with a QalxJob instance

Parameters:

func – function to register

process(func)

decorator to register process function

process is executed with a QalxJob instance

Parameters:

func – function to register

precompletion(func)

decorator to register precompletion function

precompletion is executed with a QalxJob instance

Parameters:

func – function to register

postprocess(func)

decorator to register postprocess function

postprocess is executed with a QalxJob instance

Parameters:

func – function to register

onwait(func)

decorator to register onwait function

onwait is executed with a QalxJob instance

Parameters:

func – function to register

ontermination(func)

decorator to register ontermination function

ontermination is executed with a QalxJob instance

Parameters:

func – function to register
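The decorators above all follow the same registration pattern: store the function under a step name and call it later with the job. The sketch below shows that pattern in isolation. MiniBot is hypothetical and far simpler than Bot, a plain dict stands in for a QalxJob, and running begin before process is an assumption.

```python
class MiniBot:
    """Hypothetical, heavily reduced bot that only stores and runs step functions."""

    STEP_ORDER = ("begin", "process")  # assumed order, for illustration

    def __init__(self, bot_name):
        self.bot_name = bot_name
        self.steps = {}

    def _register(self, step_name, func):
        self.steps[step_name] = func
        return func  # hand the function back so it stays usable on its own

    def begin(self, func):
        return self._register("begin", func)

    def process(self, func):
        return self._register("process", func)

    def run_once(self, job):
        # Call the registered steps in order; undefined steps are skipped here.
        for step_name in self.STEP_ORDER:
            if step_name in self.steps:
                self.steps[step_name](job)


bot = MiniBot("sketch-bot")
calls = []


@bot.begin
def announce(job):
    calls.append(("begin", job["guid"]))


@bot.process
def work(job):
    calls.append(("process", job["guid"]))


bot.run_once({"guid": "123456789"})
```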

class pyqalx.bot.Worker(steps, queue, bot_entity, api_entity, worker_adapter: QalxWorker, index, workflow, **kwargs)

exit codes (start at 72590):

  • 72590 base exit code

  • 72591 warm exit

  • 72592 cold exit

  • 72593 config triggered (e.g. KILL_AFTER)
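For code that needs to interpret these values, the documented codes could be collected in an enum. The member names below are hypothetical; only the numeric values come from the list above.

```python
from enum import IntEnum


class WorkerExit(IntEnum):
    """Hypothetical names for the documented Worker exit codes."""

    BASE = 72590
    WARM = 72591
    COLD = 72592
    CONFIG_TRIGGERED = 72593  # e.g. KILL_AFTER


def describe_exit(code):
    """Return a readable name for a known exit code, or 'UNKNOWN' otherwise."""
    try:
        return WorkerExit(code).name
    except ValueError:
        return "UNKNOWN"


warm_name = describe_exit(72591)
```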

configure_logging()

Configures the log for the Worker. Each Worker logs to its own file. This prevents issues with multiple processes trying to write to the same file

run()

Run our custom functions. Catch any exception that may occur with it and pass that to our custom uncaught exception handler