Bots
A Bot is the term used for code that performs processing in qalx. You define what processing you want the bot to do by defining step functions. These are normal Python functions that the bot executes in a pre-defined order, in response to messages added to the queue it is configured to read from. All interaction with qalx by the bot is recorded as being performed by the bot rather than by any user.
Essentially there is a three-phase process:
start up the bot
read from the queue and retrieve the qalx entity to work on
process the entity
To manage this process, a Bot uses a job, which lets you track what your processing is doing and pass data between step functions. The job also lets you tell the Bot to stop processing and ask the queue for a new job.
The detailed flow of these steps is shown below; each step is described in more detail in the following text. The envelope shows what information about the job is available at each step.
Basic usage
A bot instance is created simply by giving the bot a name (for easier identification):
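# my_bot_file.py
from pyqalx import Bot

# Create the bot; the name makes it easier to identify
my_bot = Bot(bot_name="my bot")


@my_bot.process
def do_process(job):
    print("I'm going to process " + str(job.entity))
    # Mark the entity as processed and save the change back to qalx
    job.entity.data.processed = True
    job.save_entity()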
We then use the Command Line Interface to start the bot, supplying the name of the queue that the bot should read from:
$ qalx bot-start --queue-name "my queue" my_bot_file:my_bot
Note
By default, the bot will read from the specified queue indefinitely. The bot can be configured to exit if the queue is still empty after it has been checked a certain number of times. See bot configuration for more information.
Parallel processing
By default, the Command Line Interface launches the bot in a single process. If you want to run the bot in multiple processes, pass the --processes option to qalx bot-start:
$ qalx bot-start --processes 4 --queue-name "my queue" my_bot_file:my_bot
Each of these processes creates a Worker instance, which you can use to track what the job is doing.
Step functions
Introduction
Each bot has a number of pre-defined steps (listed below). These help the developer build robust, scalable bots that can handle unexpected input data or errors during processing. Each step except initialisation is given a job object that lets you track what your processing is doing and pass data between step functions.
Note
With the exception of process, any step function can be left undefined.
Steps
initialisation
This is the first function called after Bot.start() has been called.
It is the only step in which the QalxSession object is authenticated against the user rather than the bot. This is because, in terms of qalx resources, the bot is not created unless the initialisation function succeeds.
Note
If this function is defined, it must return something that evaluates to True, otherwise the initialisation will fail.
parallel processing
As described above, a bot can be launched in multiple processes. The initialisation function is the only step called in the single parent process.
argument
This is the only function called without a QalxJob argument. Instead, the only argument passed to this function is the QalxSession authenticated against the user.
purpose:
This step is used to check that the operating system and device are prepared for the bot to start. For example:
is a licence available for some software?
do I have the correct version of something installed?
do I have plenty of disk space?
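A minimal sketch of an initialisation function covering two of the checks above: the Python version and free disk space. The thresholds are assumptions, and so is the `@my_bot.initialisation` decorator name, which is inferred by analogy with `@my_bot.process`. The only documented contract relied on is that the function receives a QalxSession and must return something truthy.

```python
import shutil
import sys

# Hypothetical thresholds for this sketch; choose values that suit your bot.
MIN_FREE_BYTES = 1024 ** 3  # 1 GiB of free disk space
MIN_PYTHON = (3, 6)


def check_environment(path=".", min_free_bytes=MIN_FREE_BYTES, min_python=MIN_PYTHON):
    """Return True only if this machine looks ready to run the bot."""
    if sys.version_info[:2] < tuple(min_python):
        return False
    return shutil.disk_usage(path).free >= min_free_bytes


# In a real bot, register this on your Bot instance, e.g. with
# @my_bot.initialisation  (decorator name assumed, by analogy with @my_bot.process)
def initialisation(qalx_session):
    # qalx_session is the QalxSession authenticated against the user;
    # this sketch does not need it.
    return check_environment()
```

Keeping the checks in a plain helper makes them easy to test without starting a bot.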
begin
Called after the bot has been created as a resource but before the bot makes any requests to the queue.
purpose:
This step is used to perform the same kind of checks as initialisation, but it runs “post-fork”, i.e. inside each worker process rather than in the parent process.
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
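Because begin runs after the fork, it is the place for checks that depend on the individual worker process. The sketch below confirms that the process can write to a working directory and reports the outcome with job.add_step_result. `FakeJob` is a hypothetical stand-in for QalxJob that lets the example run without pyqalx. The `workdir` default and the `@my_bot.begin` decorator name are also assumptions.

```python
import os
import tempfile


class FakeJob:
    """Hypothetical stand-in for QalxJob; records step results only."""

    def __init__(self):
        self.step_results = []

    def add_step_result(self, success=True):
        self.step_results.append(success)


# Register with e.g. @my_bot.begin in a real bot (decorator name assumed).
def begin(job, workdir=None):
    # Presumably runs once per worker process, so the PID tags this worker's file.
    workdir = workdir or tempfile.gettempdir()
    try:
        # Create and immediately discard a file to prove this process can write here.
        with tempfile.NamedTemporaryFile(dir=workdir, prefix=f"bot-{os.getpid()}-"):
            writable = True
    except OSError:
        writable = False
    job.add_step_result(success=writable)
```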
preload
Called after a message has been taken from the queue but before it has been resolved into an entity.
purpose:
This step is used to check that the message has the form that the bot expects.
Does the message have the correct entity type? (e.g. I expect a set; did I get one?)
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
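A sketch of the entity-type check described above. This page does not say which job attribute carries the message's entity type before the entity is resolved. The `message_entity_type` attribute on the hypothetical `FakeJob` below is therefore an assumption. Only `add_step_result` comes from the QalxJob API described on this page.

```python
EXPECTED_ENTITY_TYPE = "set"


class FakeJob:
    """Hypothetical stand-in for QalxJob during preload."""

    def __init__(self, message_entity_type):
        # ASSUMPTION: the real attribute name is not documented here.
        self.message_entity_type = message_entity_type
        self.step_results = []

    def add_step_result(self, success=True):
        self.step_results.append(success)


def entity_type_ok(entity_type, expected=EXPECTED_ENTITY_TYPE):
    """Case-insensitive comparison, so "Set" and "set" both match."""
    return str(entity_type).lower() == expected


# Register with e.g. @my_bot.preload in a real bot (decorator name assumed).
def preload(job):
    # Reporting failure here means no later step runs for this message.
    job.add_step_result(success=entity_type_ok(job.message_entity_type))
```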
onload
Called once the message has been resolved into a QalxEntity.
purpose:
This step is used to check that the entity has the form that the bot expects.
Are all the fields I need populated?
Is there some information I want to add to entity.meta at this point?
Warning
Will this job take longer than 12 hours to process? If so, remove the message from the queue manually at this point by calling `job.delete_message()`. Be aware that manually removed messages will not be put back onto the queue if an unhandled error occurs in the bot. See how queues work for information on how qalx queues process messages.
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
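The sketch below combines the two onload concerns above. It rejects entities with missing fields. For work expected to exceed the 12-hour limit, it removes the message from the queue with job.delete_message(). The field names (`length`, `width`, `estimated_runtime_s`) are hypothetical. `FakeJob` is also hypothetical and mimics only the QalxJob calls used, so the example runs without pyqalx.

```python
from types import SimpleNamespace

REQUIRED_FIELDS = ("length", "width")  # hypothetical field names
TWELVE_HOURS_S = 12 * 60 * 60


class FakeJob:
    """Hypothetical stand-in for QalxJob with just the calls used below."""

    def __init__(self, data):
        self.entity = SimpleNamespace(data=data, meta={})
        self.step_results = []
        self.message_deleted = False

    def add_step_result(self, success=True):
        self.step_results.append(success)

    def delete_message(self):
        self.message_deleted = True


# Register with e.g. @my_bot.onload in a real bot (decorator name assumed).
def onload(job):
    missing = [name for name in REQUIRED_FIELDS if name not in job.entity.data]
    if missing:
        # Note what was wrong on the entity's meta, then stop processing.
        job.entity.meta["missing_fields"] = missing
        job.add_step_result(success=False)
        return
    if job.entity.data.get("estimated_runtime_s", 0) > TWELVE_HOURS_S:
        # Remove the message now so it does not return to the queue after 12 hours.
        # Caution: it will NOT be put back if the bot later fails.
        job.delete_message()
    job.add_step_result(success=True)
```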
preprocess
purpose:
This step can be used to transform the entity in some way before it is processed.
Do I need to convert the format of some fields?
Do I want to resolve some references to other entities here?
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
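A sketch of a format conversion in preprocess: string inputs in millimetres become floats in metres before process sees them. The field names are hypothetical. The entity is faked with SimpleNamespace holding a plain dict as `data`, which assumes the real `entity.data` supports dict-style item access.

```python
from types import SimpleNamespace


def mm_to_m(value_mm):
    """Convert a millimetre value (possibly a string) to metres as a float."""
    return float(value_mm) / 1000.0


# Register with e.g. @my_bot.preprocess in a real bot (decorator name assumed).
def preprocess(job):
    data = job.entity.data
    # Hypothetical fields: "length_mm" -> "length_m", "width_mm" -> "width_m".
    for name in ("length_mm", "width_mm"):
        data[name[: -len("_mm")] + "_m"] = mm_to_m(data[name])


# Usage with a stand-in job (no pyqalx needed):
job = SimpleNamespace(entity=SimpleNamespace(data={"length_mm": "2500", "width_mm": "400"}))
preprocess(job)
```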
process
purpose:
This step is where the primary functionality should live.
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
Warning
Messages in qalx only stay “in flight” on a queue for a maximum of 12 hours. During this time no other worker can access the message. However, if the message has not been completed when this 12-hour limit expires, it goes back into the queue and will be picked up by another worker.
If you think your processing could take longer than 12 hours, you should delete the message manually in the onload function.
If your processing takes less than 12 hours, qalx will automatically delete the message when it is complete.
precompletion
purpose:
This step can be used to check that the processing was successful and to save some interim results.
How long did the processing take?
Did the processing throw any warnings?
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
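One way to answer the precompletion questions above is to have process record its duration and any warnings in job.context, then judge them in precompletion. `run_analysis`, the one-hour budget, the `entity_value` attribute and `FakeJob` are all hypothetical. The context-passing pattern follows the job.context example on this page.

```python
import time
import warnings

MAX_EXPECTED_S = 3600  # hypothetical time budget for one job


class FakeJob:
    """Hypothetical stand-in for QalxJob with a context dict and step results."""

    def __init__(self, value):
        # In a real bot the input would come from job.entity.data (field name hypothetical).
        self.entity_value = value
        self.context = {}
        self.step_results = []

    def add_step_result(self, success=True):
        self.step_results.append(success)


def run_analysis(value):
    """Hypothetical stand-in for the real work; warns on bad input."""
    if value < 0:
        warnings.warn("negative input clipped to zero")
        value = 0
    return value * 2


def process(job):
    started = time.monotonic()
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # record every warning, even repeats
        job.context["result"] = run_analysis(job.entity_value)
    job.context["warnings"] = [str(w.message) for w in caught]
    job.context["duration_s"] = time.monotonic() - started


# Register with e.g. @my_bot.precompletion in a real bot (decorator name assumed).
def precompletion(job):
    too_slow = job.context["duration_s"] > MAX_EXPECTED_S
    job.add_step_result(success=not too_slow and not job.context["warnings"])
```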
postprocess
purpose:
This step can be used to perform actions on the data following processing.
What was the maximum value of a metric during processing?
What was the standard deviation of a measure during processing?
Do I need to load an output file and save it to qalx?
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
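A sketch covering the first two postprocess questions. It summarises a series of samples that an earlier step left in job.context, then saves the summary on the entity with job.save_entity(). The `samples` and `summary` keys are hypothetical. The stand-in job assumes that `entity.data` accepts item assignment.

```python
import statistics
from types import SimpleNamespace


def summarise(samples):
    """Maximum and population standard deviation of a series of numbers."""
    return {"max": max(samples), "stdev": statistics.pstdev(samples)}


# Registered with @my_bot.postprocess in a real bot.
def postprocess(job):
    # Assumes process stored a list of numbers in job.context["samples"].
    job.entity.data["summary"] = summarise(job.context["samples"])
    job.save_entity()  # persist the summary back to qalx


# Usage with a stand-in job (no pyqalx needed); save_entity just records calls.
saves = []
job = SimpleNamespace(
    entity=SimpleNamespace(data={}),
    context={"samples": [2, 4, 4, 4, 5, 5, 7, 9]},
    save_entity=lambda: saves.append(True),
)
postprocess(job)
```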
onwait
Called between requests to the queue for a message.
purpose:
This step lets you know that the bot is still alive and waiting for messages.
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
ontermination
Called before the bot exits.
purpose:
This step lets you do some final clean-up or send notifications before the bot exits.
arguments:
job: a QalxJob object used to track what your processing is doing and pass data between step functions
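A sketch of ontermination clean-up: it removes a scratch directory the bot may have used and logs the shutdown via job.log. The scratch location and `FakeJob` are hypothetical, and so is the `@my_bot.ontermination` decorator name.

```python
import logging
import shutil
import tempfile
from pathlib import Path

# Hypothetical scratch location used by earlier steps.
SCRATCH_DIR = Path(tempfile.gettempdir()) / "my-bot-scratch"


class FakeJob:
    """Hypothetical stand-in for QalxJob exposing only a logger."""

    def __init__(self):
        self.log = logging.getLogger("my bot")


# Register with e.g. @my_bot.ontermination in a real bot (decorator name assumed).
def ontermination(job):
    # ignore_errors=True keeps shutdown quiet if the directory is already gone.
    shutil.rmtree(SCRATCH_DIR, ignore_errors=True)
    job.log.info("shutting down; removed scratch dir %s", SCRATCH_DIR)
```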
QalxJob
Functions are called in the order outlined above, and all except initialisation are passed a single argument, job, of type QalxJob. The job object has attributes and methods that define and assist with processing:
job.session
A QalxSession object allowing interaction with qalx. This session is authenticated against the bot.
Note
This can also be accessed via job.s.
job.entity
The entity that the job is working on, e.g. an Item, Set or Group.
This is only available after preload.
Note
This can also be accessed via job.e.
job.entity_adapter
An instance of the adapter for the specific entity type, i.e. if the entity is an Item, then entity_adapter will be an instance of QalxItem.
job.context
A dict that can be used to pass data between steps:
from pyqalx import Bot

bot = Bot(bot_name="my bot")

@bot.process
def process_function(job):
    job.context.update({"example": "some example data"})

@bot.postprocess
def postprocess_function(job):
    # We can access the context here; it was updated in the process function
    assert job.context["example"] == "some example data"
job.publish_status(status="")
A shortcut for updating the status info on the Worker that is processing the job.
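A sketch of reporting progress from a long-running process step with job.publish_status. The status text format, the `n_chunks` parameter and the recording `FakeJob` are illustrative assumptions. Only the `publish_status(status=...)` call comes from this page.

```python
class FakeJob:
    """Hypothetical stand-in that records published statuses."""

    def __init__(self):
        self.statuses = []

    def publish_status(self, status=""):
        self.statuses.append(status)


# Registered with @my_bot.process in a real bot; the bot passes only `job`.
def process(job, n_chunks=4):
    for i in range(1, n_chunks + 1):
        # ... one chunk of the real work would go here ...
        job.publish_status(status=f"chunk {i}/{n_chunks} done")
```

Publishing once per chunk keeps the Worker's status current without a call on every inner-loop iteration.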
job.add_step_result(success=True)
Defines the result of the current step, i.e. whether it worked or not.
Note
If you set success=False, no further processing steps will be executed.
job.last_step_result
An object representing the last call to add_step_result (or None if there are no previous steps).
Note
This is True if the last step was successful, False if it was not, and None if there are no previous steps.
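To make the add_step_result / last_step_result behaviour concrete, here is a toy model of how a failed step stops the remaining ones. It is not pyqalx's implementation. `ToyJob` and `run_job` are hypothetical. Only the per-message steps from preload to postprocess are modelled, in the order listed above.

```python
PER_MESSAGE_STEPS = ["preload", "onload", "preprocess", "process", "precompletion", "postprocess"]


class ToyJob:
    """Hypothetical, simplified job that only tracks the last step result."""

    def __init__(self):
        self.context = {}
        self.last_step_result = None  # None until a step reports a result

    def add_step_result(self, success=True):
        self.last_step_result = success


def run_job(steps, job):
    """Call each defined step in order; stop after a step reports failure.

    `steps` maps step names to functions; undefined steps are skipped.
    Returns the names of the steps that were actually called.
    """
    called = []
    for name in PER_MESSAGE_STEPS:
        step = steps.get(name)
        if step is None:
            continue
        step(job)
        called.append(name)
        if job.last_step_result is False:
            break
    return called
```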
job.log
Returns a logger instance for the given job. Allows you to log as the worker from your step functions. See logging for more information.
job.save_entity()
Will save the current entity back to qalx.
job.reload_entity()
Will reload the current entity from qalx. Useful if the entity could be updated outside of the bot during processing.
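A sketch of using job.reload_entity() to notice a change made outside the bot during processing. Here the change is a hypothetical `cancel_requested` flag set by a user. `FakeJob` stands in for QalxJob, with a plain dict playing the copy stored in qalx. The field names are assumptions.

```python
import copy
from types import SimpleNamespace


class FakeJob:
    """Hypothetical stand-in: `remote` plays the role of the entity held in qalx."""

    def __init__(self, remote):
        self.remote = remote
        self.step_results = []
        self.reload_entity()

    def reload_entity(self):
        # Replace the local copy with a fresh copy of the remote entity.
        self.entity = SimpleNamespace(data=copy.deepcopy(self.remote))

    def add_step_result(self, success=True):
        self.step_results.append(success)


# Registered with @my_bot.process in a real bot.
def process(job):
    done = []
    for chunk in range(job.entity.data["n_chunks"]):  # hypothetical field
        job.reload_entity()  # one round-trip per chunk; tune to your needs
        if job.entity.data.get("cancel_requested"):
            job.add_step_result(success=False)
            return done
        done.append(chunk)  # the real work for this chunk would go here
    job.add_step_result(success=True)
    return done
```

In this stand-in, reloading replaces the local copy, which would discard unsaved local edits. If the real method behaves the same way, call job.save_entity() before reloading.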
job.delete_message()
Deletes the message from the queue. See onload for why you may want to do this.
job.stop_processing()
Tells the worker that is running this job to stop processing. See signals for more information.
job.resume_processing()
Resume the worker if it is in a stopped state. See signals for more information.
job.terminate()
Tells the worker that is running this job to terminate. See signals for more information.
Helpers for sets
If the bot is processing a Set entity, there are a couple of extra helper functions:
job.add_item_data(item_key, data, meta=None)
Adds a new item to the set under the key item_key, with the given data and any meta.
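A sketch of a postprocess step that stores one result item per load case in the set being processed, using job.add_item_data. The load-case results in job.context, the key naming scheme and the recording `FakeSetJob` are hypothetical.

```python
class FakeSetJob:
    """Hypothetical stand-in that records add_item_data calls."""

    def __init__(self, results):
        self.context = {"results": results}
        self.added = {}

    def add_item_data(self, item_key, data, meta=None):
        self.added[item_key] = {"data": data, "meta": meta}


# Registered with @my_bot.postprocess in a real bot.
def postprocess(job):
    # Assumes process left a {case_name: value} dict in job.context["results"].
    for case_name, value in job.context["results"].items():
        job.add_item_data(
            f"result_{case_name}",
            {"value": value},
            meta={"produced_by": "my bot"},
        )
```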