Skip to content

Commit

Permalink
subject and issuer are now structured data instead of an RFC string
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Sep 2, 2023
1 parent 74c45e5 commit e58a82a
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 29 deletions.
4 changes: 3 additions & 1 deletion ca/django_ca/api/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ def sign_certificate(request: WSGIRequest, serial: str, data: SignCertificateSch
csr = x509.load_pem_x509_csr(data.csr.encode())
ca = get_certificate_authority(serial)
profile = profiles[data.profile]
subject = x509.Name.from_rfc4514_string(data.subject)
subject = x509.Name(
[x509.NameAttribute(oid=x509.ObjectIdentifier(attr.oid), value=attr.value) for attr in data.subject]
)
algorithm = expires = None
extensions: List[x509.Extension[x509.ExtensionType]] = []

Expand Down
50 changes: 36 additions & 14 deletions ca/django_ca/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
"""Pydantic Schemas for the API."""

import abc
import base64
from datetime import datetime
from typing import Optional
from typing import List, Optional, Union

import pydantic
from ninja import Field, ModelSchema, Schema

from cryptography.x509.oid import NameOID

from django_ca import ca_settings, constants
from django_ca.api.extension_schemas import DATETIME_EXAMPLE, ExtensionsSchema
from django_ca.constants import ReasonFlags
Expand All @@ -27,6 +31,24 @@
from django_ca.typehints import SerializedExtension


class NameAttributeSchema(pydantic.BaseModel):
"""docstring for class"""

oid: str = Field(
title="OID",
description="The attribute OID as dotted string.",
example=NameOID.COMMON_NAME.dotted_string,
)

value: Union[str, bytes] = Field(
description="The value of the attribute.",
)

class Config:
# NOTE: json_encoders does not seem to do anything if there is a Union[] annotation
json_encoders = {bytes: lambda v: base64.b64encode(v).decode()} # pragma: no cover


class X509BaseSchema(ModelSchema, abc.ABC):
"""Base schema for CAs and Certificates."""

Expand All @@ -42,10 +64,8 @@ class X509BaseSchema(ModelSchema, abc.ABC):
example="-----BEGIN CERTIFICATE-----\n...-----END CERTIFICATE-----\n",
)
serial: str = Field(description="Serial (in hex) of the certificate.", example="ABC...0123")
subject: str = Field(
description="The subject as RFC 4514 formatted string.",
example="CN=example.com,O=Example,ST=Vienna,C=AT",
)
subject: List[NameAttributeSchema] = Field(description="The subject as list of name attributes.")
issuer: List[NameAttributeSchema] = Field(description="The issuer as list of name attributes.")
revoked: bool = Field(description="If the certificate was revoked.", example=False)
updated: datetime = Field(description="When the certificate was last updated.", example=DATETIME_EXAMPLE)

Expand All @@ -54,22 +74,27 @@ class Config: # pylint: disable=missing-class-docstring
model_fields = sorted(["created", "revoked", "serial", "updated"])

@staticmethod
def resolve_created(obj: CertificateAuthority) -> datetime:
def resolve_created(obj: X509CertMixin) -> datetime:
"""Strip microseconds from the attribute."""
return obj.created.replace(microsecond=0)

@staticmethod
def resolve_pem(obj: CertificateAuthority) -> str:
def resolve_pem(obj: X509CertMixin) -> str:
"""Convert the public certificate to its PEM format"""
return obj.pub.pem

@staticmethod
def resolve_subject(obj: CertificateAuthority) -> str:
def resolve_subject(obj: X509CertMixin) -> List[NameAttributeSchema]:
"""Convert the subject to its RFC 4514 representation."""
return obj.subject.rfc4514_string()
return [NameAttributeSchema(oid=attr.oid.dotted_string, value=attr.value) for attr in obj.subject]

@staticmethod
def resolve_updated(obj: CertificateAuthority) -> datetime:
def resolve_issuer(obj: X509CertMixin) -> List[NameAttributeSchema]:
"""Convert the issuer to its serialized representation."""
return [NameAttributeSchema(oid=attr.oid.dotted_string, value=attr.value) for attr in obj.issuer]

@staticmethod
def resolve_updated(obj: X509CertMixin) -> datetime:
"""Strip microseconds from the attribute."""
return obj.updated.replace(microsecond=0)

Expand Down Expand Up @@ -204,10 +229,7 @@ class SignCertificateSchema(Schema):
default=ca_settings.CA_DEFAULT_PROFILE,
enum=sorted(ca_settings.CA_PROFILES),
)
subject: str = Field(
description="The subject as RFC 4514 formatted string.",
example="CN=example.com,O=Example,ST=Vienna,C=AT",
)
subject: List[NameAttributeSchema] = Field(description="The subject as list of name attributes.")


class RevokeCertificateSchema(Schema):
Expand Down
6 changes: 5 additions & 1 deletion ca/django_ca/tests/api/test_list_cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def setUp(self) -> None:
"can_sign_certificates": False,
"created": self.iso_format(self.ca.created),
"crl_url": self.ca.crl_url,
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"issuer_alt_name": "",
"issuer_url": self.ca.issuer_url,
"not_after": self.iso_format(self.ca.expires),
Expand All @@ -56,7 +57,10 @@ def setUp(self) -> None:
"revoked": False,
"serial": cert["serial"],
"sign_certificate_policies": None,
"subject": x509_name(cert["subject"]).rfc4514_string(),
"subject": [
{"oid": attr.oid.dotted_string, "value": attr.value}
for attr in x509_name(cert["subject"])
],
"terms_of_service": "",
"updated": self.iso_format(self.ca.updated),
"website": "",
Expand Down
6 changes: 5 additions & 1 deletion ca/django_ca/tests/api/test_list_certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ def setUp(self) -> None:
{
"autogenerated": False,
"created": self.iso_format(self.cert.created),
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"not_after": self.iso_format(self.cert.expires),
"not_before": self.iso_format(self.cert.valid_from),
"pem": cert["pub"]["pem"],
"profile": self.cert.profile,
"revoked": False,
"serial": cert["serial"],
"subject": x509_name(cert["subject"]).rfc4514_string(),
"subject": [
{"oid": attr.oid.dotted_string, "value": attr.value}
for attr in x509_name(cert["subject"])
],
"updated": self.iso_format(self.cert.updated),
}
]
Expand Down
5 changes: 4 additions & 1 deletion ca/django_ca/tests/api/test_revoke_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ def setUp(self) -> None:
self.expected_response = {
"autogenerated": False,
"created": self.iso_format(self.cert.created),
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"not_after": self.iso_format(self.cert.expires),
"not_before": self.iso_format(self.cert.valid_from),
"pem": cert["pub"]["pem"],
"profile": self.cert.profile,
"revoked": True,
"serial": cert["serial"],
"subject": x509_name(cert["subject"]).rfc4514_string(),
"subject": [
{"oid": attr.oid.dotted_string, "value": attr.value} for attr in x509_name(cert["subject"])
],
"updated": self.iso_format(self.cert.updated),
}

Expand Down
43 changes: 43 additions & 0 deletions ca/django_ca/tests/api/test_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Tests for pydantic schemas."""

import json

from cryptography import x509
from cryptography.x509.oid import NameOID

import pytest

from django_ca.api.schemas import NameAttributeSchema


@pytest.mark.parametrize(
"oid,value",
[
(NameOID.COMMON_NAME, "example.com"),
(NameOID.COUNTRY_NAME, "AT"),
],
)
def test_name_attribute_schema(oid: x509.ObjectIdentifier, value: str) -> None:
"""Test NameAttributeSchema."""
encoded = NameAttributeSchema(oid=oid.dotted_string, value=value).json()
assert json.loads(encoded) == {"oid": oid.dotted_string, "value": value}


@pytest.mark.xfail(reason="json_encoders doesn't work for Union[] attributes in pydantic 1.10.12.")
def test_name_attribute_with_bytes() -> None:
"""Test name attribute with bytes. This does not currently work."""
encoded = NameAttributeSchema(oid=NameOID.X500_UNIQUE_IDENTIFIER.dotted_string, value=b"\x00\x01").json()
assert json.loads(encoded) == {"oid": NameOID.X500_UNIQUE_IDENTIFIER.dotted_string, "value": "AAE="}
25 changes: 17 additions & 8 deletions ca/django_ca/tests/api/test_sign_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
class SignCertificateTestCase(APITestCaseMixin, TestCase):
"""Test the signing certificates via the API."""

default_subject = [{"oid": NameOID.COMMON_NAME.dotted_string, "value": "api.example.com"}]
path = reverse_lazy("django_ca:api:sign_certificate", kwargs={"serial": certs["root"]["serial"]})
required_permission = (Certificate, "sign_certificate")

Expand All @@ -49,19 +50,27 @@ def setUp(self) -> None:
self.expected_response = {
"autogenerated": False,
"created": self.iso_format(timestamps["everything_valid"]),
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"not_after": self.iso_format(timestamps["everything_valid"] + ca_settings.CA_DEFAULT_EXPIRES),
"not_before": self.iso_format(timestamps["everything_valid"]),
"pem": cert["pub"]["pem"],
"profile": ca_settings.CA_DEFAULT_PROFILE,
"revoked": False,
"serial": cert["serial"],
"subject": "CN=api.example.com,OU=Django CA Testsuite,O=Django CA,L=Vienna,ST=Vienna,C=AT",
"subject": [
{"oid": NameOID.COUNTRY_NAME.dotted_string, "value": "AT"},
{"oid": NameOID.STATE_OR_PROVINCE_NAME.dotted_string, "value": "Vienna"},
{"oid": NameOID.LOCALITY_NAME.dotted_string, "value": "Vienna"},
{"oid": NameOID.ORGANIZATION_NAME.dotted_string, "value": "Django CA"},
{"oid": NameOID.ORGANIZATIONAL_UNIT_NAME.dotted_string, "value": "Django CA Testsuite"},
{"oid": NameOID.COMMON_NAME.dotted_string, "value": "api.example.com"},
],
"updated": self.iso_format(timestamps["everything_valid"]),
}

def default_request(self, *args: Any, **kwargs: Any) -> "HttpResponse":
if not args:
args = ({"csr": certs["root-cert"]["csr"]["pem"], "subject": "CN=api.example.com"},)
args = ({"csr": certs["root-cert"]["csr"]["pem"], "subject": self.default_subject},)

kwargs["content_type"] = "application/json"
return self.client.post(self.path, *args, **kwargs)
Expand Down Expand Up @@ -96,7 +105,7 @@ def test_sign_certificate_with_parameters(self) -> None:
response = self.default_request(
{
"csr": certs["root-cert"]["csr"]["pem"],
"subject": "CN=api.example.com",
"subject": self.default_subject,
"autogenerated": True,
"profile": "server",
"expires": self.iso_format(expires),
Expand Down Expand Up @@ -124,7 +133,7 @@ def test_sign_certificate_with_extensions(self) -> None:
response = self.default_request(
{
"csr": certs["root-cert"]["csr"]["pem"],
"subject": "CN=api.example.com",
"subject": self.default_subject,
"extensions": {
"authority_information_access": {
"value": {
Expand Down Expand Up @@ -290,7 +299,7 @@ def test_sign_certificate_with_subject_alternative_name(self) -> None:
response = self.default_request(
{
"csr": certs["root-cert"]["csr"]["pem"],
"subject": "CN=api.example.com",
"subject": self.default_subject,
"extensions": {
"subject_alternative_name": {
"critical": not constants.EXTENSION_DEFAULT_CRITICAL[
Expand Down Expand Up @@ -331,7 +340,7 @@ def test_crldp_with_full_name_and_relative_name(self) -> None:
response = self.default_request(
{
"csr": certs["root-cert"]["csr"]["pem"],
"subject": "CN=api.example.com",
"subject": self.default_subject,
"extensions": {
"crl_distribution_points": {
"value": [
Expand Down Expand Up @@ -372,7 +381,7 @@ def test_crldp_with_no_full_name_or_relative_name(self) -> None:
response = self.default_request(
{
"csr": certs["root-cert"]["csr"]["pem"],
"subject": "CN=api.example.com",
"subject": self.default_subject,
"extensions": {
"crl_distribution_points": {
"value": [
Expand Down Expand Up @@ -410,7 +419,7 @@ def test_with_invalid_key_usage(self) -> None:
response = self.default_request(
{
"csr": certs["root-cert"]["csr"]["pem"],
"subject": "CN=api.example.com",
"subject": self.default_subject,
"extensions": {
"key_usage": {"value": ["unknown"]},
},
Expand Down
6 changes: 5 additions & 1 deletion ca/django_ca/tests/api/test_update_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def setUp(self) -> None:
**{
"can_sign_certificates": False,
"created": self.iso_format(self.ca.created),
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"not_after": self.iso_format(self.ca.expires),
"not_before": self.iso_format(self.ca.valid_from),
"pem": cert["pub"]["pem"],
Expand All @@ -77,7 +78,10 @@ def setUp(self) -> None:
"critical": constants.EXTENSION_DEFAULT_CRITICAL[ExtensionOID.CERTIFICATE_POLICIES],
"value": [{"policy_identifier": "1.1.1", "policy_qualifiers": None}],
},
"subject": x509_name(cert["subject"]).rfc4514_string(),
"subject": [
{"oid": attr.oid.dotted_string, "value": attr.value}
for attr in x509_name(cert["subject"])
],
"updated": self.iso_format(timestamps["everything_valid"]),
},
)
Expand Down
5 changes: 4 additions & 1 deletion ca/django_ca/tests/api/test_view_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def setUp(self) -> None:
"can_sign_certificates": False,
"created": self.iso_format(self.ca.created),
"crl_url": self.ca.crl_url,
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"issuer_alt_name": "",
"issuer_url": self.ca.issuer_url,
"name": "root",
Expand All @@ -56,7 +57,9 @@ def setUp(self) -> None:
"pem": cert["pub"]["pem"],
"revoked": False,
"serial": cert["serial"],
"subject": x509_name(cert["subject"]).rfc4514_string(),
"subject": [
{"oid": attr.oid.dotted_string, "value": attr.value} for attr in x509_name(cert["subject"])
],
"sign_certificate_policies": None,
"terms_of_service": "",
"updated": self.iso_format(self.ca.updated),
Expand Down
5 changes: 4 additions & 1 deletion ca/django_ca/tests/api/test_view_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ def setUp(self) -> None:
self.expected_response = {
"autogenerated": False,
"created": self.iso_format(self.cert.created),
"issuer": [{"oid": attr.oid.dotted_string, "value": attr.value} for attr in cert["issuer"]],
"not_after": self.iso_format(self.cert.expires),
"not_before": self.iso_format(self.cert.valid_from),
"pem": cert["pub"]["pem"],
"profile": self.cert.profile,
"revoked": False,
"serial": cert["serial"],
"subject": x509_name(cert["subject"]).rfc4514_string(),
"subject": [
{"oid": attr.oid.dotted_string, "value": attr.value} for attr in x509_name(cert["subject"])
],
"updated": self.iso_format(self.cert.updated),
}

Expand Down

0 comments on commit e58a82a

Please sign in to comment.