Using Datastore as a Job Queue
Learn how to implement a job queue system using the datastore command
The datastore can be used as a job queue system. This guide explains how to create, schedule, and process jobs using the datastore.
How It Works in 3 Steps
1. Create a Job Definition (a datastore item)
Create an item in the datastore that includes a status in its metadata (like “not_started”). This is your job definition and status tracker. Put any data you need to send to the worker in the contents of the item.
1a. Attach an API Trigger
As part of the item creation process, attach an API trigger to your worker automation. Your worker automation will receive a payload containing details about the job (the item’s content and metadata).
2. Create an Orchestration Automation
Create an orchestration automation. This automation queries for jobs with a status of “not_started” and updates each item’s status to “started”. That update automatically fires the attached trigger, which kicks off your worker automation. You can run the orchestration automation manually or schedule it to run on a regular basis.
3. Process the Job with your Worker Automation
Your worker automation will receive a payload containing details about the job (the item’s content and metadata). Process the job however you need, then update the item’s status to “completed”. Be sure to make this update with triggers off so you don’t fire your worker automation again.
Creating a Job
To create a new job:
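The guide does not reproduce the real datastore command syntax, so the sketch below models the pattern in Python with a plain dict standing in for the datastore; `create_job` and the trigger URL are illustrative assumptions, not the actual API. The key idea is the status field in metadata.

```python
# Illustrative sketch only: a dict stands in for the datastore.
import uuid

datastore = {}  # item_id -> job item

def create_job(contents, trigger_url):
    """Create a job item with a status tracker and an attached API trigger."""
    item_id = str(uuid.uuid4())
    datastore[item_id] = {
        "contents": contents,                   # data the worker needs
        "metadata": {"status": "not_started"},  # the job's status tracker
        "triggers": [trigger_url],              # fired when the item updates
    }
    return item_id

job_id = create_job({"task": "resize-image", "file": "photo.jpg"},
                    "https://example.com/worker-automation")  # hypothetical URL
print(datastore[job_id]["metadata"]["status"])  # not_started
```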
Understanding Triggers and Workers
When a job’s status changes, you can have an automation run to process it (this is your worker automation). This happens through triggers.
How Triggers Work
- When creating a job, you attach trigger URLs that should be called when the job updates
- When you update the job’s status, all attached triggers will fire (set up your trigger to point to your worker automation)
- Your worker automation receives a payload containing details about the job (the item’s content and metadata). The payload also tells you what changed in the item (in this case, just the status you updated), but what you really care about are the job details that let you process the job.
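The datastore’s actual payload format is not specified in this guide; the shape below is purely an assumption used to illustrate what a worker might read from it.

```python
# Hypothetical payload shape -- field names are assumptions, not the real format.
payload = {
    "contents": {"task": "resize-image", "file": "photo.jpg"},  # job details
    "metadata": {"status": "started"},
    "changes": {"status": {"from": "not_started", "to": "started"}},  # what changed
}

# The worker cares mainly about the job details in the contents:
task = payload["contents"]["task"]
print(task)  # resize-image
```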
Controlling Triggers
Sometimes you want to update a job without firing triggers (i.e., you don’t want your workers to run). In that case, update with triggers off:
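As a sketch of the idea (the real flag syntax isn’t shown in this guide), an update made with triggers off applies the change but skips the trigger-firing step; `update_item` and `triggers_off` are illustrative names:

```python
# Model of "triggers off": the update applies, but attached triggers don't fire.
fired = []  # records which trigger URLs would have been called

item = {
    "metadata": {"status": "started"},
    "triggers": ["https://example.com/worker-automation"],
}

def update_item(item, metadata, triggers_off=False):
    item["metadata"].update(metadata)
    if not triggers_off:
        fired.extend(item["triggers"])  # stand-in for calling each trigger URL

update_item(item, {"status": "completed"}, triggers_off=True)
print(item["metadata"]["status"], fired)  # completed []
```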
Processing Jobs
There are two ways to process jobs:
1. On-Demand Processing
Write an automation that you run manually.
2. Scheduled Processing
Create a job orchestration automation that runs every 5 minutes (minimum interval) to process jobs. Here’s the complete flow:
- First, query for available jobs:
- For each job returned, attempt to claim it:
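A sketch of the query-then-claim flow above, assuming the datastore supports optimistic concurrency via a per-item version number (an update fails if the version has changed since you read it); all names here are illustrative:

```python
# Each item carries a version; a claim only succeeds if the version still matches.
datastore = {
    "job-1": {"metadata": {"status": "not_started"}, "version": 3},
    "job-2": {"metadata": {"status": "started"}, "version": 7},
}

def query_jobs(status):
    """Return (item_id, version) pairs for jobs in the given status."""
    return [(item_id, item["version"]) for item_id, item in datastore.items()
            if item["metadata"]["status"] == status]

def claim_job(item_id, expected_version):
    item = datastore[item_id]
    if item["version"] != expected_version:  # another worker got there first
        return False
    item["metadata"]["status"] = "started"   # this update fires the trigger
    item["version"] += 1
    return True

for item_id, version in query_jobs("not_started"):
    if claim_job(item_id, version):
        print("claimed", item_id)  # claimed job-1
```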
⚠️ WARNING: Race Condition Risk. If this scheduled orchestration automation runs at the same time as another orchestration automation that is also processing jobs, multiple workers can end up claiming the same job.
For example:
- Worker A and Worker B both query at 10:00:00
- Both see Job X in “not_started” state
- Both try to claim Job X at almost the same time
- Worker A’s update succeeds because it’s slightly faster
- Worker B’s update fails because the version number no longer matches
- Worker B should catch this error and move on to the next job
To handle this safely:
- Always check whether your claim attempt succeeded
- Move on to the next job if the claim fails
- Consider adding a small random delay before claiming to help distribute worker claims
- Keep track of failed claim attempts for monitoring
- After successfully claiming a job, process it and update its status:
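A minimal sketch of the worker side, assuming the claimed job’s contents carry the work to do; the names and the work itself are illustrative, and in the real system the final update would be made with triggers off:

```python
# The worker processes the claimed job, then marks it "completed".
job = {
    "contents": {"numbers": [1, 2, 3]},  # whatever data the job carries
    "metadata": {"status": "started"},
}

def process_job(job):
    result = sum(job["contents"]["numbers"])  # the actual work (illustrative)
    job["metadata"]["result"] = result
    # In the real system, make this update with triggers off so the status
    # change does not re-fire the worker automation.
    job["metadata"]["status"] = "completed"
    return result

print(process_job(job))  # 6
```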
Handling Multiple Workers Safely
When multiple automations process jobs simultaneously, you need to prevent them from processing the same job. The status field handles this automatically:
1. Status-Based Job Claims
Workers only query for jobs with “not_started” status:
When a worker claims a job, it updates the status to “started”. This automatically prevents other workers from seeing or claiming the job in their queries:
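A minimal illustration of why status-based claims work: once a job’s status flips to “started”, it simply stops matching the query other workers run (the dict stands in for the datastore).

```python
jobs = {
    "job-1": {"status": "not_started"},
    "job-2": {"status": "not_started"},
}

def available(jobs):
    return [job_id for job_id, meta in jobs.items()
            if meta["status"] == "not_started"]

print(available(jobs))               # ['job-1', 'job-2']
jobs["job-1"]["status"] = "started"  # a worker claims job-1
print(available(jobs))               # ['job-2'] -- job-1 is invisible now
```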
2. Job Locking
Include these fields to track job processing:
- worker_id: Identifies which worker has the job
- started_at: When processing began
- locked_until: When the lock expires
- attempts: Number of processing attempts
3. Lock Timeouts
Set a locked_until time when claiming a job:
- Other workers can claim the job after this time by querying for expired locks
- Prevents jobs from being stuck if a worker crashes
- Typically 5-15 minutes depending on job complexity
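A sketch of claiming with lock metadata, using Unix timestamps; the field names follow the list above, while `claim_with_lock` and `LOCK_SECONDS` are illustrative assumptions:

```python
import time

LOCK_SECONDS = 600  # 10-minute lock; typical range is 5-15 minutes

job = {"metadata": {"status": "not_started", "attempts": 0}}

def claim_with_lock(job, worker_id):
    now = time.time()
    job["metadata"].update({
        "status": "started",
        "worker_id": worker_id,              # who holds the job
        "started_at": now,                   # when processing began
        "locked_until": now + LOCK_SECONDS,  # when the lock expires
        "attempts": job["metadata"]["attempts"] + 1,
    })

claim_with_lock(job, "worker-a")
print(job["metadata"]["attempts"])  # 1
```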
To handle expired locks, workers can also query for abandoned jobs:
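A sketch of the expired-lock query: a job still marked “started” whose locked_until is in the past can be reclaimed (names are illustrative).

```python
import time

now = time.time()
jobs = {
    "job-1": {"status": "started", "locked_until": now - 60},    # lock expired
    "job-2": {"status": "started", "locked_until": now + 600},   # still locked
    "job-3": {"status": "completed", "locked_until": now - 60},  # already done
}

def abandoned(jobs, now):
    return [job_id for job_id, meta in jobs.items()
            if meta["status"] == "started" and meta["locked_until"] < now]

print(abandoned(jobs, now))  # ['job-1']
```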
Complete Job Lifecycle Example
- Create Job
- Claim Job
- Complete Job: update with triggers off to avoid triggering your worker automation again.
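The lifecycle above can be sketched end to end, again with a plain dict standing in for the datastore and illustrative function names (the real command syntax is not shown in this guide):

```python
store = {}

def create_job(job_id, contents):
    store[job_id] = {"contents": contents,
                     "metadata": {"status": "not_started"},
                     "version": 1}

def claim_job(job_id, expected_version):
    job = store[job_id]
    if job["version"] != expected_version:
        return False                       # lost the race; move on
    job["metadata"]["status"] = "started"  # fires the worker trigger
    job["version"] += 1
    return True

def complete_job(job_id):
    # In the real system this update is made with triggers off.
    store[job_id]["metadata"]["status"] = "completed"

create_job("job-1", {"task": "send-email"})
if claim_job("job-1", expected_version=1):
    complete_job("job-1")
print(store["job-1"]["metadata"]["status"])  # completed
```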