-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcertbot_dns_vultr.py
137 lines (100 loc) · 5.15 KB
/
certbot_dns_vultr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import logging
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
import requests
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """DNS Authenticator for Vultr.

    This Authenticator uses the Vultr API to fulfill a dns-01 challenge by
    creating a TXT record for the validation name and deleting it again
    during cleanup.
    """

    description = "Obtain certs using a DNS TXT record (if you are using Vultr for DNS)."

    def __init__(self, *args, **kwargs):
        """Initialise the plugin; credentials and API client are created lazily."""
        super().__init__(*args, **kwargs)
        self.credentials = None
        self.vultr = None

    @classmethod
    def add_parser_arguments(cls, add):
        """Register the plugin's command-line options.

        Adds a ``credentials`` option on top of whatever the base class
        registers.
        """
        super().add_parser_arguments(add)
        add("credentials", help="Vultr credentials INI file.")

    def more_info(self):
        """Return a short human-readable description of what this plugin does."""
        return "This plugin configures a DNS TXT record to respond to a dns-01 challenge using the Vultr API."

    def _setup_credentials(self):
        """Load the Vultr credentials file and store it on ``self.credentials``.

        The mapping passed to ``_configure_credentials`` names the ``key``
        entry (the Vultr API key) that the credentials file is expected to
        contain.
        """
        self.credentials = self._configure_credentials(
            "credentials",
            "Vultr credentials INI file",
            {"key": "API key for Vultr account"},
        )

    def _perform(self, domain, validation_name, validation):
        """Create the TXT record ``validation_name`` = ``validation`` for ``domain``."""
        self._get_vultr_client().add_txt_record(domain, validation_name, validation)

    def _cleanup(self, domain, validation_name, validation):
        """Delete the TXT record previously created for ``domain``."""
        self._get_vultr_client().del_txt_record(domain, validation_name, validation)

    def _get_vultr_client(self):
        """Return the shared ``VultrClient``, creating it on first use.

        The same client instance must be reused between ``_perform`` and
        ``_cleanup`` because it remembers the IDs of the records it added.
        """
        if self.vultr is None:
            self.vultr = VultrClient(self.credentials.conf("key"))
        return self.vultr
class VultrClient:
    """Small client for the Vultr v2 API covering what a dns-01 challenge needs.

    It can add a TXT record, delete a TXT record it added earlier, and work
    out which of the account's domains a given host name belongs to.
    Requests are authenticated with a bearer token built from the API key.
    """

    API_BASE_URL = "https://api.vultr.com/v2"
    # Seconds to wait for the API before giving up. Without a timeout a
    # stalled connection would block certbot forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, key):
        """Create a client.

        :param key: Vultr API key sent as the bearer token.
        """
        self.api_key = key
        # List of domain objects returned by ``GET /domains``; fetched once.
        self.domains_cache = None
        # Maps (record_name, record_data) -> record id for records we created,
        # so cleanup only ever deletes records this client added.
        self.added_records = {}

    def add_txt_record(self, domain_name, record_name, record_data):
        """Create a TXT record and remember its id for later cleanup.

        :param domain_name: domain being validated.
        :param record_name: absolute name of the TXT record to create.
        :param record_data: TXT record content (sent wrapped in double quotes).
        :raises errors.PluginError: if the base domain cannot be determined,
            the API call fails, or the response has an unexpected shape.
        """
        try:
            base_domain_name = self.get_base_domain_name(domain_name)
            relative_record_name = self.get_relative_record_name(base_domain_name, record_name)
            result = self.request("POST", f"/domains/{base_domain_name}/records", {
                "type": "TXT",
                "name": relative_record_name,
                "data": quote(record_data),
            })
            try:
                record_id = result["record"]["id"]
            except (KeyError, TypeError):
                # The body was not the expected {"record": {"id": ...}} JSON.
                raise VultrPluginError("Unexpected response from the Vultr API") from None
            self.added_records[(record_name, record_data)] = record_id
        except (VultrPluginError, requests.RequestException) as err:
            error_message = self._describe_error(err)
            raise errors.PluginError(f'Failed to add TXT record for "{domain_name}": {error_message}') from err
        logger.debug(f'Successfully added TXT record for "{domain_name}"')

    def del_txt_record(self, domain_name, record_name, record_data):
        """Delete a TXT record previously created by :meth:`add_txt_record`.

        Cleanup is best-effort: records this client did not add are skipped,
        and failures are logged as warnings rather than raised.

        :param domain_name: domain that was validated.
        :param record_name: absolute name of the TXT record.
        :param record_data: TXT record content used when the record was added.
        """
        record_key = (record_name, record_data)
        if record_key not in self.added_records:
            logger.debug(f'Skipping deletion of TXT record for "{domain_name}" since it was not successfully added')
            return
        try:
            base_domain_name = self.get_base_domain_name(domain_name)
            record_id = self.added_records[record_key]
            self.request("DELETE", f"/domains/{base_domain_name}/records/{record_id}")
        except (VultrPluginError, requests.RequestException) as err:
            error_message = self._describe_error(err)
            logger.warning(f'Failed to delete TXT record for "{domain_name}": {error_message}')
        else:
            logger.debug(f'Successfully deleted TXT record for "{domain_name}"')

    def get_base_domain_name(self, full_domain_name):
        """Return the account domain that ``full_domain_name`` belongs to.

        The domain list is fetched from the API once and cached. Candidate
        base names come from ``dns_common.base_domain_name_guesses`` and are
        tried in the order that helper returns them.

        :param full_domain_name: host name being validated.
        :returns: the first candidate that matches a domain in the account.
        :raises VultrPluginError: if the domain list cannot be fetched or no
            candidate matches.
        """
        if self.domains_cache is None:
            # NOTE(review): only the first response page of /domains is read;
            # confirm whether accounts with many domains need pagination.
            try:
                self.domains_cache = self.request("GET", "/domains")["domains"]
            except requests.RequestException as err:
                raise VultrPluginError(
                    "Error fetching DNS domains list: " + self._describe_error(err)
                ) from err
        hosted_domains = {entry["domain"] for entry in self.domains_cache}
        for guess in dns_common.base_domain_name_guesses(full_domain_name):
            if guess in hosted_domains:
                logger.debug(f'Using base domain "{guess}" for "{full_domain_name}"')
                return guess
        raise VultrPluginError(f'Could not find the (base) domain for "{full_domain_name}" (Is the domain set in your DNS?)')

    def get_relative_record_name(self, base_domain_name, absolute_record_name):
        """Strip the base domain from an absolute record name.

        ``("example.com", "_acme-challenge.www.example.com")`` gives
        ``"_acme-challenge.www"``; a record name equal to the base domain
        gives ``""``.

        :raises VultrPluginError: if the record name is not inside the base
            domain (previously the name was silently truncated).
        """
        suffix = "." + base_domain_name
        if absolute_record_name.endswith(suffix):
            return absolute_record_name[:-len(suffix)]
        if absolute_record_name == base_domain_name:
            return ""
        raise VultrPluginError(
            f'Record name "{absolute_record_name}" is not within domain "{base_domain_name}"'
        )

    def request(self, method, path, data=None):
        """Send an authenticated request to the Vultr API.

        :param method: HTTP method, e.g. ``"GET"``.
        :param path: path appended to the API base URL, starting with ``/``.
        :param data: optional object sent as the JSON request body.
        :returns: the decoded JSON body when the response is JSON, otherwise
            the response text.
        :raises requests.RequestException: on connection problems, timeouts,
            or an HTTP error status (``requests.HTTPError``).
        """
        response = requests.request(
            method,
            self.API_BASE_URL + path,
            json=data,
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        # The header may be absent (e.g. empty bodies) or carry parameters
        # such as "; charset=utf-8", so compare only the media type.
        content_type = response.headers.get("Content-Type", "")
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type == "application/json":
            return response.json()
        return response.text

    @staticmethod
    def _describe_error(err):
        """Return a readable message for an error raised while using the API."""
        if isinstance(err, VultrPluginError):
            return err.message
        # Compare with None explicitly: a Response for an error status is falsy.
        if getattr(err, "response", None) is not None:
            return http_error_message(err)
        # Connection errors and timeouts carry no response.
        return str(err)
class VultrPluginError(Exception):
    """Error raised by the Vultr client for problems it detects itself.

    The text is available both as ``.message`` and through ``str(err)``.
    """

    def __init__(self, message):
        # Pass the message to Exception so str(err), repr(err) and err.args
        # are populated; previously they were empty.
        super().__init__(message)
        self.message = message
def http_error_message(http_error):
    """Format an HTTP error as ``"<status code> - <response body>"``.

    :param http_error: exception carrying a ``response`` attribute with
        ``status_code`` and ``text``.
    :returns: the formatted message, or ``str(http_error)`` when the
        exception has no response attached (e.g. the request never
        reached the server).
    """
    response = getattr(http_error, "response", None)
    if response is None:
        return str(http_error)
    return f"{response.status_code} - {response.text}"
def quote(text):
    """Return *text* wrapped in a pair of double quotes.

    No escaping is performed: double quotes already inside *text* are
    left as they are.
    """
    return '"{}"'.format(text)