API Documentation¶
Core API¶
Sessions¶
The best way to interface between entities and qalx is with the session object. This has a series of helper methods to interface with the API using entity specific adapters
- class pyqalx.core.session.QalxSession(profile_name='default', config_class=None, skip_ini=False, rest_api_class=None, project_name=None)¶
The session that any interaction with the API will use.
- Parameters:
profile_name (str) – profile name to get from config_class (default=Config.default)
config_class (Config) – The class for the config that will be used for this session
skip_ini (bool) – Should loading the config from the inifile be skipped?
rest_api_class (PyQalxAPI) – The class to use for the rest api
project_name (str) – An optional name of a project. This will be used to tag all data that is created/modified with this session
- configure_logging()¶
Configures the log for this session. A log is namespaced for the config type and the profile name to allow a user to have multiple session instances with different configs/profiles logging to different places.
- Returns:
An instance of logging.getLogger
- property log¶
Returns an instance of logging.getLogger for this QalxSession instance This cannot be set as an instance attribute due to pickling issues on windows. Will log depending on the LOGGING_LEVEL config value.
Usage:
>>> QalxSession().log.debug("A debug message") >>> QalxSession().log.info("An info message") >>> QalxSession().log.warning("A warning message") >>> QalxSession().log.error("An error message")
- Returns:
logging.getLogger
- register(cls)¶
Registers the given
QalxEntity
subclass with the session. See entities docs for more info- Parameters:
cls (pyqalx.core.entities.entity.QalxEntity) – subclass of QalxEntity.
- unregister(cls)¶
Unregisters the given
QalxEntity
subclass from the session. See entities docs for more info- Parameters:
cls (pyqalx.core.entities.entity.QalxEntity) – subclass of QalxEntity.
- property worker¶
returns a
QalxWorker
adapter for this session- Returns:
Instance of
QalxWorker
- property factory¶
returns a
QalxFactory
adapter for this session- Returns:
Instance of
QalxFactory
- property blueprint¶
returns a
QalxBlueprint
adapter for this session- Returns:
Instance of
QalxBlueprint
- property notification¶
returns a
QalxNotification
adapter for this session- Returns:
Instance of
QalxNotification
Adapters¶
Adapters are how a QalxSession
interfaces with the API. These won’t typically
get accessed directly - instead the helper properties on the
QalxSession
should be used.
- class pyqalx.core.adapters.blueprint.QalxBlueprint(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Blueprint
entities- add(name, schema, meta=None, **kwargs)¶
Adds a
Blueprint
with a valid jsonschema schema. If the schema is invalid then a jsonschema.SchemaError is raised
- save(entity, **kwargs)¶
Saves any updates to the given
Blueprint
. Validates that the schema on the entity is a valid jsonschema schema
- get(guid, child_fields=None, unpack=True, *args, **kwargs)¶
Gets the entity for the given guid and unpacks it if specified. Provide child_fields to restrict the fields returned on unpacked children
- get_by_name(name, **kwargs)¶
gets a single entity by name
- Parameters:
name (str) – name of entity
kwargs – Keyword arguments to be used in the find method
- Returns:
instance of self.entity_class
- Raises:
- get_or_create(name, meta=None, **kwargs)¶
Gets an entity by the given name or creates it if it doesn’t exist
- Parameters:
name (str) – The name that to use to get or create the entity by
meta – metadata about the entity
kwargs – Other kwargs on the entity to use when creating
- Returns:
instance of self.entity_class
- find(query=None, sort=None, skip=0, limit=25, many=True, child_fields=None, include_session_tags=True, *args, **kwargs)¶
Return multiple packed entities from the API
- Parameters:
query (dict) – The optional Mongo query to find entities
sort (list) – The keys to sort by
skip (int) – The number of results to skip (offset) by
limit (int) – How many results should the response be limited to
many (bool) – Should many entities be returned or just a single one
include_session_tags (bool) – Should the tags for the session be included in the query. Default is True
child_fields (list) – A list of fields that should be returned from child entities
- Returns:
A list of entities
- find_one(query, unpack=True, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
- Returns:
an instance of self.entity_class
- Raises:
- reload(entity, unpack=True, *args, **kwargs)¶
Reloads the current entity from the API
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- cache_kids(kwargs)¶
Override cache_kids because the self.kids key is not where we need to get the children from. Using deepcopy because schema is a mutable object(dict)
- save_kids_on_entity(entity, kids)¶
Override save_kids_on_entity because the self.kids key is not where the children should be saved.
- class pyqalx.core.adapters.bot.QalxBot(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Bot
entities- add(name, config, meta=None, **kwargs)¶
Creates a Bot instance.
- get_by_name(name, **kwargs)¶
gets a single entity by name
- Parameters:
name (str) – name of entity
kwargs – Keyword arguments to be used in the find method
- Returns:
instance of self.entity_class
- Raises:
- get_or_create(name, meta=None, **kwargs)¶
Gets an entity by the given name or creates it if it doesn’t exist
- Parameters:
name (str) – The name that to use to get or create the entity by
meta – metadata about the entity
kwargs – Other kwargs on the entity to use when creating
- Returns:
instance of self.entity_class
- get(guid, *args, **kwargs)¶
Gets an instance of self.entity_class by the given guid.
- Parameters:
guid – The guid of the entity to get
- Returns:
An instance of self.entity_class
- save(entity, *args, **kwargs)¶
Saves entity to the database.
- Returns:
The updated instance of entity
- find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)¶
Method for listing entities.
- Parameters:
query – The optional Mongo query to find entities
sort – The keys to sort by
skip – The number of results to skip (offset) by
limit – How many results should the response be limited to
many – Should many entities be returned or just a single one
include_session_tags – Should the tags for the session be included in the query. Default is True
kwargs – kwargs to query by
- Returns:
- find_one(query, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
query (dict) – The mongoDB query
- Returns:
an instance of self.entity_class
- Raises:
- reload(entity, *args, **kwargs)¶
Reloads the current entity from the API
- Parameters:
entity – An instance of self.entity_class
- Returns:
A refreshed instance of self.entity_class
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- replace_workers(bot_entity, workers)¶
Completely replaces any Workers on the given bot. Will return the replaced workers in an unpacked state
- resume(entity, **kwargs)¶
Resumes all the workers on the bot
- Parameters:
entity (
Bot
) – Bot instance
Factory adapter module. Contains the main class that provides factory related interaction with the qalx API: QalxFactory
- class pyqalx.core.adapters.factory.QalxFactory(*args, **kwargs)¶
Provides an interface for accessing the API for
Factory
entities- validate(plan_path)¶
Reads a .yaml file that contains the definition of the factory plan and then runs the factory validation endpoint in the API
- Parameters:
plan_path (str) – A local path to the .yaml file that contains the factory plan definition
- Raises:
QalxFactoryValidationError – If the .yaml file does not exist, if there are any errors parsing the yaml file to a dict, or if the yaml plan structure is invalid
- add(name, source=None, file_name='', meta=None, upload=True, **kwargs)¶
Adds a Factory instance
- Parameters:
name (str) – The name of the factory
source (str or io object) – file path to the factory plan or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)
file_name (str) – Name for the factory plan input file. Optional if a file path is given for source parameter. Required if an io object
meta (dict) – A dictionary of metadata to store
upload (bool) – Whether the file should be automatically uploaded or not
- Returns:
A newly created Factory instance
- save(entity, source=None, file_name='', upload=True, *args, **kwargs)¶
Saves an updated existing Factory instance.
- Parameters:
entity (An instance of Factory) – The entity that we are saving
source (str or io object) – file path to the factory plan or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)
file_name (str) – Name for the factory plan input file. Optional if a file path is given for source parameter. Required if an io object
upload (bool) – Whether the file should be automatically uploaded or not
- Returns:
An updated Factory instance
- get(guid, *args, **kwargs)¶
Gets an instance of self.entity_class by the given guid.
- Parameters:
guid – The guid of the entity to get
- Returns:
An instance of self.entity_class
- find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)¶
Method for listing entities.
- Parameters:
query – The optional Mongo query to find entities
sort – The keys to sort by
skip – The number of results to skip (offset) by
limit – How many results should the response be limited to
many – Should many entities be returned or just a single one
include_session_tags – Should the tags for the session be included in the query. Default is True
kwargs – kwargs to query by
- Returns:
- find_one(query, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
query (dict) – The mongoDB query
- Returns:
an instance of self.entity_class
- Raises:
- reload(entity, *args, **kwargs)¶
Reloads the current entity from the API
- Parameters:
entity – An instance of self.entity_class
- Returns:
A refreshed instance of self.entity_class
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- class pyqalx.core.adapters.group.QalxGroup(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Group
entities- add(sets, meta=None, **kwargs)¶
When adding a Group ensure that the sets posted to the api are in the format {<key>:
Set
}
- get(guid, child_fields=None, unpack=False, *args, **kwargs)¶
Gets the entity for the given guid and unpacks it if specified. Provide child_fields to restrict the fields returned on unpacked children
- reload(entity, unpack=False, *args, **kwargs)¶
Reloads the current entity from the API.
- Parameters:
entity – An instance of self.entity_class
unpack (bool) – Should the child entities by unpacked? Defaults to False
- Returns:
A refreshed instance of self.entity_class
- find(query=None, sort=None, skip=0, limit=25, many=True, child_fields=None, include_session_tags=True, *args, **kwargs)¶
Return multiple packed entities from the API
- Parameters:
query (dict) – The optional Mongo query to find entities
sort (list) – The keys to sort by
skip (int) – The number of results to skip (offset) by
limit (int) – How many results should the response be limited to
many (bool) – Should many entities be returned or just a single one
include_session_tags (bool) – Should the tags for the session be included in the query. Default is True
child_fields (list) – A list of fields that should be returned from child entities
- Returns:
A list of entities
- find_one(query, unpack=False, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
- Returns:
- Raises:
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- save(entity, *args, **kwargs)¶
When saving an unpacked entity the kids need to be packed. To save having to unpack them again just save the original kids and replace them with the packed values after saving
- Parameters:
entity – An instance of self.entity_class
- Returns:
An updated instance of self.entity_class
- class pyqalx.core.adapters.item.QalxItem(*args, **kwargs)¶
Provides an interface for accessing the API for
Item
entities- add(data: dict | None = None, source: str | StringIO | BytesIO | None = None, file_name: str = '', meta: dict | None = None, blueprint_name: str | None = None, upload: bool = True, encrypt: bool = True, as_file: bool = False, **kwargs)¶
Adds an Item instance that can contain either data (as a dict), a file or both.
- Parameters:
data – Optional data to store against this Item
source – file path or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)
file_name – input file name. Optional if a file path is given for source. Required if an io object
meta – A dictionary of metadata to store
blueprint_name – An optional blueprint name to use to validate this item against an existing Blueprint
upload – Whether the file should be automatically uploaded or not
encrypt – Whether the file should be automatically encrypted if the KEYFILE is present
as_file – Whether the data dict should be stored in S3 rather than in Mongo
- Returns:
Item
instance
- save(entity, source=None, file_name='', blueprint_name=None, upload=True, encrypt=True, as_file=False, **kwargs)¶
Saves an updated existing Item instance. To remove a file from an Item update the entity so that entity[‘file’] = {}
- Parameters:
entity (An instance of Item) – The entity that we are saving
source (str or io object) – file path or instance of StringIO or BytesIO (https://docs.python.org/3/library/io.html)
file_name (str) – input file name. Optional if a file path is given for source. Required if an io object
meta (dict) – A dictionary of metadata to store
blueprint_name (str) – An optional blueprint name to use if you want to validate this item against an existing Blueprint
upload (bool) – Whether the file should be automatically uploaded or not
encrypt (bool) – Whether the file should be automatically encrypted if the KEYFILE is present
as_file – Whether the data dict should be stored in S3 rather than in Mongo
- Returns:
An updated Item instance
- add_many(items, encrypt=True, as_file=False, **kwargs)¶
Adds multiple items at once.
- Parameters:
items (list) – A list of item data that you want to create. This can be a mixture of non file items and file items. The keys of each item in the list should be the same as if a single item were being created via the add method
encrypt (bool) – Whether the file should be automatically encrypted if the KEYFILE is present. All files are encrypted or not encrypted based on this value
- Returns:
- find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)¶
Method for listing entities.
- Parameters:
query – The optional Mongo query to find entities
sort – The keys to sort by
skip – The number of results to skip (offset) by
limit – How many results should the response be limited to
many – Should many entities be returned or just a single one
include_session_tags – Should the tags for the session be included in the query. Default is True
kwargs – kwargs to query by
- Returns:
- find_one(query, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
query (dict) – The mongoDB query
- Returns:
an instance of self.entity_class
- Raises:
- reload(entity, *args, **kwargs)¶
Reloads the current entity from the API
- Parameters:
entity – An instance of self.entity_class
- Returns:
A refreshed instance of self.entity_class
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- archive(entity, *args, **kwargs)¶
Archives entity
- Parameters:
entity – An instance of self.entity_class
- Returns:
The archived instance of entity
- delete(entity, *args, **kwargs)¶
Deletes the given entity from the API
- Parameters:
entity – An instance of QalxEntity
- class pyqalx.core.adapters.notification.QalxNotification(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Notification
entities- add(subject, message, to, cc=None, bcc=None, **kwargs)¶
Posts a notification request to the API, to send an email with the provided subject, message and recipient list. Optionally, lists for CC and BCC recipients can be provided
- Parameters:
subject (str) – The subject of the message
message (str) – The message to send
to (list) – A list with the addresses of the recipients of the message
cc (list) – An optional list with the addresses of th recipients of a carbon copy(CC) of the message
bcc (list) – An optional list with the addresses of th recipients of a blind carbon copy(BCC) of the message
- class pyqalx.core.adapters.queue.QalxQueue(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Queue
entities- add(name, meta=None, **kwargs)¶
Queues are created with a name. This name is stored in the metadata of the Queue instance
- get_messages(worker)¶
Gets the messages on the Queue instance
- :param worker:An instance of
Worker
that called this method
- Returns:
A list of
QueueMessage
instances
- :param worker:An instance of
- get_by_name(name, **kwargs)¶
a single queue by name
- Parameters:
name (str) – name of queue
- Returns:
- Raises:
- get_or_create(name, meta=None, **kwargs)¶
Gets a Queue by the given name or creates it if it doesn’t exist
- get(guid, *args, **kwargs)¶
Gets an instance of self.entity_class by the given guid.
- Parameters:
guid – The guid of the entity to get
- Returns:
An instance of self.entity_class
- save(entity, *args, **kwargs)¶
Saves entity to the database.
- Returns:
The updated instance of entity
- find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)¶
Method for listing entities.
- Parameters:
query – The optional Mongo query to find entities
sort – The keys to sort by
skip – The number of results to skip (offset) by
limit – How many results should the response be limited to
many – Should many entities be returned or just a single one
include_session_tags – Should the tags for the session be included in the query. Default is True
kwargs – kwargs to query by
- Returns:
- find_one(query, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
query (dict) – The mongoDB query
- Returns:
an instance of self.entity_class
- Raises:
- reload(entity, *args, **kwargs)¶
Reloads the current entity from the API
- Parameters:
entity – An instance of self.entity_class
- Returns:
A refreshed instance of self.entity_class
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- class pyqalx.core.adapters.set.QalxSet(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Set
entities- add(items, meta=None, blueprint_name=None, **kwargs)¶
When adding a Set ensure that the items posted to the api are in the format {<key>:
Item
}
- get(guid, child_fields=None, unpack=True, *args, **kwargs)¶
Gets an instance of self.entity_class by the given guid.
- Parameters:
guid – The guid of the entity to get
- Returns:
An instance of self.entity_class
- save(entity, blueprint_name=None, *args, **kwargs)¶
When saving entities that inherit from this class, pyqalx will get the blueprint based on the blueprint_name and then ensure that the schema validates correctly against the data that you are updating the entity with
- Parameters:
entity – The entity instance that you are saving
blueprint_name (str, None) – the name of the blueprint
- Returns:
A valid self.entity_class instance
- Raises:
jsonschema.ValidationError
- find(query=None, sort=None, skip=0, limit=25, many=True, include_session_tags=True, **kwargs)¶
Return multiple packed entities from the API
- Parameters:
query (dict) – The optional Mongo query to find entities
sort (list) – The keys to sort by
skip (int) – The number of results to skip (offset) by
limit (int) – How many results should the response be limited to
many (bool) – Should many entities be returned or just a single one
include_session_tags (bool) – Should the tags for the session be included in the query. Default is True
child_fields (list) – A list of fields that should be returned from child entities
- Returns:
A list of entities
- find_one(query, unpack=True, **kwargs)¶
Method for returning a unique entity. Will return the entity that matches the query
- Parameters:
- Returns:
an instance of self.entity_class
- Raises:
- reload(entity, unpack=True, *args, **kwargs)¶
Reloads the current entity from the API.
- Parameters:
entity (pyqalx.core.entities.set.Set) – An instance of Set
unpack (bool) – Should the child entities by unpacked? Defaults to False
- Returns:
- aggregate(aggregate, **kwargs)¶
Method for performing an aggregation on data. See https://docs.mongodb.com/manual/core/aggregation-pipeline/. If a subset of the data is required then provide the query within the aggregation pipeline
- Parameters:
aggregate (list) – The aggregation pipeline
- Returns:
- class pyqalx.core.adapters.worker.QalxWorker(session, *args, **kwargs)¶
Provides an interface for accessing the API for
Worker
entities- signal_class¶
alias of
QalxWorkerSignal
- list_endpoint(*args, bot_entity, **kwargs)¶
Builds the list_endpoint for workers. This requires the bot_entity which will get passed down from the calling method via kwargs. :param bot_entity:An instance of Bot :type bot_entity:~entities.bot.Bot :return:The Worker list endpoint
- detail_endpoint(guid, *args, bot_entity, **kwargs)¶
Builds the detail_endpoint for workers. This requires the bot_entity which will get passed down from the calling method via kwargs.
- Parameters:
guid (uuid) – The guid of the Worker
bot_entity (Bot) – An instance of Bot
- Returns:
The Worker list endpoint
- get(guid, bot_entity, *args, **kwargs)¶
Gets an individual worker.
- get_signal(entity, bot_entity, *args, **kwargs)¶
Gets the signal for a specific entity
- Parameters:
- Returns:
- terminate(entity, bot_entity, *args, **kwargs)¶
Terminates a specific entity
- reload(entity, bot_entity, **kwargs)¶
Reloads the current entity from the API
- update_status(entity, bot_entity, status)¶
Updates the workers status
- stop(entity, bot_entity, **kwargs)¶
Sends a stop signal to the worker
- resume(entity, bot_entity, **kwargs)¶
Sends a resume signal to the worker
Entities¶
Root entities - these are children of dict
but with special methods. They
allow for attribute and key lookup
- class pyqalx.core.entities.bot.Bot(*args, **kwargs)¶
QalxEntity with entity_type Bot
- class pyqalx.core.entities.entity.QalxAddict(*args, **kwargs)¶
A qalx subclass of addict.Dict. Enables us to have entities that operate like normal dicts, but allow dot notation for all the keys. Ensures that if something is passed to it that is already an instance of QalxEntity that we don’t mutate the type to a QalxAddict instance
- class pyqalx.core.entities.entity.BaseQalxEntity(*args, **kwargs)¶
The base class for all QalxEntity instances. Operates like a normal dict but utilises addict.Dict to allow a user to do dot notation lookup of all keys recursively.
- class pyqalx.core.entities.entity.QalxListEntity(pyqalxapi_list_response_dict, **kwargs)¶
Simple wrapper around a pyqalxapi_dict so we can keep extra keys on the API list response. Instantiates each entity in data to the correct QalxEntity subclass.
- Parameters:
pyqalxapi_list_response_dict (dict) – A dict response from a REST API list endpoint
- class pyqalx.core.entities.entity.AggregationResult(pyqalxapi_list_response_dict, **kwargs)¶
Class for aggregation result responses from the API
- Parameters:
pyqalxapi_list_response_dict (dict) – A dict response from a REST API list endpoint
- class pyqalx.core.entities.entity.QalxEntity(*args, **kwargs)¶
Base class for a response from the qalx API
QalxEntity children need to be populated with either a requests.models.Response which is the type returned by the methods on
PyQalxAPI
or with a dict.Entities can behave either like a dict or attribute lookups can be used as getters/setters
>>> class AnEntity(QalxEntity): ... pass >>> c = AnEntity({"guid":"123456789", "info":{"some":"info"}}) >>> # dict style lookups >>> c['guid'] '123456789' >>> # attribute style lookups >>> c.guid '123456789'
- Parameters:
pyqalxapi_dict (dict) – a ‘dict’ representing a qalx entity object to populate the entity
- classmethod pluralise()¶
Pluralises the entity type
- class pyqalx.core.entities.entity.QalxFileEntity(*args, **kwargs)¶
QalxFileEntity subclasses can be provided with a keyfile kwarg. This gets set on the instance of the entity and will allow for seamless decryption of files
- read_file(retry=True)¶
If this Item contains a file, will read the file data and cache it against the Item.
- save_file_to_disk(filepath, filename=None)¶
If this Item contains a file, will read the file from the URL (or from the cached bytes on the instance) and save the file to disk. Provide an optional filename argument if you don’t want to use the same filename as the one stored on the Item
- Parameters:
filepath (str) – The path where this file should be saved
filename (str) – The optional name of this file. Defaults to the name of the file on the instance
key_file (str) – The path to the encryption key to be used for decrypting the file data. If it is not provided and the data was encrypted, the data will be read in the encrypted format
- Returns:
Path to file on disk
- Raises:
- class pyqalx.core.entities.entity.QalxQueueableEntity(*args, **kwargs)¶
A mixin which allows an entity to be sumitted to queue
- classmethod add_to_queue(payload, queue, children=False, **message_kwargs)¶
Submits an entity, entity guid or list of entities/entity guids to the given queue
Example usage for an Item:
>>> Item.add_to_queue(item, queue) >>> Item.add_to_queue(item.guid, queue) >>> Item.add_to_queue([item, item.guid], queue)
- Parameters:
payload – subclassed instance of
QalxEntity
, a guid, or a list containing a combination of bothchildren (bool) – Whether we are submitting the child entities to the queue rather than the given entity
- class pyqalx.core.entities.group.Group(*args, **kwargs)¶
A group is a collection of sets. These are useful for sending a lot of sets to a queue and being able to track when they have all been processed.
>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> steel = qalx.item.add(source="path/to/316_datasheet.pdf", ... data={"rho":8000, "E":193e9}, ... meta={"library":"materials", "family":"steel", ... "grade":"316"}) >>> steel_squares = {} >>> for size in range(2, 500): ... dims = qalx.item.add(data={"height":size, "width":size}, ... meta={"shape":"square"}) ... steel_square_set = qalx.set.add(items={"shape":dims, "material":steel}, ... meta={"profile":"square_steel"}) ... steel_squares[size] = steel_square_set >>> all_squares = qalx.group.add(sets=steel_squares)
- class pyqalx.core.entities.item.Item(*args, **kwargs)¶
An item is the core of qalx.
They are structured data or a file or some combination of the two. For example:
>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> dims = qalx.item.add(data={"height":5, "width":5}, meta={"shape":"square"}) >>> steel = qalx.item.add(source="path/to/316_datasheet.pdf", ... data={"rho":8000, "E":193e9}, ... meta={"library":"materials", ... "family":"steel", ... "grade":"316"})
We can then use the
find_one
andfind
methods to search for items>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> steel_316_item = qalx.item.find_one(query={"metadata.data.library": "materials", ... "metadata.data.family": "steel", ... "metadata.data.grade": "316"}) >>> steels = qalx.item.find(query={"metadata.data.family": "steel"}) >>> squares = qalx.item.find(query={"metadata.data.shape": "square"}) >>> quads = qalx.item.find(query={"$or": [{"metadata.data.shape": "square"}, ... {"metadata.data.shape": "rectangle"}]})
We can edit an item once we have retrieved it and save it back to qalx. You can either use attribute style getters/setters (my_shape.data.height = 10) or key style getters/setters (my_shape[‘data’][‘height’] = 10)
>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> my_shape = qalx.item.find_one(query={"data.height": 5, "data.width": 5}) >>> my_shape.data.height = 10 >>> my_shape.meta.shape = 'rectangle' >>> qalx.item.save(my_shape) >>> # If we think that someone else might have edited my_shape we can reload it: >>> my_shape = qalx.item.reload(my_shape)
QalxFileEntity subclasses can be provided with a keyfile kwarg. This gets set on the instance of the entity and will allow for seamless decryption of files
- class pyqalx.core.entities.item.ItemAddManyEntity(pyqalxapi_list_response_dict, **kwargs)¶
The entity that gets returned when the
add_many()
method is called- property items_¶
Helper to allow easier lookup for items. A user can continue to do resp[‘items’] should they choose - but using this property leads to neater code.
Usage: resp.items_
- Returns:
dict
- Raises:
KeyError if items key doesn’t exist on ItemAddManyEntity
- class pyqalx.core.entities.queue.QueueResponse(response)¶
a response from a remote queue
make new response
- Parameters:
response – response from queue
- class pyqalx.core.entities.queue.QueueMessage(message, worker, visibility, *args, **kwargs)¶
a message from the queue. subclasses threading.Thread so that a heartbeat can be started as soon as the message is received from the Queue
- Parameters:
- run()¶
A QalxJob might take a long time to process. Therefore, we use a heartbeat on the message to keep it in flight for as long as possible. This runs in a Thread on
QueueMessage
instantiation to ensure that the message timeout doesn’t expire before processing even begins (i.e. if a batch of messages was returned)As per the SQS docs here
This will specify the initial visibility timeout (for example, 2 minutes) and then — as long as your consumer still works on the message — keep extending the visibility timeout by 2 minutes every minute. This uses visibility as the initial visibility timeout so every visibility / 2 seconds will increase the timeout by visibility * number of hearbeats from the initial visibility time.
- class pyqalx.core.entities.queue.Queue(*args, **kwargs)¶
QalxEntity with entity_type Queue
- property broker_client¶
An authenticated client to communicate with the remote message broker.
Windows can’t pickle the client if it gets set on __init__ therefore, we set it to an internal attribute and cache it for the duration of the Queue to avoid getting the client multiple times
- get_messages(max_num_msg, visibility, waittime, worker)¶
get messages from the queue
- delete_message(message)¶
remove message from the queue
- Parameters:
message (
QueueMessage
) – message to delete
- purge()¶
Purge the queue. All messages on the queue will be deleted.
This might take up to 60 seconds and any messages sent to the queue while it is being purged might also be deleted
- class pyqalx.core.entities.set.Set(*args, **kwargs)¶
A set is simply a collection of items.
They are mapped with keys so that you can retrieve specific items from the set later:
>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> dims = qalx.item.add(data={"height":5, "width":5}, meta={"shape":"square"}) >>> steel = qalx.item.add(source="path/to/316_datasheet.pdf", ... data={"rho":8000, "E":193e9}, ... meta={"library":"materials", ... "family":"steel", "grade":"316"}) >>> steel_square_set = qalx.set.add(items={"shape":dims, "material":steel}, ... meta={"profile":"square_steel"})
As with items we can then use the
find_one
andfind
methods to search for sets and easily get the item data:. You can use attribute style getters/setters or key style getters/setters>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> my_set = qalx.set.find_one(query={"metadata.data.profile": "square_steel"}) >>> # Key style >>> youngs_mod = my_set.get_item_data("material")['E'] >>> # Attribute style >>> younds_mod = my_set.get_item_data("material").E
Note
Sets store a reference to an item rather than a copy of the item. Changes to the item will be reflected in all sets containing that item.
- get_item_data(item_key)¶
helper method to get data from an item in the set
- Parameters:
item_key (str) – key on the set to add the item as
- Returns:
dict or None if key is missing
- property items_¶
Helper to allow easier lookup for items. A user can continue to do myset[‘items’] should they choose - but using this property leads to neater code.
Usage: myset.items_
- Returns:
dict
- Raises:
KeyError if items key doesn’t exist on Set
- class pyqalx.core.entities.worker.Worker(*args, **kwargs)¶
QalxEntity with entity_type Worker
- class pyqalx.core.entities.blueprint.Blueprint(*args, **kwargs)¶
Provides an interface for creating blueprints. Blueprints allow a user to create items or sets in a consistent fashion
- check_schema(schema=None)¶
Checks that the given schema is structure correctly
- Parameters:
schema (None or Blueprint) – The schema to validate. Uses self if None
- Raises:
jsonschema.SchemaError
- validate(entity)¶
Validate an entity against the blueprint schema locally.
- Parameters:
entity – A full schema for an entity that can have a Blueprint
- add_schema(schema_value)¶
Update the entire Blueprint schema at once. Will completely overwrite all properties that are already on this Blueprint instance
- Parameters:
schema_value – A full schema for an entity that can have a Blueprint
- Returns:
An updated instance of the blueprint
- add_data(key, schema_value, required=False)¶
Adds data to this item blueprint instance.
- add_meta(key, schema_value, required=False)¶
Adds meta to this item blueprint instance.
- add_item(key, item_blueprint=None, required=False)¶
Adds an item to this set blueprint instance. If item_blueprint is provided then the set schema will always require that specific item blueprint for key. If item_blueprint is not provided then the set blueprint will accept any data for key when being used
- Parameters:
- Returns:
An updated instance of the set blueprint
Factories¶
Factories provide an automated way to create multiple bots - both locally and on remote servers
Factory module. Contains the main factory class
- class pyqalx.factories.factory.Factory(session, user_profile='default', bot_profile='default', aws_profile='default')¶
Main factory class. Inherits from BaseFactory and contains methods for specific functionality related to the distinct stages of a factory
- Parameters:
session (QalxSession) – An instance of
QalxSession
user_profile (str) – The user profile to use
bot_profile (str) – The bot profile to user
aws_profile (str) – The AWS profile to use
- validate(plan)¶
Calls the validate method on the adapter
- Parameters:
plan (str) – Path to a .yaml file containing the definition of the factory plan
- Raises:
QalxFactoryValidationError – If there is a problem validating the plan
- pack(plan, stage=None, delete=True)¶
Packs a factory. Calls validate method on the plan provided
- Parameters:
- Raises:
QalxFactoryPackError – If a stage is provided and it cannot be found on the plan, or if there are any issues creating the build directory
- build(plan, stage)¶
Builds a factory. Calls pack method with the provided plan and stage.
- Parameters:
- Raises:
if there is a problem with the build
- demolish(factory_guid)¶
Demolishes a factory
- Param:
factory_guid: The guid of the factory entity to demolish
Transport¶
The transport layer of pyqalx
. Links adapters to the REST API
- class pyqalx.transport.api.PyQalxAPI(session)¶
The main interface between adapters and the REST API.
Usage:
>>> from pyqalx import QalxSession >>> qalx = QalxSession() >>> qalx.rest_api
- get(endpoint, **kwargs)¶
Performs a GET request to the specified endpoint
- post(endpoint, **kwargs)¶
Performs a POST request to the specified endpoint
- patch(endpoint, **kwargs)¶
Performs a PATCH request to the specified endpoint
- put(endpoint, **kwargs)¶
Performs a PUT request to the specified endpoint
- delete(endpoint, **kwargs)¶
Performs a DELETE request to the specified endpoint
- exception pyqalx.transport.core.PyQalxAPIException¶
A generic error for PyQalxAPI
Config¶
- class pyqalx.config.configs.Config¶
- property defaults¶
Returns a dict of default values for config options from ~pyqalx.config.defaults.__init__
:return : dict of options
- classmethod configure(profile_name, config_items=None)¶
When given a profile name and dict of config items will write the config file to disk. If a profile_name is given that already exists on the profile then the profile_name on the profile will be completely replaced with the values from config_items
- class pyqalx.config.configs.BotConfig¶
Works exactly like a dict but provides ways to fill it from ini files and environment variables. There are two common patterns to populate the config.
Either you can add them to the .bots file:
>>> bot.config.from_botsfile(profile_name=BotConfig.default)
Or alternatively you can define the configuration from environment variables starting with QALX_BOT_. These will be populated automatically but values defined in .bots will overwrite these if bot.config.from_botsfile() is called.
To set environment variables before launching the application you have to set this environment variable to the name and value you want to use. On Linux and OS X use the export statement:
>>> export QALX_BOT_LICENCE_FILE='/path/to/licence/file'
On windows use set instead.
- property defaults¶
Loads the default bot config from ~pyqalx.config.defaults.bots. Any config items that exists in ~pyqalx.config.defaults.__init__ will be overwritten by config values that exist in ~pyqalx.config.defaults.bots
- Returns:
dict of default config options
- class pyqalx.config.configs.UserConfig¶
Works exactly like a dict but provides ways to fill it from ini files and environment variables. There are two common patterns to populate the config. Either you can add them to the .qalx_bot file:
>>> qalx_bot.config.from_qalxfile(profile_name=UserConfig.default)
Or alternatively you can define the configuration from environment variables starting with QALX_USER_. These will be populated automatically but values defined in .qalx will overwrite these if qalx_bot.config.from_qalxfile() is called.
To set environment variables before launching the application you have to set this environment variable to the name and value you want to use. On Linux and OS X use the export statement:
>>>export QALX_USER_EMPLOYEE_NUMBER=1280937
On windows use set instead.
Logging¶
- class pyqalx.core.log.QalxRotatingFileHandler(*args, **kwargs)¶
We always force some defaults for a qalx log
Open the specified file and use it as the stream for logging.
By default, the file grows indefinitely. You can specify particular values of maxBytes and backupCount to allow the file to rollover at a predetermined size.
Rollover occurs whenever the current log file is nearly maxBytes in length. If backupCount is >= 1, the system will successively create new files with the same pathname as the base file, but with extensions “.1”, “.2” etc. appended to it. For example, with a backupCount of 5 and a base file name of “app.log”, you would get “app.log”, “app.log.1”, “app.log.2”, … through to “app.log.5”. The file being written to is always “app.log” - when it gets filled up, it is closed and renamed to “app.log.1”, and if files “app.log.1”, “app.log.2” etc. exist, then they are renamed to “app.log.2”, “app.log.3” etc. respectively.
If maxBytes is zero, rollover never occurs.
- rotate(*args, **kwargs)¶
Overridden in order to ignore permission errors. This could happen when multiple processes are accessing the same log file. If this happens, rotation is suppressed until the lock is dropped. The log files will exceed the “maxBytes” value but logging will continue normally.
- class pyqalx.core.log.QalxLogFormatter(style='%', *args, **kwargs)¶
Sometimes our log messages contain extra data. Use this formatter to allow all handlers to use the same format while also handling the potential extra data
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to use one of %-formatting,
str.format()
({}
) formatting orstring.Template
formatting in your format string.Changed in version 3.2: Added the
style
parameter.- format(record)¶
Format the specified record as text.
The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
- pyqalx.core.log.configure_logging(name, filename, level)¶
Configures a log. Loggers are tied directly to an instance of
QalxSession
orWorker
and should be accessed via instance.log.<level>. All pyqalx loggers are prefixed with <LOGGER_NAME_PREFIX>
- pyqalx.core.log.session_logging_suffix(config)¶
Returns the suffix to use for QalxSession logging instances based on the config
- pyqalx.core.log.get_logger()¶
For all the given configured loggers will try and get a user session log to log the unhandled exception to. If there are multiple configured user sessions just grab the first one. If there are no user sessions configured (i.e. the error occurred before QalxSession was instantiated) then return the root log
- pyqalx.core.log.exception_hook(exc_type, exc_value, exc_traceback, extra=None, log=None)¶
We want to log any unhandled exception that occurs in pyqalx. This includes Qalx* errors (i.e. because a user has misconfigured something) and also if they have made an error in one of their decorated functions (i.e. @bot.process). This catches all errors and logs them and also prints out the traceback
Signals¶
- class pyqalx.core.signals.QalxSignal(entity)¶
A set of helper methods to aid in managing signals on Workers
- property terminate¶
Checks whether the signal was a terminate signal
- property terminate_cold¶
Checks whether the signal was a cold terminate signal
- property terminate_warm¶
Checks whether the signal was a warm terminate signal
- property requeue_job¶
Checks whether the signal wants the job to be requeued
- property stop¶
Checks whether the signal wants to stop the job
Errors¶
pyqalx.core.errors defines QalxError exception and a load of children.
If pyqalx is going raise an error you know about then use one of these or create a new one.
- exception pyqalx.core.errors.QalxError¶
Base qalx error. Take responsibility!
- exception pyqalx.core.errors.QalxAuthError¶
qalx did not find a way to authenticate or the authentication didn’t work
- exception pyqalx.core.errors.QalxNoGUIDError¶
A QalxEntity without a guid is like a dog without a bone.
- exception pyqalx.core.errors.QalxNoInfoError¶
A QalxEntity without info is like a dog without a bone.
- exception pyqalx.core.errors.QalxConfigProfileNotFound¶
The profile wasn’t in the file or the file wasn’t properly formed.
- exception pyqalx.core.errors.QalxConfigFileNotFound¶
There should be a file in the users home directory (either a .bots or .qalx)
- exception pyqalx.core.errors.QalxQueueError¶
There wasn’t the correct information to connect to the remote queue.
- exception pyqalx.core.errors.QalxBotInitialisationFailed¶
The bot initialisation function returned something falsey.
- exception pyqalx.core.errors.QalxEntityTypeNotFound¶
We couldn’t find the entity type you were looking for.
- exception pyqalx.core.errors.QalxEntityNotFound¶
We couldn’t find the entity you were looking for.
- exception pyqalx.core.errors.QalxMultipleEntityReturned¶
We found more than one entity, but you just wanted the one hey?
- exception pyqalx.core.errors.QalxConfigError¶
Something about an attempted load of config didn’t work
- exception pyqalx.core.errors.QalxAPIResponseError¶
There was a problem with some kind of API request.
- exception pyqalx.core.errors.QalxEntityUnchanged¶
Saved something which hadn’t actually been changed when we thought it had.
- exception pyqalx.core.errors.QalxInvalidSession¶
The qalx_session argument passed to an adapter isn’t a valid QalxSession instance
- exception pyqalx.core.errors.QalxNoEntity¶
A user tried to access the entity attribute on QalxAdapter when they hadn’t set an entity
- exception pyqalx.core.errors.QalxIncorrectEntityType¶
The entity on the QalxAdapter is of a different type to the type of adapter
- exception pyqalx.core.errors.QalxFileError¶
There is something wrong with the file details passed to the QalxAdapter
- exception pyqalx.core.errors.QalxAlreadyRegistered¶
The entity is already registered with the session
- exception pyqalx.core.errors.QalxCannotUnregister¶
The entity cannot be unregistered
- exception pyqalx.core.errors.QalxRegistrationClassNotFound¶
The registration class for registering a custom class was not found
- exception pyqalx.core.errors.QalxInvalidBlueprintError¶
The blueprint is not valid.
- exception pyqalx.core.errors.QalxInvalidTagError¶
The user does not have access to write to the specific tags
- exception pyqalx.core.errors.QalxStepFunctionNotDefined¶
A specific Bot step function has not been defined by the user
- exception pyqalx.core.errors.QalxFileRetrievalError¶
Error when retrieving a file from qalx
- exception pyqalx.core.errors.QalxFactoryError¶
Generic error with a Qalx factory
- exception pyqalx.core.errors.QalxFactoryValidationError¶
Error when validating a factory
- exception pyqalx.core.errors.QalxFactoryPackError¶
Error when packing a factory
- exception pyqalx.core.errors.QalxFactoryBuildError¶
Error when building a factory
- exception pyqalx.core.errors.QalxFactoryDemolishError¶
Error when demolishing a factory
Bot API¶
- pyqalx.bot.cannot_be_undefined(*args, **kwargs)¶
Raises an exception if a step function cannot be left undefined. Handled here to avoid a user trying to call the specific step function directly.
- class pyqalx.bot.QalxJob(worker_process)¶
a Job to process
- Parameters:
worker_process (Worker) – The instance of the worker that is running this job
- property log¶
An instance of the workers log to enable logging from step functions
- property e¶
shortcut to QalxJob().entity
- property s¶
shortcut to QalxJob().session
- property queue¶
shortcut to the queue instance on the bot entity
- stop_processing()¶
Stop the worker from processing. Will cause the worker to wait until it receives a resume signal
- resume_processing()¶
Resume the worker.
- terminate(warm=True, requeue_job=False)¶
The job has been told to terminate the worker
- publish_status(status)¶
update the status of the worker
- Parameters:
status (str) – short message about the worker status
- add_step_result(success=True)¶
indicate success or failure of the step function
- Parameters:
success (bool) – was the step successful or not
- property last_step_result¶
The success flag for the previous step result (or None if no previous steps) for this job
- get_entity(entity_type, entity_guid)¶
get an entity
- Parameters:
- Returns:
a child of
QalxEntity
if the entity is found on the API, otherwise None
- add_item_data(item_key, data, meta=None)¶
helper method to add item data to the set
- save_entity()¶
Updates the API representation of this entity with the data currently stored in the job.
- Returns:
a child of
QalxEntity
of the correct type based on the type of self.entity
- reload_entity()¶
Reloads self.entity from the API
- Returns:
a child of
QalxEntity
of the correct type based on the type of self.entity
- delete_message()¶
Helper method for deleting a message from a Queue
- class pyqalx.bot.Bot(bot_name)¶
Bots allow automation of tasks. A bot reads messages from a
Queue
and processes the response through user configured step_functions.- Parameters:
bot_name (str) – The unique name for this bot
- start(queue_name, processes=1, qalx_session_class=None, user_config_class=None, bot_config_class=None, entity_classes=None, user_profile_name='default', bot_profile_name='default', skip_ini=False, stop=False, meta=None, workflow=None)¶
Starts the bot with processes number of
Worker
instances. Each worker will be able to read a message from the queue in parallel- Parameters:
queue_name (str) – The unique name for the queue
qalx_session_class (pyqalx.core.session.QalxSession) – The
QalxSession
class that this Bot and Workers should useuser_config_class (pyqalx.config.configs.UserConfig) – The
UserConfig
class that will be used when creating the Botbot_config_class (pyqalx.config.configs.BotConfig) – The
BotConfig
class that will be used when using the bot sessionentity_classes (list) – A list of
QalxEntity
subclasses that you want this Bot to use once it is starteduser_profile_name (str) – The user profile name that will be used when creating the Bot
bot_profile_name (str) – The bot profile name that will be used when using the bot session
skip_ini (bool) – Should reading from the ini file be skipped when instantiating the user and bot sessions
processes (int) – The number of
Worker
processes this bot should spawnstop (bool) – Should the bot be stopped right after startup. This is essential functionality during factory building
meta (dict) – Metadata to be saved on the bot. Optional parameter
workflow (list) – An iterable of other bots that defines the interaction of the bot with other bots in the context of a workflow
- initialisation(func)¶
decorator to to register the initialisation
initialisation is executed with a QalxSession instance authenticated againt the user_config_class __init__ argument :param func: function to be executed
- begin(func)¶
decorator to to register the begin
begin is executed with a QalxJob instance
- Parameters:
func – function to be executed
- preload(func)¶
decorator executed before entity is loaded from qalx.
preload is executed with a QalxJob instance
- Parameters:
func – function to be executed
- onload(func)¶
decorator to register the onload
onload is executed with a QalxJob instance
- Parameters:
func – function to be executed
- preprocess(func)¶
decorator to register preprocess function
preprocess is executed with a QalxJob instance
- Parameters:
func – function to register
- process(func)¶
decorator to register process function
process is executed with a QalxJob instance
- Parameters:
func – function to register
- precompletion(func)¶
decorator to register precompletion function
precompletion is executed with a QalxJob instance
- Parameters:
func – function to register
- postprocess(func)¶
decorator to register postprocess function
postprocess is executed with a QalxJob instance
- Parameters:
func – function to register
- onwait(func)¶
decorator to register onwait function
onwait is executed with a QalxJob instance
- Parameters:
func – function to register
- ontermination(func)¶
decorator to register ontermination function
ontermination is executed with a QalxJob instance
- Parameters:
func – function to register
- class pyqalx.bot.Worker(steps, queue, bot_entity, api_entity, worker_adapter: QalxWorker, index, workflow, **kwargs)¶
exit codes (start at 72590):
72590 base exit code
72591 warm exit
72592 cold exit
72593 config triggered (e.g. KILL_AFTER)
- configure_logging()¶
Configures the log for the Worker. Each Worker logs to its own file. This prevents issues with multiple processes trying to write to the same file
- run()¶
Run our custom functions. Catch any exception that may occur with it and pass that to our custom uncaught exception handler