Queues

A Queue in qalx is where a bot gets data from in order to process it.

How Queues work

Queues in qalx operate on a simple first in/first out system. You add Items, Sets or Groups (known to the queue as messages) to a queue, and a bot will take those messages off the queue and process them in the order that they were added - deleting the message from the queue once processing is completed. Other than adding messages to the queue you will rarely need to interact directly with it. qalx takes care of that for you.
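The first in/first out ordering described above can be illustrated with a small in-memory sketch. It uses Python's standard collections.deque as a stand-in and is not how qalx stores or delivers messages; the message names are made up for the example.

```python
from collections import deque

# A plain in-memory stand-in for a queue; not how qalx stores messages.
messages = deque()

# Messages are added to the back of the queue...
for name in ["first", "second", "third"]:
    messages.append(name)

# ...and taken from the front, so processing order matches insertion order.
processed = []
while messages:
    message = messages.popleft()
    processed.append(message)  # a bot would do its processing here
```

After the loop, processed holds the messages in the order they were added and the stand-in queue is empty.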

Note

When a bot takes a message off a queue the message is marked as “in flight”. “In flight” means that no other bot can access the message, so data won’t get processed multiple times. Once processing is completed the message is deleted from the queue. If any unhandled errors occur in your bot, the message is put back onto the queue and the bot will attempt processing again.

Warning

A message can stay “in flight” for a maximum of 12 hours. After this time the message goes back onto the queue and will be reprocessed, even if the bot that originally picked it up is still processing it. If processing is likely to take more than 12 hours you must manually delete the message from the queue. See the onload step function for more information on how to do this.
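The “in flight” behaviour and the 12 hour limit can be modelled with a toy queue. This is only a sketch under stated assumptions: ToyQueue, its methods and the explicit now argument are hypothetical names invented for illustration, and none of this is pyqalx code.

```python
from collections import deque

MAX_IN_FLIGHT_SECONDS = 12 * 60 * 60  # the 12 hour limit described above


class ToyQueue:
    """Hypothetical in-memory model of "in flight" behaviour, not pyqalx code."""

    def __init__(self):
        self.waiting = deque()
        self.in_flight = {}  # message -> time at which it was received

    def send(self, message):
        self.waiting.append(message)

    def receive(self, now):
        # Put any message that has been in flight for too long back on the queue.
        for message, started in list(self.in_flight.items()):
            if now - started >= MAX_IN_FLIGHT_SECONDS:
                del self.in_flight[message]
                self.waiting.append(message)
        if not self.waiting:
            return None
        message = self.waiting.popleft()
        self.in_flight[message] = now  # hidden from other bots from now on
        return message

    def delete(self, message):
        # Called once processing has completed successfully.
        self.in_flight.pop(message, None)


toy = ToyQueue()
toy.send("job-1")
first = toy.receive(now=0)                      # "job-1" is now in flight
second = toy.receive(now=60)                    # nothing visible to another bot
third = toy.receive(now=MAX_IN_FLIGHT_SECONDS)  # limit reached, delivered again
```

In this model the second receive returns None because the only message is in flight, and the third returns the same message again because it was never deleted within the limit.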

Creating a queue

A queue must be created before you can add messages to it or start a bot. All of the methods below return a Queue instance.

from pyqalx import QalxSession

qalx = QalxSession()

queue = qalx.queue.add(name='MyQueue')

You can also get a queue by name if you want to add data to an existing queue.

from pyqalx import QalxSession

qalx = QalxSession()

queue = qalx.queue.get_by_name(name="MyQueue")

But what if you aren’t sure if the queue already exists? You can use get_or_create to handle this.

from pyqalx import QalxSession

qalx = QalxSession()

queue = qalx.queue.get_or_create(name="MyQueue")

Submitting data to a queue

Now that we have our queue, we are ready to submit data. Data is added to a queue via the add_to_queue class method on the entity adapter for the type of entity you are adding.

Note

Ensure that you use the correct entity class for the type of entity that you wish to submit to the queue.

For example:

# This will work
item = qalx.item.add(data={"my": "data"})
Item.add_to_queue(payload=item)

# This will raise a QalxQueueError
Set.add_to_queue(payload=item)
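The kind of check that rejects a mismatched entity can be sketched with toy classes. Everything here is hypothetical: QueueError stands in for QalxQueueError, the Item and Set classes are simplified stand-ins, and the list used as a queue is for illustration only.

```python
class QueueError(Exception):
    """Stand-in for the QalxQueueError mentioned above."""


class Entity:
    def __init__(self, data):
        self.data = data

    @classmethod
    def add_to_queue(cls, payload, queue):
        # Reject payloads whose type does not match the class being called.
        if not isinstance(payload, cls):
            raise QueueError(
                f"Expected {cls.__name__}, got {type(payload).__name__}"
            )
        queue.append(payload)


class Item(Entity):
    pass


class Set(Entity):
    pass


queue = []
item = Item({"my": "data"})
Item.add_to_queue(payload=item, queue=queue)  # accepted

try:
    Set.add_to_queue(payload=item, queue=queue)  # wrong class for an Item
except QueueError as exc:
    error = str(exc)
```

Only the first call adds anything to the stand-in queue; the second raises before the payload is queued.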

To submit an entity to a queue, call the add_to_queue method on the class that matches the type of the entity you want to submit:

from pyqalx import QalxSession

qalx = QalxSession()

# First we get the queue
queue = qalx.queue.get_by_name(name="MyQueue")

# Then we create the entity we want to submit.  For this example we're
# using an item, but this could be a set or a group.
item = qalx.item.add(data={"height":5, "width":5}, meta={"shape":"square"})

# We then submit this item to the queue.
# This will remain on the queue until a bot is started that is looking at the queue.
Item.add_to_queue(payload=item, queue=queue)

If you have a Group or Set instance you can submit all of its children by specifying the children=True argument when calling add_to_queue. This puts a single message on the queue for each set in your group, or for each item in your set.

Submitting all children for a group to a queue

# set1 and set2 would have been created earlier
group = qalx.group.add(sets=[set1, set2])
# We then submit this entity to the queue.
# This will remain on the queue until a bot is started that is looking at the queue.
Group.add_to_queue(payload=group, queue=queue, children=True)

Submitting all children for a set to a queue

# item1 and item2 would have been created earlier
myset = qalx.set.add(items=[item1, item2])
# We then submit this entity to the queue.
# This will remain on the queue until a bot is started that is looking at the queue.
Set.add_to_queue(payload=myset, queue=queue, children=True)
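The effect of children=True can be pictured as a fan-out from one parent to one message per child. The sketch below is a toy model: messages_for and the dictionary layout are hypothetical, and it assumes that without children=True a single message is sent for the parent entity itself.

```python
def messages_for(entity, children=False):
    """Return the list of messages a submission would produce (toy model)."""
    if children:
        # One message per child rather than one for the parent.
        return list(entity["children"])
    # Assumption: the parent itself is sent as a single message.
    return [entity["guid"]]


group = {"guid": "group-1", "children": ["set-1", "set-2"]}
parent_only = messages_for(group)
per_child = messages_for(group, children=True)
```

Here parent_only contains one entry for the group, while per_child contains one entry for each set.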

You can also send a list of entities to a queue. The list may mix entity instances and guids:

Submitting a list of items with mixed types

item1 = qalx.item.add(data={'my': 'data'})
item2 = qalx.item.add(data={'my': 'data'})

# Here we are submitting a list of items. Some are Item instances, others
# are just guids
Item.add_to_queue(payload=[item1, item2.guid], queue=queue)
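One way to think about a mixed list is that every entry is reduced to a guid before it is queued. The sketch below shows that normalisation with a stand-in class; ToyItem and to_guids are hypothetical names, and whether pyqalx does this internally is an assumption.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class ToyItem:
    """Hypothetical stand-in for an entity that exposes a `guid` attribute."""

    guid: UUID


def to_guids(payload):
    """Normalise a list of entities and/or guids into a list of guids."""
    # Entries with a `guid` attribute are entities; anything else is
    # treated as a guid already.
    return [getattr(entry, "guid", entry) for entry in payload]


item1, item2 = ToyItem(uuid4()), ToyItem(uuid4())
guids = to_guids([item1, item2.guid])
```

Both entries end up as guids, regardless of whether an instance or a bare guid was passed in.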

Purging a queue

It is possible to delete all messages from a queue with the purge method of the Queue class.

from pyqalx import QalxSession

qalx = QalxSession()

# First we get the queue
queue = qalx.queue.get_by_name(name="MyQueue")

# We create 20 items and add them to the queue
items = []
for _ in range(20):
    items.append({"data": {"some": "data"}})
items = qalx.item.add_many(items).items_
Item.add_to_queue(payload=items, queue=queue)
# At this point the queue should have 20 messages on it. We purge the queue
queue.purge()
# And now all messages have been deleted from the queue

Warning

Purging a queue could take up to 60 seconds, regardless of the size of the queue. While a queue is being purged, any additional messages sent to it might also be deleted.
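Given this warning, one cautious pattern is to pause after purging before sending anything new. The helper below is a minimal sketch: purge_and_wait is a hypothetical function, not part of pyqalx, and it only assumes that the object passed in has the purge method shown above. The sleep parameter exists so the pause can be replaced in tests.

```python
import time


def purge_and_wait(queue, wait_seconds=60, sleep=time.sleep):
    """Purge `queue`, then pause before the caller sends new messages."""
    queue.purge()
    # Wait out the period during which newly sent messages might be deleted.
    sleep(wait_seconds)
```

A caller would run purge_and_wait(queue) and only afterwards call add_to_queue with new entities.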