The datastore can be used as a job queue system. This guide explains how to create, schedule, and process jobs using the datastore.

How It Works in 3 Steps

1. Create a Job Definition (a datastore item)

Create an item in the datastore that includes a status in its metadata (like “not_started”). This is your job definition and status tracker. Put any data you need to send to the worker in the item’s content.

1a. Attach an API Trigger

As part of the item creation process, attach an API trigger that points to your worker automation. When the trigger fires, your worker automation will receive a payload containing details about the job (the item’s content and metadata).

2. Create an Orchestration Automation

Create an orchestration automation that queries for jobs with a status of “not_started” and updates each item’s status to “started”. Each status update fires the item’s trigger and kicks off your worker automation. You can run this automation manually or schedule it to run on a regular basis.

3. Process the Job with your Worker Automation

Your worker automation will receive a payload containing details about the job (the item’s content and metadata). Your worker should process the job as you wish and then update the item’s status to “completed”. Be sure to update with triggers off to avoid triggering your worker automation again.

Creating a Job

To create a new job:

/datastore create item with:
key: jobs
sortField: job-2024-03-15-001
content: {
    "report_type": "monthly",
    "department": "sales"
}
metadata: {
    "status": "not_started",
    "type": "report_generation"
}
triggerUrls: {
    "https://your-automation-url": "your-api-key"
}

Understanding Triggers and Workers

When a job’s status changes, you can have an automation run to process it (this is your worker automation). This happens through triggers.

How Triggers Work

  1. When creating a job, you attach trigger URLs that should be called when the job updates
  2. When you update the job’s status, all attached triggers will fire (set up your trigger to point to your worker automation)
  3. Your worker automation receives a payload containing details about the job (the item’s content and metadata). The payload also shows what changed in the item, which in this case is just the status you updated. What you really care about here are the job details that let you process the job:
{
  "dataChanged": { // Shows you everything that changed in the item
    "metadata": {
      "status": "started"
    }
  },
  "newDataset": { // This is the full current contents of your item
    "key": "jobs",
    "sortField": "job-2024-03-15-001",
    "content": {
      "report_type": "monthly",
      "department": "sales"
    },
    "metadata": {
      "status": "started",
      "type": "report_generation"
    }
  }
}
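
To make this concrete, here is a minimal Python-style sketch of a worker automation that handles this payload. It is only an illustration: process_job and datastore_update are hypothetical stand-ins for your own job logic and for the “/datastore update item with triggers off” command, not a real API.

# Minimal worker sketch (Python). Both helpers below are hypothetical stubs.
def process_job(job_data: dict) -> dict:
    # Hypothetical: do the real work here (e.g., build the monthly report).
    return {"success": True}

def datastore_update(key: str, sort_field: str, metadata: dict, triggers_off: bool) -> None:
    # Hypothetical: issue the datastore update through your platform.
    ...

def handle_trigger(payload: dict) -> None:
    item = payload["newDataset"]                  # full current contents of the item
    if item["metadata"].get("status") != "started":
        return                                    # only react to the "started" transition

    result = process_job(item["content"])         # the data attached when the job was created

    # Mark the job done with triggers off so this update does not
    # re-fire the worker automation.
    datastore_update(
        key=item["key"],
        sort_field=item["sortField"],
        metadata={"status": "completed", "result": result},
        triggers_off=True,
    )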

Controlling Triggers

Sometimes you want to update a job without firing triggers (i.e., you don’t want your workers to run). In that case, update with triggers off:

/datastore update item with triggers off:
key: jobs
sortField: job-2024-03-15-001
metadata: {
    "status": "completed",
    "completed_at": "2024-03-15T10:00:00Z"
}

Processing Jobs

There are two ways to process jobs:

1. On-Demand Processing

Write an automation that you run manually. To kick off a specific job, update its status to “started”; this fires the item’s trigger and starts your worker:

/datastore update item with:
key: jobs
sortField: job-2024-03-15-001
metadata: {
    "status": "started",
    "started_at": "2024-03-15T10:00:00Z",
    "worker_id": "worker-123",
    "version": 1,
    "locked_until": "2024-03-15T10:10:00Z"
}

2. Scheduled Processing

Create a job orchestration automation that runs every 5 minutes (minimum interval) to process jobs. Here’s the complete flow:

  1. First, query for available jobs:
/datastore query items where:
key: jobs
metadata: {
    "status": "not_started"
}
limit: 10
  2. For each job returned, attempt to claim it:
/datastore update item with:
key: jobs
sortField: [job.sortField]
metadata: {
    "status": "started",
    "version": [current_version],
    "worker_id": "worker-123",
    "started_at": "2024-03-15T10:00:00Z",
    "locked_until": "2024-03-15T10:10:00Z"
}

⚠️ WARNING: Race Condition Risk. There is a critical race condition here: if this scheduled orchestration automation runs at the same time as another orchestration automation that is also processing jobs, multiple workers can end up claiming the same job.

For example:

  1. Worker A and Worker B both query at 10:00:00
  2. Both see Job X in “not_started” state
  3. Both try to claim Job X at almost the same time
  4. Worker A’s update succeeds because it’s slightly faster
  5. Worker B’s update fails because the version number no longer matches
  6. Worker B should catch this error and move on to the next job

To handle this safely (a sketch of the full claim loop follows this list):

  1. Always check if your claim attempt succeeded

  2. Move on to the next job if the claim fails

  3. Consider adding a small random delay before claiming to help distribute worker claims

  4. Keep track of failed claim attempts for monitoring

  5. After successfully claiming a job, process it and update its status:

/datastore update item with triggers off:
key: jobs
sortField: [job.sortField]
metadata: {
    "status": "completed",
    "completed_at": "2024-03-15T10:05:00Z"
}
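
Putting the pieces together, the orchestration loop might look like the following Python-style sketch. It is only an illustration under this guide’s assumptions: datastore_query and datastore_claim are hypothetical helpers wrapping the query and update commands above, the claim is assumed to fail (return False) when the version no longer matches, and the version is assumed to be returned by the query.

import random
import time
from datetime import datetime, timedelta, timezone

def datastore_query(key: str, metadata: dict, limit: int) -> list:
    # Hypothetical: wraps "/datastore query items where ...".
    return []

def datastore_claim(key: str, sort_field: str, metadata: dict) -> bool:
    # Hypothetical: wraps "/datastore update item with ...".
    # Assumed to return False when the update is rejected (version mismatch).
    return False

def run_orchestration(worker_id: str) -> None:
    jobs = datastore_query("jobs", {"status": "not_started"}, limit=10)
    failed_claims = 0

    for job in jobs:
        # A small random delay helps spread claims across concurrent orchestrators.
        time.sleep(random.uniform(0, 0.5))

        now = datetime.now(timezone.utc)
        claimed = datastore_claim(
            key="jobs",
            sort_field=job["sortField"],
            metadata={
                "status": "started",
                "version": job.get("version"),        # assumed to come from the query result
                "worker_id": worker_id,
                "started_at": now.isoformat(),
                "locked_until": (now + timedelta(minutes=10)).isoformat(),
            },
        )
        if not claimed:
            failed_claims += 1   # keep a count for monitoring, then move on
            continue
        # In the trigger-based flow, this status change fires the item's trigger
        # and the worker automation takes over from here.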

Handling Multiple Workers Safely

When multiple automations process jobs simultaneously, you need to prevent them from processing the same job. The status field handles most of this, with the version check from the previous section covering simultaneous claims:

1. Status-Based Job Claims

Workers only query for jobs with “not_started” status:

/datastore query items where:
key: jobs
metadata: {
    "status": "not_started"
}
limit: 10

When a worker claims a job, it updates the status to “started”. This prevents other workers from seeing or claiming the job in later queries (simultaneous claims are resolved by the version check):

/datastore update item with:
key: jobs
sortField: job-2024-03-15-001
metadata: {
    "status": "started",
    "worker_id": "worker-123",
    "started_at": "2024-03-15T10:00:00Z",
    "locked_until": "2024-03-15T10:10:00Z"
}

2. Job Locking

Include these fields to track job processing:

  • worker_id: Identifies which worker has the job
  • started_at: When processing began
  • locked_until: When the lock expires
  • attempts: Number of processing attempts

3. Lock Timeouts

Set a locked_until time when claiming a job:

  • Other workers can claim the job after this time by querying for expired locks
  • Prevents jobs from being stuck if a worker crashes
  • Typically 5-15 minutes depending on job complexity

To handle expired locks, workers can also query for abandoned jobs:

/datastore query items where:
key: jobs
metadata: {
    "status": "started",
    "locked_until": {
        "lt": "CURRENT_TIMESTAMP"
    }
}
limit: 10
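
Reclaiming an abandoned job works like a normal claim, except that the query targets expired locks and the reclaim should bump the attempts counter and set a fresh locked_until. Below is a minimal sketch, reusing the hypothetical datastore_query and datastore_claim helpers from the orchestration sketch above.

from datetime import datetime, timedelta, timezone

def reclaim_abandoned_jobs(worker_id: str) -> None:
    now = datetime.now(timezone.utc)
    stale = datastore_query(
        "jobs",
        {"status": "started", "locked_until": {"lt": now.isoformat()}},
        limit=10,
    )
    for job in stale:
        datastore_claim(
            key="jobs",
            sort_field=job["sortField"],
            metadata={
                "status": "started",
                "version": job.get("version"),                        # assumed optimistic check
                "worker_id": worker_id,
                "attempts": job["metadata"].get("attempts", 0) + 1,   # track retries
                "started_at": now.isoformat(),
                "locked_until": (now + timedelta(minutes=10)).isoformat(),
            },
        )
        # Updating with triggers on fires the item's trigger again,
        # so the worker automation reprocesses the job.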

Complete Job Lifecycle Example

  1. Create Job
/datastore create item with:
key: jobs
sortField: job-2024-03-15-001
content: {
    "report_type": "monthly",
    "department": "sales"
}
metadata: {
    "status": "not_started",
    "type": "report_generation",
    "created_at": "2024-03-15T09:00:00Z",
}
triggerUrls: {
    "https://your-automation-url": "your-api-key"
}
  2. Claim Job
/datastore update item with:
key: jobs
sortField: job-2024-03-15-001
metadata: {
    "status": "started",
    "worker_id": "worker-123",
    "started_at": "2024-03-15T09:05:00Z",
    "locked_until": "2024-03-15T09:20:00Z"
}
  3. Complete Job. Be sure to update with triggers off to avoid triggering your worker automation again.
/datastore update item with triggers off:
key: jobs
sortField: job-2024-03-15-001
metadata: {
    "status": "completed",
    "completed_at": "2024-03-15T09:15:00Z",
    "result": {
        "success": true,
        "records_processed": 1000
    }
}