Commit

Merge pull request #2 from PagerDuty/3645-agent-queue
PDQueue + queueing scripts prototype (@divtxt)
anitarao committed Nov 6, 2013
2 parents cd3bfe6 + 4ac9f2c commit 210d65b
Showing 4 changed files with 154 additions and 13 deletions.
53 changes: 53 additions & 0 deletions bin/pd-queue.py
@@ -0,0 +1,53 @@
#!/usr/bin/env python
#
# Python script to queue an incident event for delayed send to PagerDuty.
#
# Copyright (c) 2013, PagerDuty, Inc. <info@pagerduty.com>
# All rights reserved.
#

def build_queue_arg_parser(description):
    from pdagent.argparse import ArgumentParser
    parser = ArgumentParser(description=description)
    parser.add_argument("-k", "--service-key", dest="service_key", required=True,
            help="Service API Key")
    parser.add_argument("-t", "--event-type", dest="event_type", required=True,
            choices=["trigger", "acknowledge", "resolve"],
            help="Event type")
    parser.add_argument("-d", "--description", dest="description",
            help="Short description of the problem")
    parser.add_argument("-i", "--incident-key", dest="incident_key",
            help="Incident Key")
    parser.add_argument("-f", "--field", action="append", dest="fields",
            help="Add given KEY=VALUE pair to the event details")
    return parser

def parse_fields(fields):
    if fields is None:
        return {}
    # maxsplit=1 so that '=' characters inside a VALUE are preserved.
    return dict(f.split("=", 1) for f in fields)

def main():
    from pdagent.pdagentutil import queue_event
    description = "Queue up a trigger, acknowledge or resolve event to PagerDuty."
    parser = build_queue_arg_parser(description)
    args = parser.parse_args()
    details = parse_fields(args.fields)

    if args.event_type == "trigger":
        if not args.description:
            parser.error("Event type '%s' requires description" % args.event_type)
    else:
        if not args.incident_key:
            parser.error("Event type '%s' requires incident key" % args.event_type)

    queue_event(args.event_type, args.service_key, args.incident_key,
            args.description, details)

if __name__ == "__main__":
    import sys
    from os.path import abspath, dirname
    proj_dir = dirname(dirname(abspath(__file__)))
    sys.path.append(proj_dir)
    main()

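For context, the new script is a thin command-line wrapper around pdagent.pdagentutil.queue_event, which is added later in this commit. A minimal sketch of the equivalent direct call; the service key and field values are made up for illustration:

from pdagent.pdagentutil import queue_event

# Roughly what this hypothetical invocation does:
#   pd-queue.py -k <service-key> -t trigger -d "Disk usage at 95%" -f host=web-01
queue_event(
    "trigger",                            # event_type
    "0123456789abcdef0123456789abcdef",   # service_key (made up)
    None,                                 # incident_key (optional for triggers)
    "Disk usage at 95%",                  # description
    {"host": "web-01"},                   # details built from -f KEY=VALUE pairs
)
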
7 changes: 5 additions & 2 deletions bin/pd-send.py
@@ -14,7 +14,7 @@ def build_send_arg_parser(description):
     parser.add_argument("-t", "--event-type", dest="event_type", required=True,
             choices=["trigger", "acknowledge", "resolve"],
             help="Event type")
-    parser.add_argument("-d", "--description", dest="description", required=True,
+    parser.add_argument("-d", "--description", dest="description",
             help="Short description of the problem"),
     parser.add_argument("-i", "--incident-key", dest="incident_key",
             help="Incident Key"),
@@ -35,7 +35,10 @@ def main():
     args = parser.parse_args()
     details = parse_fields(args.fields)

-    if args.event_type != "trigger":
+    if args.event_type == "trigger":
+        if not args.description:
+            parser.error("Event type '%s' requires description" % args.event_type)
+    else:
         if not args.incident_key:
             parser.error("Event type '%s' requires incident key" % args.event_type)

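Both scripts funnel repeated -f flags through parse_fields, which folds the KEY=VALUE pairs into the event's details dict. A quick sketch of its behavior, with made-up values; note that maxsplit=1 keeps any further '=' characters inside the value:

# parse_fields turns ["KEY=VALUE", ...] into a dict:
fields = ["host=web-01", "note=load=5.2"]
print dict(f.split("=", 1) for f in fields)
# {'host': 'web-01', 'note': 'load=5.2'}  (key order may vary)
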
36 changes: 25 additions & 11 deletions pdagent/pdagentutil.py
@@ -14,32 +14,46 @@
 EVENTS_API_BASE = "https://events.pagerduty.com/generic/2010-04-15/create_event.json"

 def send_event(event_type, service_key, incident_key, description, details):
-    d = {
-        "service_key": service_key,
-        "event_type": event_type,
-        "incident_key": incident_key,
-        "details": details,
-    }
-    if description is not None:
-        d["description"] = description
-
     print "Sending %s..." % event_type

-    j = json.dumps(d)
+    j = _build_event_json_str(event_type, service_key, incident_key, description, details)
+    send_event_json_str(j)
+
+def send_event_json_str(event_str):
     request = urllib2.Request(EVENTS_API_BASE)
     request.add_header("Content-type", "application/json")
-    request.add_data(j)
+    request.add_data(event_str)

     response = urllib2.urlopen(request)
     http_code = response.getcode()
     result = json.loads(response.read())

     print "HTTP status code:", http_code
     print "Response data:", repr(result)
+    incident_key = None
     if result["status"] == "success":
         incident_key = result["incident_key"]
         print "Success! incident_key =", incident_key
     else:
         print "Error! Reason:", str(response)
+    return (incident_key, http_code)
+
+def queue_event(event_type, service_key, incident_key, description, details):
+    from pdqueue import PDQueue
+    print "Queuing %s..." % event_type
+
+    event = _build_event_json_str(event_type, service_key, incident_key, description, details)
+    PDQueue().enqueue(event)
+
+def _build_event_json_str(event_type, service_key, incident_key, description, details):
+    d = {
+        "service_key": service_key,
+        "event_type": event_type,
+        "details": details,
+    }
+    if incident_key is not None:
+        d["incident_key"] = incident_key
+    if description is not None:
+        d["description"] = description
+
+    return json.dumps(d)
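
For reference, a sketch of the JSON that _build_event_json_str produces for a trigger with no incident key; the values are made up, and the key order in the real output may differ:

j = _build_event_json_str(
    "trigger", "0123456789abcdef0123456789abcdef", None,
    "Disk usage at 95%", {"host": "web-01"})
print j
# {"service_key": "0123456789abcdef0123456789abcdef",
#  "event_type": "trigger",
#  "details": {"host": "web-01"},
#  "description": "Disk usage at 95%"}

Since incident_key was None, that key is omitted entirely rather than sent as null.
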
71 changes: 71 additions & 0 deletions pdagent/pdqueue.py
@@ -0,0 +1,71 @@
import fcntl
import os
import re
import time

class PDQueue(object):
    """
    This class implements a simple directory-based queue for PagerDuty events.
    """

    QUEUE_DIR = "/tmp/pagerduty"  # TODO changeme

    def __init__(self, queue_dir=QUEUE_DIR):
        self.queue_dir = queue_dir
        self._create_queue_dir()
        self._verify_permissions()

    def _create_queue_dir(self):
        if not os.access(self.queue_dir, os.F_OK):
            os.mkdir(self.queue_dir, 0700)

    def _verify_permissions(self):
        if not (os.access(self.queue_dir, os.R_OK)
                and os.access(self.queue_dir, os.W_OK)):
            raise Exception(
                "Can't read/write to directory %s, please check permissions."
                % self.queue_dir)

    # Get the list of files from the queue directory, oldest first.
    def _queued_files(self):
        files = os.listdir(self.queue_dir)
        pd_names = re.compile("pd_")
        pd_file_names = filter(pd_names.match, files)

        # We need to sort the files by timestamp.
        # This function extracts the timestamp out of the file name.
        def file_timestamp(file_name):
            return int(re.search(r"pd_(\d+)_", file_name).group(1))

        return sorted(pd_file_names, key=file_timestamp)

    def _flush_queue(self):
        # Imported here (like queue_event's import of PDQueue) to avoid a
        # circular module dependency.
        from pdagentutil import send_event_json_str
        file_names = self._queued_files()
        # TODO handle related incidents e.g. if there is an ack for which a
        # resolve is also present
        for file_name in file_names:
            file_path = "%s/%s" % (self.queue_dir, file_name)
            with open(file_path, "r") as event_file:
                json_event_str = event_file.read()
            incident_key, status_code = send_event_json_str(json_event_str)

            # Clean up the file only if we succeeded, or if the failure was
            # not server-side; 5xx errors leave the file for a later retry.
            if not (500 <= status_code < 600):
                os.remove(file_path)

    def flush(self):
        with open("%s/lockfile" % self.queue_dir, "w") as lock_file:
            try:
                fcntl.lockf(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                # We have acquired the lock here; let's flush the queue.
                self._flush_queue()
            except IOError as e:
                print "Error while trying to acquire lock on queue: %s" % str(e)
            finally:
                fcntl.lockf(lock_file.fileno(), fcntl.LOCK_UN)

    def enqueue(self, event_json_str):
        process_id = os.getpid()
        time_seconds = int(time.time())
        file_name = "%s/pd_%d_%d" % (self.queue_dir, time_seconds, process_id)
        # open()'s third argument is a buffer size, not a file mode, so use
        # os.open to create the file with 0600 permissions.
        fd = os.open(file_name, os.O_WRONLY | os.O_CREAT, 0600)
        with os.fdopen(fd, "w") as f:
            f.write(event_json_str)
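A minimal usage sketch of the queue; the directory below is an assumption for illustration (the default /tmp/pagerduty is still marked TODO above):

from pdagent.pdqueue import PDQueue

q = PDQueue(queue_dir="/tmp/pagerduty-test")  # hypothetical directory
q.enqueue('{"service_key": "k", "event_type": "resolve", "incident_key": "i"}')
# Each event lands in its own pd_<timestamp>_<pid> file, so flush() can
# replay events oldest-first; a 5xx response leaves the file in place so
# the event is retried on the next flush.
q.flush()

Because flush() takes the lock with LOCK_NB, multiple processes can call it concurrently: whoever holds the lock drains the queue, and the others fail fast with an IOError instead of blocking.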