Bots

A Bot is the term used for code that performs processing in qalx. You define what processing you want the bot to do by defining step functions . These are normal python functions that will be executed by the bot in a pre-defined order and in response to messages added to the queue that the bot is configured to read from. All interaction with qalx by the bot will be recorded as being performed by the bot rather than any user.

Essentially there is a three-phase process:

  1. start up the bot

  2. read from the queue and retrieve the qalx entity to work on

  3. process the entity

To manage this process, a Bot uses a job to allow you to track what your processing is doing and pass data between step functions. It will also allow you to tell the Bot to stop processing and ask the queue for a new job.

The detailed flow of these steps is shown below, each step is described in more detail in the following text. The envelope shows what information about the job is available at each step.

_images/bot_flow_vert.png

Basic usage

A bot instance is simply created with the name for the bot (for easier identification).

# my_bot_file.py
from pyqalx import Bot

my_bot = Bot(bot_name="my bot")

@my_bot.process
def do_process(job):
    print("I'm going to process " + str(job.entity))
    job.entity.data.processed = True
    job.save_entity()

We then use the Command Line Interface to start the bot - supplying the queue name that the bot should read from:

$ qalx bot-start --queue-name "my queue" my_bot_file:my_bot

Note

By default the bot will read from the specified queue indefinitely. It is possible for the bot to be configured to exit if the queue is empty after it has checked it a certain amount of times. See bot configuration for more information.

Parallel processing

By default, the Command Line Interface will launch the bot in a single process. However, if you want to run the function in multiple processes then you can simply pass an argument to qalx:

$ qalx bot-start --processes 4 --queue-name "my queue" my_bot_file:my_bot

Each of these processes creates an Worker instance which you can use to track what the job is doing.

Step functions

Introduction

Each bot has a number of pre-defined steps (listed below). These assist the developer in building robust, scalable bots that can handle unexpected input data or errors during processing. Each step will be given a job object to allow you to track what your processing is doing and pass data between step functions

Note

with the exception of process, any step function can be left undefined.

Steps

initialisation

This is the first function called after Bot.start() has been called. This is the only function that the QalxSession object will authenticate against the user and not the bot. This is because, in terms of qalx resources, the bot will not be created unless the initialisation function succeeds.

Note

If this function is defined it must return something that evaluates to True or the initialisation will fail.

parallel processing

as you can see from above it’s possible to launch a bot in multiple processes. The initialisation function is the only step called in the single parent process.

argument

this is the only function called without a QalxJob argument, instead the only argument passed to this function is the QalxSession authenticated against the user.

purpose:

This step is used to check that the operating system and device are prepared for the bot to start. For example:

  • is a licence available for some software?

  • do I have the correct version of something installed?

  • do I have plenty of disk space?

begin

Called after the bot has been created as a resource but before the bot makes any requests to the queue.

purpose:

This step is used to perform the same checks as in initialisation but happens “post-fork”.

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

preload

Called after a message has been taken from the queue but before it has been resolved into an entity.

purpose:

This step is used to check that the message has the form that the bot expects.

  • Does the message have the correct entity type (i.e. I expect a set did I get one?)

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

onload

Called once the message has been resolved into a QalxEntity.

purpose:

This step is used to check that the entity has the form that the bot expects.

  • Are all the fields I need populated?

  • Is there some information I want to add to entity.meta at this point?

Warning

Will this job take longer than 12 hours to process? If so the message should be manually removed from the queue at this point by running the `job.delete_message()` command. Be aware that manually removed messages will not be put back onto the queue if an unhandled error occurs with the bot. See how queues work for information on how qalx queues process messages.

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

preprocess

purpose:

This step can be used to transform the entity in some way before it is processed.

  • Do I need to convert the format of some fields?

  • Do I want to resolve some references to other entities here?

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

process

purpose:

This step is where the primary functionality should live

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

Warning

Messages in qalx only stay “in flight” on a queue for a maximum of 12 hours. During this time no other worker will be able to access the message. However, if the message has not been completed after this 12 hour time limit expires, the message will go back into the queue and will be picked up by another worker. If you think that your processing could take longer than 12 hours you should delete the message manually in the onload function. If your processing takes less than 12 hours then qalx will automatically delete the message when it is complete.

precompletion

purpose:

This step can be used to check that the processing was successful and save some interim results

  • How long did the processing take?

  • Did the processing throw any warnings?

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

postprocess

purpose:

This step can be used to perform actions on the data following processing.

  • What was the maximum value of a metric during processing?

  • What was the standard deviation of a measure during processing?

  • Do I need to load an output file and save it to qalx?

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

onwait

Called between requests to the queue for a message.

purpose:

This step let’s you know that the bot is still alive and waiting for messages.

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

ontermination

Called before the bot exits

purpose:

This step let’s you do some final clean up or notifications before the bot dies

arguments:
  1. job: a QalxJob object used to track what your processing is doing and pass data between step functions

QalxJob

Functions are called in the order outlined above and all except initialisation are passed a single argument job of type QalxJob. The job object has attributes and methods that define and assist with processing:

job.session

A QalxSession object allowing interaction with qalx. This session is authenticated against a bot.

Note

this can also be accessed via job.s

job.entity

The entity representing the job like Item, Set, Group. This is only available after preload.

Note

this can also be accessed via job.e

job.entity_adapter

An instance of an adapter for the specific entity. i.e. if the entity is a Item then entity_adapter will be an instance of QalxItem

job.context

A dict that can be used to pass data between steps

@bot.process
def process_function(job):
   job.context.update({"example": "some example data"})


@bot.postprocess
def postprocess_function(job):
    # We can access the context attribute here which has been updated in the process function
    assert job.context["example"] == "some example data"

job.publish_status(status="")

A shortcut to updating the status info on the Worker that is processing the job.

job.add_step_result(success=True)

Define the result of the current step - whether it worked or not

Note

if you set success=False then no further processing steps will be executed.

job.last_step_result

An object representing the last call to add_step_result (or NoneType if there are no previous steps).

Note

this is True if the last step was successful, False otherwise and NoneType if there are no previous steps

job.log

Returns a logger instance for the given job. Allows you to log as the worker from your step functions. See logging for more information.

job.save_entity()

Will save the current entity back to qalx.

job.reload_entity()

Will reload the current entity from qalx. Useful if the entity could be updated outside of the bot during processing.

job.delete_message()

Deletes the message from the queue. See onload for why you may want to do this.

job.stop_processing()

Should the worker that is running this job stop processing. See signals for more information.

job.resume_processing()

Resume the worker if it is in a stopped state. See signals for more information.

job.terminate()

Should this job terminate the worker? See signals for more information.

Helpers for sets

If the bot is processing a Set entity there are a couple of extra helper functions:

job.add_item_data(item_key, data, meta=None)

Will add a new item to the set with key item_key and the data and any meta.