Handling jobs with Redis

  • by Patrick Ogenstad
  • April 25, 2018

The problem described in the installer feedback section was that some devices send several requests to the TFTP server. Since we are using that event as the trigger, we want to avoid sending notifications and starting other tasks more than once.

Installing Redis and RQ

Redis is an open source, in-memory data structure store, used as a database, cache and message broker. That might sound a bit complicated; put more simply, we will use Redis to keep track of our tasks. RQ (Redis Queue) is a Python library that uses Redis to queue jobs and have workers run them in the background.

sudo apt-get install redis
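
If you want to verify that the Redis server is up and accepting connections, you can ping it with redis-cli. This is just a sanity check, the ZTP server itself doesn't need this step:

redis-cli ping
PONG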

The Python library should be installed in the same Virtualenv we’ve been using for the TFTP server.

pip install rq

Later on we will set up both rq and the TFTP server to run as services; for now we can just start a worker in a new window.

~  ᐅ cd /opt/ztp
/opt/ztp  ᐅ rq worker ztp_tasks

10:03:55 RQ worker 'rq:worker:vagrant.15041' started, version 0.10.0
10:03:55 *** Listening on ztp_tasks...
10:03:55 Cleaning registries for queue: ztp_tasks
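
Before wiring this into the ZTP server, a quick way to get a feel for the pattern is to enqueue something from a Python shell in the same Virtualenv. This is just a throwaway sketch, the time.sleep call is only a stand-in for a real task:

import redis
import rq

# Connect to the local Redis instance and attach to the ztp_tasks queue,
# the same queue name our worker is listening on.
redis_conn = redis.Redis()
q = rq.Queue('ztp_tasks', connection=redis_conn)

# Enqueue a callable by its dotted path, the worker imports and runs it
# in the background.
job = q.enqueue('time.sleep', 2)
print(job.id, job.get_status())

In the window where the worker is running you should see the job getting picked up and finishing a couple of seconds later.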

Now that we have a worker running, we can modify our ZTP server to start jobs through rq instead of sending the notifications directly.

Running tasks with rq

Create the file /opt/ztp/app/tasks.py

from app.notifications import notify_slack
import time


def ztp_start(host, file):
    # Notify Slack that the host has downloaded the file.
    msg = '{} downloaded {}'.format(host, file)
    notify_slack(msg)
    # Keep the job alive for six minutes so that repeated TFTP requests
    # from the same host don't trigger new jobs and new notifications.
    time.sleep(360)

Currently the code defines a ztp_start function which uses the notify_slack function we previously set up. There is also a sleep which simply waits for six minutes. Why would we want that there? Remember that we got several notifications in the previous chapter. We are going to set up the jobs so that only one job can be running at a time for each host. As of yet the function doesn't contain any other code aside from the notification, so the wait is only meant to prevent further notifications for the current run.

The next file /opt/ztp/app/broker.py is going to call the function we just defined.

import redis
import rq


def trigger_job(host, filename):
    redis_conn = redis.Redis()
    timeout = 600
    q = rq.Queue('ztp_tasks', connection=redis_conn)
    # Build a predictable job id from the host and filename so that we can
    # look up earlier jobs for the same device and file.
    job_id = '{}_{}'.format(host, filename)
    try:
        rq_job = rq.job.Job.fetch(job_id, connection=redis_conn)
        # A job with this id exists; only enqueue a new one if it has ended.
        if rq_job.get_status() in ['finished', 'failed']:
            rq_job.delete()
            q.enqueue('app.tasks.ztp_start', host, filename, job_id=job_id,
                      timeout=timeout)
    except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
        # No job with this id was found, enqueue a new one.
        q.enqueue('app.tasks.ztp_start', host, filename, job_id=job_id,
                  timeout=timeout)

Looking at this function we see that it takes two parameters, a host and a filename. These will be the IP address of the device that requested the file and the name of the requested file. We connect to Redis using the “ztp_tasks” queue, which is the same queue the rq worker is listening on. A job_id is built by concatenating the host and filename. Redis is then queried to see if a job with this id already exists; if that job is still queued or running, nothing happens. If the job has ended, or no job is found, a new job is created. The timeout of 600 seconds gives each job enough time to get through the six minute sleep in ztp_start().
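
As a quick illustration of how the deduplication works, here is what happens if we call the function twice in a row from a Python shell. The IP address and filename are just made up examples:

from app.broker import trigger_job

# First call: no job with the id '192.168.0.50_network-confg' exists yet,
# so ztp_start() is enqueued and the worker starts working on it.
trigger_job('192.168.0.50', 'network-confg')

# Second call: the job from the first call is still running, its status is
# neither 'finished' nor 'failed', so nothing is enqueued and no extra
# notification is sent.
trigger_job('192.168.0.50', 'network-confg')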

Finally, we need to make some changes to the main program ztp_tftp.py.

We will no longer be calling notify_slack from this code, so remove the import for that function.

from app.notifications import notify_slack

Instead we add an import for trigger_job.

from fbtftp.base_server import BaseServer
from app.broker import trigger_job

import os

The session_stats function is also modified to call trigger_job.

def session_stats(stats):
    [...]
    print('#' * 60)
    if stats.packets_sent > 0:
        trigger_job(stats.peer[0], stats.file_path)

Starting the device again

When we boot the device again we see that it still sends multiple requests to the TFTP server.

Installer feedback

When we look in Slack, though, we see that we only get a single notification.

Installer feedback

You might in fact see that the device continues to try to download network-confg after the sleep in ztp_start() has passed; this can be because the device is still trying to figure out its hostname.
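
If you want to keep an eye on the queue while the device is booting, rq comes with an info command that shows the queues and the workers attached to them. It is a handy way to check whether a ztp_start job is currently queued or being worked on:

rq info ztp_tasks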