Skip to content

Commit

Permalink
feat: notify event on cert upload
Browse files Browse the repository at this point in the history
Publish certificate-changed event when a custom TLS certificate has been
uploaded.
  • Loading branch information
DavidePrincipi committed Mar 6, 2025
1 parent d484a05 commit f4cf443
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
31 changes: 31 additions & 0 deletions imageroot/actions/upload-certificate/30certificate_changed
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env python3

#
# Copyright (C) 2025 Nethesis S.r.l.
# SPDX-License-Identifier: GPL-3.0-or-later
#

import os
import json
import agent
import sys
import base64
import cert_helpers


def main():
request = json.load(sys.stdin)
bcert = base64.b64decode(request["certFile"])
changed_dns_names = cert_helpers.extract_certified_names(bcert)
notify_certificate_changed_event(changed_dns_names)

def notify_certificate_changed_event(changed_dns_names):
rdb = agent.redis_connect(privileged=True)
rdb.publish(f"{os.environ['AGENT_ID']}/event/certificate-changed", json.dumps({
"node_id": int(os.environ['NODE_ID']),
"module_id": os.environ['MODULE_ID'],
"names": list(changed_dns_names),
}))

if __name__ == "__main__":
main()
26 changes: 26 additions & 0 deletions imageroot/pypkg/cert_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@
import subprocess
import datetime
import select
from cryptography import x509
from cryptography.hazmat.backends import default_backend

def extract_certified_names(cert_data : bytearray) -> set:
"""
Extract the subject common name and subject alternative names (SAN)
from a PEM certificate.
:param cert_data: Certificate, PEM-encoded.
:return: A set of certified host names.
"""
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
hostnames = set()
# Extract Common Name (CN) from the Subject field
subject = cert.subject
cn = subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if cn:
hostnames.add(cn[0].value)
# Extract Subject Alternative Names (SANs), if any
try:
ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
san = ext.value
hostnames.update(san.get_values_for_type(x509.DNSName))
except x509.ExtensionNotFound:
pass
return hostnames

def read_default_cert_names():
"""Return the list of host names configured in the
Expand Down

0 comments on commit f4cf443

Please sign in to comment.