The problem described in the installer feedback section was that some devices send several requests to the TFTP server. Since we are using that request as the trigger, we want to avoid sending notifications and starting other tasks twice.
Redis is an open source, in-memory data structure store that can be used as a database, cache and message broker. That might sound a bit complicated. Put simply, we will use Redis to hold a queue of tasks. RQ (Redis Queue) is a Python library that puts jobs on that queue and runs workers to execute them.
sudo apt-get install redis
The Python library should be installed in the same Virtualenv we’ve been using for the TFTP server.
pip install rq
Later on we will set up both rq and the TFTP server to run as services. For now we can just start a worker in a new window.
~ ᐅ cd /opt/ztp
/opt/ztp ᐅ rq worker ztp_tasks
10:03:55 RQ worker 'rq:worker:vagrant.15041' started, version 0.10.0
10:03:55 *** Listening on ztp_tasks...
10:03:55 Cleaning registries for queue: ztp_tasks
Now that we have a worker running we can modify our ZTP server to start jobs through rq instead of doing it directly.
Create the file /opt/ztp/app/tasks.py
from app.notifications import notify_slack
import time
def ztp_start(host, file):
    msg = '{} downloaded {}'.format(host, file)
    notify_slack(msg)
    time.sleep(360)
Currently the code defines a ztp_start function which uses the notify_slack function we set up previously. There’s also a sleep, which simply waits for six minutes. Why would we want that there? Remember that we got several notifications in the previous chapter. We are going to set up the jobs so that only one job can be active at a time for each combination of host and requested file. As yet we don’t have any other code in the function aside from the notification, so the wait is only meant to suppress further notifications for the current run.
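To get a feel for what the six-minute wait buys us, here is a small simulation that needs neither Redis nor rq. It is only a sketch: the notified_requests helper and the request times are made up for illustration, and it assumes an idle worker picks up each job the moment it is queued.

```python
def notified_requests(request_times, hold=360):
    """Return the request times (in seconds) that would start a new job.

    Assumption: a job occupies its id from the moment of the request
    until `hold` seconds later, and requests arriving in between are
    ignored.
    """
    started = []
    busy_until = None
    for t in sorted(request_times):
        if busy_until is None or t >= busy_until:
            started.append(t)
            busy_until = t + hold
    return started


# Four requests within ten seconds, then a late retry after 400 seconds.
print(notified_requests([0, 2, 5, 9, 400]))  # [0, 400]
```

The burst of early requests collapses into a single job, while a request that arrives after the wait has ended starts a new one.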
The next file /opt/ztp/app/broker.py is going to call the function we just defined.
import redis
import rq
def trigger_job(host, filename):
    redis_conn = redis.Redis()
    timeout = 600
    q = rq.Queue('ztp_tasks', connection=redis_conn)
    job_id = '{}_{}'.format(host, filename)
    try:
        rq_job = rq.job.Job.fetch(job_id, connection=redis_conn)
        # Only replace a job that has already ended; an active one is left alone.
        if rq_job.get_status() in ['finished', 'failed']:
            rq_job.delete()
            q.enqueue('app.tasks.ztp_start', host, filename, job_id=job_id,
                      timeout=timeout)
    except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
        # No usable job was found under this id, so create one.
        q.enqueue('app.tasks.ztp_start', host, filename, job_id=job_id,
                  timeout=timeout)
Looking at this function, we see that it takes a host and a filename as parameters. These are the IP address of the device that requested the file and the name of the requested file. We connect to Redis and use the “ztp_tasks” queue, which is the same one the rq worker was started with. The job_id is built deterministically by joining the host and the filename with an underscore, so repeated requests from the same device for the same file always map to the same job. Redis is then queried to see if a job with this id already exists. If it does and it is still queued or running, nothing happens. If the job has finished or failed, or if no such job is found, a new job is created.
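The decision that trigger_job makes can be written down as a small table. The sketch below has no Redis dependency; make_job_id and should_enqueue are hypothetical helper names used only for illustration, and a status of None stands in for the case where no job with that id is found.

```python
# Statuses after which the job may be replaced by a fresh one.
RESTARTABLE = {'finished', 'failed'}


def make_job_id(host, filename):
    """Build the same id that trigger_job uses: host and filename joined by '_'."""
    return '{}_{}'.format(host, filename)


def should_enqueue(status):
    """Decide whether a new job is created for a given existing status.

    `status` is None when no job with this id exists.
    """
    return status is None or status in RESTARTABLE


print(make_job_id('10.0.0.5', 'network-confg'))
for status in (None, 'queued', 'started', 'finished', 'failed'):
    print(status, should_enqueue(status))
```

Because the id depends only on the host and the filename, two different files requested by the same device are treated as separate jobs.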
Finally, we need to make some changes to the main program ztp_tftp.py.
We will no longer be calling notify_slack from this code, so remove the import for that function.
from app.notifications import notify_slack
Instead, we add an import for trigger_job.
from fbtftp.base_server import BaseServer
from app.broker import trigger_job
import os
The session_stats function is also modified to call trigger_job.
def session_stats(stats):
    [...]
    print('#' * 60)
    if stats.packets_sent > 0:
        trigger_job(stats.peer[0], stats.file_path)
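To look at the guard in isolation, the sketch below runs a cut-down session_stats against stand-in objects. Only the attribute names packets_sent, peer and file_path come from the code above. The stub trigger_job, the triggered list and the sample values are assumptions made for the example.

```python
from types import SimpleNamespace

triggered = []  # stand-in for the rq queue


def trigger_job(host, filename):
    """Stub that records the call instead of talking to Redis."""
    triggered.append((host, filename))


def session_stats(stats):
    # Only sessions that actually sent data start a job.
    if stats.packets_sent > 0:
        trigger_job(stats.peer[0], stats.file_path)


served = SimpleNamespace(packets_sent=12, peer=('10.0.0.5', 51234),
                         file_path='network-confg')
nothing_sent = SimpleNamespace(packets_sent=0, peer=('10.0.0.5', 51235),
                               file_path='some-other-file')

session_stats(served)
session_stats(nothing_sent)
print(triggered)  # [('10.0.0.5', 'network-confg')]
```

Note that peer is an (address, port) pair, so peer[0] passes only the IP address on to the job.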
When we start the device again we see that we are still getting multiple requests from the device.
When we look in Slack, though, we see that we only get the one notification.
You might in fact see that the device continues to try downloading network-confg after the six-minute wait in ztp_start() has passed. This can happen because the device is still trying to figure out its hostname. Since the earlier job has finished by then, such a request starts a new job and sends another notification.