This repository has been archived by the owner on Jun 24, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 62
/
incident_demo.py
123 lines (101 loc) · 3.52 KB
/
incident_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import print_function
from os.path import dirname, join
import logging
from time import sleep
from pprint import pprint
import pypd
from pypd.errors import BadRequest
# Debug-level logging lets us see the details of every HTTP request made
# while the demo runs.
logging.basicConfig(level=logging.DEBUG)

# The key file is looked up one directory above the directory holding this
# script; change this to wherever your api.key file actually lives.
script_dir = dirname(__file__)
path = join(dirname(script_dir), 'api.key')
pypd.set_api_key_from_file(path)

# Address passed along with the incident actions below; it must be an email
# that is appropriate for the API key in use.
# NOTE(review): the literal looks like a redacted placeholder -- replace it
# with a real address before running.
from_email = '[email protected]'

# The demo does not create these itself: it assumes a service and an
# escalation policy already exist and uses whichever find_one() returns.
service = pypd.Service.find_one()
escalation_policy = pypd.EscalationPolicy.find_one()
# Payload for the demo incident. The fixed incident_key lets us find the
# incident again later, so we avoid creating duplicates until the open one
# has been acted on.
service_reference = {
    'id': service['id'],
    'type': 'service_reference',
}
escalation_policy_reference = {
    'id': escalation_policy['id'],
    'type': 'escalation_policy_reference',
}
data = {
    'type': 'incident',
    'title': 'incident_demo_incident2',
    'service': service_reference,
    'incident_key': 'incident_demo_key',
    'body': {
        'type': 'incident_body',
        'details': 'testing creating an incident',
    },
    'escalation_policy': escalation_policy_reference,
}
def _create_or_find_incident(payload, incident_key):
    """Create an incident from ``payload`` or fall back to an existing one.

    Creating an incident whose key is already open fails with BadRequest;
    in that case the last incident returned by a lookup on ``incident_key``
    is used instead.
    """
    try:
        return pypd.Incident.create(
            data=payload,
            add_headers={'from': from_email},
        )
    except BadRequest:
        return pypd.Incident.find(incident_key=incident_key)[-1]


# main demo incident
incident = _create_or_find_incident(data, 'incident_demo_key')

# mergable incident: same payload, only the incident_key differs
mergable_key = 'incident_demo_key_mergable'
data_mergable = dict(data, incident_key=mergable_key)
to_merge = _create_or_find_incident(data_mergable, mergable_key)
# ack it, snooze it, resolve it... bop it?
# Show the incident object first, then its .json attribute.
pprint(incident)
pprint(incident.json)
# Run the incident actions one after another, each passing from_email.
# The order is deliberate: acknowledge, snooze, annotate, then merge.
incident.acknowledge(from_email)
# NOTE(review): duration is presumably in seconds (3600 = 1 hour) -- confirm
incident.snooze(from_email, duration=3600)
incident.create_note(from_email, 'This is a note!')
# Merge call involving the second demo incident created above (to_merge);
# presumably it is folded into `incident` -- verify against the pypd docs.
incident.merge(from_email, [to_merge, ])
# Snapshot every incident that is already triggered *before* sending the
# event, so whatever the event creates can be told apart afterwards.
triggered_incidents = pypd.Incident.find(statuses=['triggered'])

# let's see if events and alerts work!
# NOTE(review): the routing key below is hard-coded; presumably it has to be
# swapped for the integration key of your own service -- confirm.
event_payload = {
    'summary': 'this is an error event!',
    'severity': 'error',
    'source': 'pypd bot',
}
pypd.EventV2.create(data={
    'routing_key': '64e61052453e4ec0a3b42e93ac235375',
    'event_action': 'trigger',
    'payload': event_payload,
})
# Wait for our event to trigger an incident.
#
# A new incident is recognised by its id rather than by comparing list
# lengths: the length check never fires if a previously triggered incident
# leaves the 'triggered' state while we are polling, because the counts can
# then stay equal even though a new incident exists.
#
# The wait is also bounded, so an event that never produces an incident
# (e.g. a wrong routing key) fails with a clear error instead of polling
# forever.
known_triggered_ids = set(i['id'] for i in triggered_incidents)
max_polls = 60  # one poll per second
for _ in range(max_polls):
    incidents = pypd.Incident.find(statuses=['triggered', ])
    if any(i['id'] not in known_triggered_ids for i in incidents):
        break
    pprint('Sleeping for 1 second...')
    sleep(1)
else:
    raise RuntimeError(
        'no new triggered incident appeared after %d polls' % max_polls
    )
# Find the new incidents, i.e. those present now that were not in the
# pre-event snapshot; because they were event generated they will have
# alerts.
triggered_ids = [i['id'] for i in triggered_incidents]
incident_ids = [i['id'] for i in incidents]
# Difference, not union: a union contains every id, so it would also select
# incidents that existed before the event was sent.
new_ids = set(incident_ids).difference(triggered_ids)
# Build a real list instead of calling filter(): on Python 3 filter() returns
# an iterator, which cannot be indexed.
new_incidents = [i for i in incidents if i['id'] in new_ids]
new_incident = new_incidents[0]
alerts = new_incident.alerts()
# Exercise the alert API on the first alert of the new incident: dump it,
# then resolve it and dump what resolve() returns.
alert = alerts[0]
pprint(alert.json)
pprint(alert.resolve(from_email))

# Resolve the remaining triggered incidents. The new incident is left out
# because its alert was resolved just above (it only has the 1 alert).
for open_incident in incidents:
    if open_incident['id'] != new_incident['id']:
        open_incident.resolve(
            from_email=from_email,
            resolution='resolved automagically!',
        )

# finally resolve the demo incident itself and finish up
incident.resolve(from_email, resolution='resolved automatically!')