# fmc_api_modules.py
# Forked from mapp-john/fmc-api.
# Import Required Modules
import os
import re
import sys
import csv
import json
import socket
import random
import netaddr
import getpass
import requests
import traceback
#
#
#
# Define Password Function
def define_password():
    """Prompt for the API password twice and return it once both entries agree.

    The prompt repeats until the two entries match and are non-empty. A
    mismatch prints a notice before prompting again; an empty (but matching)
    pair simply prompts again.

    Returns:
        The confirmed, non-empty password string.
    """
    while True:
        first_entry = getpass.getpass('Please Enter API Password: ')
        second_entry = getpass.getpass('Re-enter API Password to Verify: ')
        if first_entry != second_entry:
            print('Passwords Did Not Match Please Try Again')
            continue
        # Matching but empty input is not accepted; ask again.
        if first_entry:
            return first_entry
#
#
#
# Define Generate Access Token Function
def AccessToken(server,headers,username,password):
    """Request an API access token and the domain list from the FMC.

    Sends a POST to ``{server}/api/fmc_platform/v1/auth/generatetoken`` with
    HTTP basic authentication (SSL verification disabled) and reads the
    ``X-auth-access-token`` and ``DOMAINS`` response headers.

    Args:
        server: Base URL that the API path is appended to.
        headers: Request headers passed through to ``requests.post``.
        username: Basic-auth user name.
        password: Basic-auth password.

    Returns:
        A tuple ``(auth_token, domains)`` where ``domains`` is the
        JSON-decoded value of the ``DOMAINS`` response header.

    Raises:
        SystemExit: If the request fails, the token header is absent, or the
            ``DOMAINS`` header is missing or cannot be decoded.
    """
    auth_url = f'{server}/api/fmc_platform/v1/auth/generatetoken'
    # Bound up front so the error handler can tell whether a response exists.
    r = None
    try:
        # REST call with SSL verification turned off:
        r = requests.post(auth_url, headers=headers, auth=requests.auth.HTTPBasicAuth(username,password), verify=False)
        print(r.status_code)
        # .get() so a missing header reaches the explicit check below instead
        # of raising KeyError first.
        auth_token = r.headers.get('X-auth-access-token')
        if auth_token is None:
            print('auth_token not found. Exiting...')
            sys.exit()
        domains = json.loads(r.headers['DOMAINS'])
    except Exception:
        # SystemExit is not an Exception subclass, so the sys.exit() above
        # passes straight through this handler.
        print (f'Error in generating auth token --> {traceback.format_exc()}')
        # The request itself may have failed, in which case there are no
        # response headers to show.
        if r is not None:
            print(r.headers)
        sys.exit()
    return auth_token,domains
#
#
# Get Device Details from Inventory List
# And Delete item from Inventory List
def GetDeviceDetails(ID,DeviceList):
    """Extract one device's details from the inventory and remove that entry.

    Looks for the first record in ``DeviceList`` whose ``'id'`` equals ``ID``.
    When found, the record is deleted from ``DeviceList`` (the list is mutated
    in place) and a summary dict is returned.

    Args:
        ID: Value compared against each record's ``'id'`` key.
        DeviceList: List of device record dicts; modified in place on a match.

    Returns:
        A dict with ``name``, ``model``, ``healthStatus``, ``sw_version`` and
        ``license_caps`` copied from the record, plus ``chassisData`` when the
        record's ``metadata`` contains it. An empty dict if no record matches.

    Raises:
        KeyError: If the matching record lacks one of the copied keys.
    """
    copied_keys = ('name', 'model', 'healthStatus', 'sw_version', 'license_caps')
    for position, item in enumerate(DeviceList):
        if item['id'] != ID:
            continue
        temp_dict = {key: item[key] for key in copied_keys}
        # Tolerate records without a 'metadata' entry.
        metadata = item.get('metadata') or {}
        if 'chassisData' in metadata:
            temp_dict['chassisData'] = metadata['chassisData']
        # Delete by position and return immediately: removing elements while
        # continuing to iterate the same list would skip the next record.
        del DeviceList[position]
        return temp_dict
    return {}
#
#
#
# Get Net Object UUID
def _collect_fmc_objects(url_get, headers, ObjectName, outfile):
    """Return ``{'name', 'id'}`` entries from every result page from *url_get*.

    Follows the ``paging -> next`` link of each response until none remains.
    Collection stops early, returning whatever was gathered so far, when a
    request raises, returns a non-200 status, or returns a body that is not
    valid JSON; each such failure is recorded in *outfile*.
    """
    collected = []
    while url_get:
        resp = ''
        json_resp = None
        try:
            # REST call with SSL verification turned off
            r = requests.get(url_get, headers=headers, verify=False)
            try:
                status_code = r.status_code
                resp = r.text
                print(f'Status code is: {status_code}')
                if status_code == 200:
                    json_resp = r.json()
            finally:
                # Close unconditionally: a Response is falsy when its status
                # is not OK, so an `if r:` guard would skip failed responses.
                r.close()
        except (requests.exceptions.RequestException, ValueError) as err:
            # ValueError covers a response body that is not valid JSON.
            print (f'Error in connection --> {err}')
            # NOTE: the message says POST although the call is a GET; the
            # wording is kept so existing log output does not change.
            outfile.write(f'Error occurred in POST --> {resp}\n{ObjectName}\n')
            break
        if json_resp is None:
            # Non-200 status: record it and stop paging rather than retrying
            # the same URL forever.
            outfile.write(f'Error occurred in POST --> {resp}\n{ObjectName}\n')
            break
        # Append Items to the collected list
        for item in json_resp.get('items', []):
            collected.append({'name': item['name'],'id': item['id']})
        next_links = json_resp.get('paging', {}).get('next')
        url_get = next_links[0] if next_links else None
    return collected


def GetNetObjectUUID(server,API_UUID,headers,ObjectName,outfile):
    """Look up the UUID of a network or host object by name.

    Fetches all network objects and then all host objects for the domain
    (1000 per page, following paging links) and searches the combined list
    for an entry whose name equals ``ObjectName``.

    Args:
        server: Base URL that the API path is appended to.
        API_UUID: Domain UUID inserted into the request path.
        headers: Request headers passed through to ``requests.get``.
        ObjectName: Name of the object to find.
        outfile: Writable text file object that request errors are logged to.

    Returns:
        The ``id`` of the matching object, or ``None`` when nothing matches.
        If several entries share the name, the last one collected wins
        (hosts are collected after networks).
    """
    base_url = f'{server}/api/fmc_config/v1/domain/{API_UUID}/object'
    # Create Get DATA JSON Dictionary to collect data from GET calls
    GetDATA = {'items': []}
    for object_type in ('networks', 'hosts'):
        url_get = f'{base_url}/{object_type}?offset=0&limit=1000'
        GetDATA['items'].extend(_collect_fmc_objects(url_get, headers, ObjectName, outfile))
    # Default to None so a missing object does not raise UnboundLocalError.
    ObjectID = None
    for item in GetDATA['items']:
        if item['name'] == ObjectName:
            ObjectID = item['id']
    return ObjectID
#
#
#
# Convert list elements to string, separating elements with semicolon (;)
def listToString(s):
    """Concatenate the strings in *s*, appending ``"; "`` after each element.

    The separator follows every element, including the last one, so
    ``['a', 'b']`` becomes ``'a; b; '`` and an empty input yields ``''``.

    Args:
        s: Iterable of strings.

    Returns:
        The concatenated string.

    Raises:
        TypeError: If an element cannot be concatenated with a string.
    """
    # A single join instead of repeated `+=` avoids rebuilding the string on
    # every iteration.
    return "".join(ele + "; " for ele in s)