Skip to content

Commit

Permalink
Simplify netbox.py a ton by using xmltodict
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Rybka committed Nov 29, 2017
1 parent 5a4a115 commit 67c6258
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 160 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
206 changes: 46 additions & 160 deletions netbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
import copy
import csv
import urllib2
import xml.dom.minidom
import xml.etree.ElementTree as ElementTree
import yaml
import xmltodict
import traceback, sys, code

# Some helpful constants.
CMD = 'COMMAND'
NB_API = 'NETBOX-API'
TEMPLATE_REQUEST = {NB_API: {CMD: {'num': '1'}}}
TXT = '__text__'
GETAPIVERSION = {NB_API: {CMD: {'num': '1', 'name': 'GetAPIVersion'}}}

# Convenience rename for some code I copypasta'd.
etree = ElementTree
TEMPLATE_REQUEST = {NB_API: {CMD: {'@num': '1'}}}
GETAPIVERSION = {NB_API: {CMD: {'@num': '1', '@name': 'GetAPIVersion'}}}


def GetConfig(filename):
Expand All @@ -29,94 +25,16 @@ def GetConfig(filename):
CONFIG = GetConfig('s2.yaml')


def xml2d(e):
"""Convert an etree into a dict structure.
@type e: etree.Element
@param e: the root of the tree
@return: The dictionary representation of the XML tree
"""
def _xml2d(e):
kids = dict(e.attrib)
if e.text:
kids['__text__'] = e.text
if e.tail:
kids['__tail__'] = e.tail
for k, g in groupby(e, lambda x: x.tag):
g = [_xml2d(x) for x in g]
kids[k] = g
return kids
return {e.tag: _xml2d(e)}


def d2xml(d):
"""convert dict to xml.
1. The top level d must contain a single entry i.e. the root element
2. Keys of the dictionary become sublements or attributes
3. If a value is a simple string, then the key is an attribute
4. if a value is dict then, then key is a subelement
5. if a value is list, then key is a set of sublements
a = { 'module' : {'tag' : [ { 'name': 'a', 'value': 'b'},
{ 'name': 'c', 'value': 'd'},
],
'gobject' : { 'name': 'g', 'type':'xx' },
'uri' : 'test',
}
}
>>> d2xml(a)
<module uri="test">
<gobject type="xx" name="g"/>
<tag name="a" value="b"/>
<tag name="c" value="d"/>
</module>
@type d: dict
@param d: A dictionary formatted as an XML document
@return: A etree Root element
"""
def _d2xml(d, p):
for k, v in d.items():
if isinstance(v, dict):
node = etree.SubElement(p, k)
_d2xml(v, node)
elif isinstance(v, list):
for item in v:
node = etree.SubElement(p, k)
_d2xml(item, node)
elif k == '__text__':
p.text = v
elif k == '__tail__':
p.tail = v
else:
p.set(k, v)

k, v = d.items()[0]
node = etree.Element(k)
_d2xml(v, node)
return node


def post(url, data, contenttype):
"""Sends a POST request to the given URL with the given data and content-
type."""
"""Sends a POST request to the given URL."""
request = urllib2.Request(url, data)
request.add_header('Content-Type', contenttype)
response = urllib2.urlopen(request)
return response.read()


def postxml(url, elem):
"""Sends a POST request to the URL with the XML rooted at 'elem' as
data."""
data = ElementTree.tostring(elem, encoding='UTF-8')
return post(url, data, 'text/xml')


def get_search(lastname=None, firstname=None, nextkey=None):
"""Constructs a SearchPersonData request."""
search_params = {}
if lastname is not None:
search_params['LASTNAME'] = lastname
Expand All @@ -128,19 +46,23 @@ def get_search(lastname=None, firstname=None, nextkey=None):


def remove_access(personid=None, cardid=None):
search_params = {'accesslevels': [('accesslevel', CONFIG['no_access'])]}
cred_params = {'cardformat': CONFIG['card_format']}
"""Removes access for the given personid, and optionally one of their cards."""
# Note, we actually set their access to 'None' instead of removing.
# The API does not actually support removal of access from a person.
search_params = {'ACCESSLEVELS': [{'ACCESSLEVEL': CONFIG['no_access']}]}
cred_params = {'CARDFORMAT': CONFIG['card_format']}
if personid is not None:
search_params['personid'] = personid
search_params['PERSONID'] = personid
if cardid is not None:
cred_params['personid'] = personid
cred_params['encodednum'] = cardid
cred_params['PERSONID'] = personid
cred_params['ENCODEDNUM'] = cardid
execute(get_cmd('RemoveCredential', cred_params))
execute(get_cmd('ModifyPerson', search_params))


def add_person(lastname=None, firstname=None):
search_params = {'accesslevels': [('accesslevel', CONFIG['all_access'])]}
"""Adds a new person with full access."""
search_params = {'ACCESSLEVELS': [{'ACCESSLEVEL': CONFIG['all_access']}]}
if lastname is not None:
search_params['LASTNAME'] = lastname
if firstname is not None:
Expand All @@ -149,11 +71,11 @@ def add_person(lastname=None, firstname=None):


def add_cred(personid=None, cardid=None):
cred_params = {'cardformat': CONFIG['card_format']}
cred_params = {'CARDFORMAT': CONFIG['card_format']}
if personid is not None:
cred_params['personid'] = personid
cred_params['PERSONID'] = personid
if cardid is not None:
cred_params['encodednum'] = cardid
cred_params['ENCODEDNUM'] = cardid
return get_cmd(name='AddCredential', params=cred_params)


Expand All @@ -168,39 +90,13 @@ def get_cmd(name=None, params=None):
dict, with the correct wrapping to be properly parsed into XML.
"""
new_params = {}
for k, v in params.iteritems():
if type(v) is str:
new_params[k.upper()] = {TXT: v}
elif type(v) is list:
l = []
for t in v:
l.append({t[0].upper(): {TXT: t[1]}})
new_params[k.upper()] = l

request = copy.deepcopy(TEMPLATE_REQUEST)
command = request[NB_API][CMD]
command['name'] = name
command['PARAMS'] = new_params

command['@name'] = name
command['PARAMS'] = params
return request


def pretty_print(xml_string):
"""Handy method for debugging XML.
Args:
xml_string: str, the XML as a string
Returns:
str, XML formatted with tabs and newlines and such.
"""
parsed = xml.dom.minidom.parseString(xml_string)
pretty_xml_as_string = parsed.toprettyxml()
return pretty_xml_as_string


def execute(command):
"""Convenience method to execute a command.
Expand All @@ -213,31 +109,13 @@ def execute(command):
dict, parsed XML converted to a dictionary with the API wrapper stripped.
"""
return xml2d(etree.XML(postxml(CONFIG['url'], d2xml(command))))['NETBOX']['RESPONSE']


def get_key(obj, key):
"""Convenience function to get a given key out of a returned obj.
The API has all these annoying layers of indirection. Tries to decode that.
Args:
obj: The dictionary, probably a response.
key: The key to return.
Returns:
The value, after a few layers of indirection.
"""
val = obj[key][0]
if type(val) == dict and val.has_key(TXT):
return val[TXT]
else:
return val
xml_resp = xmltodict.parse(post(CONFIG['url'], xmltodict.unparse(command), 'text/xml'))
return xml_resp['NETBOX']['RESPONSE']


def successful(resp):
"""Determines if a response is successful."""
return resp['CODE'][0]['__text__'] == 'SUCCESS'
return resp['CODE'] == 'SUCCESS'


def add_new_members(fname):
Expand All @@ -249,10 +127,9 @@ def add_new_members(fname):

for person in people.itervalues():
response = execute(add_person(person['last'], person['first']))
response = response[0]
if successful(response):
person['pid'] = response['DETAILS'][0]['PERSONID'][0][TXT]
response = execute(add_cred(person['pid'], person['card_id']))[0]
person['pid'] = response['DETAILS']['PERSONID']
response = execute(add_cred(person['pid'], person['card_id']))
if not successful(response):
print response
else:
Expand All @@ -261,12 +138,12 @@ def add_new_members(fname):

def get_people(resp):
"""Returns a list."""
return get_key(get_key(resp, 'DETAILS'), 'PEOPLE')
return resp['DETAILS']['PEOPLE']


def has_access(person):
key_active = get_key(person, 'ACCESSLEVELS')
return 'ACCESSLEVEL' in key_active and get_key(key_active, 'ACCESSLEVEL') == CONFIG['all_access']
key_active = person['ACCESSLEVELS']
return key_active and 'ACCESSLEVEL' in key_active and key_active['ACCESSLEVEL'] == CONFIG['all_access']


def do_audit():
Expand All @@ -279,16 +156,25 @@ def do_audit():
to_process = []
nextkey = None
while nextkey != '-1':
response = execute(get_search(nextkey=nextkey))[0]
# Don't use get_key here.
response = execute(get_search(nextkey=nextkey))
to_process.extend(get_people(response)['PERSON'])
nextkey = get_key(get_key(response, 'DETAILS'), 'NEXTKEY')
nextkey = response['DETAILS']['NEXTKEY']
for person in to_process:
if has_access(person) and get_key(person, 'LASTNAME') not in people:
print '%s,%s' % (get_key(person, 'LASTNAME'), get_key(person, 'FIRSTNAME'))
if has_access(person) and person['LASTNAME'] not in people:
print '%s,%s' % (person['LASTNAME'], person['FIRSTNAME'])


if __name__ == '__main__':
# By default, add new members
# add_new_members('new2019.csv')
do_audit()
try:
# By default, add new members
# add_new_members('new2019.csv')
do_audit()
except:
# Cool exeception handling from https://stackoverflow.com/a/242514
type, value, tb = sys.exc_info()
traceback.print_exc()
last_frame = lambda tb=tb: last_frame(tb.tb_next) if tb.tb_next else tb
frame = last_frame().tb_frame
ns = dict(frame.f_globals)
ns.update(frame.f_locals)
code.interact(local=ns)
2 changes: 2 additions & 0 deletions new2019.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
last,first,card_id
Brannigan,Zapp,11223344
2 changes: 2 additions & 0 deletions roster.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
last
Brannigan

0 comments on commit 67c6258

Please sign in to comment.