diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py index 0136ab74c2ea..fc3e7730eca0 100644 --- a/src/cryptography/x509/extensions.py +++ b/src/cryptography/x509/extensions.py @@ -2439,6 +2439,9 @@ def __eq__(self, other: object) -> bool: def __hash__(self) -> int: return hash((self.authority, tuple(self._admissions))) + def public_bytes(self) -> bytes: + return rust_x509.encode_extension_value(self) + class UnrecognizedExtension(ExtensionType): def __init__(self, oid: ObjectIdentifier, value: bytes) -> None: diff --git a/src/rust/cryptography-x509/src/extensions.rs b/src/rust/cryptography-x509/src/extensions.rs index 5b224db50c3a..fbea5637b7f7 100644 --- a/src/rust/cryptography-x509/src/extensions.rs +++ b/src/rust/cryptography-x509/src/extensions.rs @@ -285,7 +285,7 @@ impl KeyUsage<'_> { } } -// #[derive(asn1::Asn1Read, asn1::Asn1Write)] +#[derive(asn1::Asn1Read, asn1::Asn1Write)] pub struct NamingAuthority<'a> { pub id: Option, pub url: Option>, @@ -302,9 +302,9 @@ type SequenceOfObjectIdentifiers<'a> = common::Asn1ReadableOrWritable< asn1::SequenceOfWriter<'a, asn1::ObjectIdentifier, Vec>, >; -// #[derive(asn1::Asn1Read, asn1::Asn1Write)] +#[derive(asn1::Asn1Read, asn1::Asn1Write)] pub struct ProfessionInfo<'a> { - // #[explicit(0)] + #[explicit(0)] pub naming_authority: Option>, pub profession_items: SequenceOfDisplayTexts<'a>, pub profession_oids: Option>, @@ -312,29 +312,25 @@ pub struct ProfessionInfo<'a> { pub add_profession_info: Option<&'a [u8]>, } -// #[derive(asn1::Asn1Read, asn1::Asn1Write)] +#[derive(asn1::Asn1Read, asn1::Asn1Write)] pub struct Admission<'a> { - // #[explicit(0)] + #[explicit(0)] pub admission_authority: Option>, - // #[explicit(1)] + #[explicit(1)] pub naming_authority: Option>, - /* pub profession_infos: common::Asn1ReadableOrWritable< asn1::SequenceOf<'a, ProfessionInfo<'a>>, asn1::SequenceOfWriter<'a, ProfessionInfo<'a>, Vec>>, >, - */ } -// #[derive(asn1::Asn1Read, asn1::Asn1Write)] +#[derive(asn1::Asn1Read, asn1::Asn1Write)] pub struct Admissions<'a> { pub admission_authority: Option>, - /* pub contents_of_admissions: common::Asn1ReadableOrWritable< asn1::SequenceOf<'a, Admission<'a>>, asn1::SequenceOfWriter<'a, Admission<'a>, Vec>>, >, - */ } #[cfg(test)] diff --git a/src/rust/cryptography-x509/src/oid.rs b/src/rust/cryptography-x509/src/oid.rs index fbc440eea122..ee148a7896ee 100644 --- a/src/rust/cryptography-x509/src/oid.rs +++ b/src/rust/cryptography-x509/src/oid.rs @@ -44,6 +44,7 @@ pub const FRESHEST_CRL_OID: asn1::ObjectIdentifier = asn1::oid!(2, 5, 29, 46); pub const INHIBIT_ANY_POLICY_OID: asn1::ObjectIdentifier = asn1::oid!(2, 5, 29, 54); pub const ACCEPTABLE_RESPONSES_OID: asn1::ObjectIdentifier = asn1::oid!(1, 3, 6, 1, 5, 5, 7, 48, 1, 4); +pub const ADMISSIONS_OID: asn1::ObjectIdentifier = asn1::oid!(1, 3, 36, 8, 3, 3); // Public key identifiers pub const EC_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 10045, 2, 1); diff --git a/src/rust/src/x509/extensions.rs b/src/rust/src/x509/extensions.rs index 9bd942542393..2342c40a1f03 100644 --- a/src/rust/src/x509/extensions.rs +++ b/src/rust/src/x509/extensions.rs @@ -416,6 +416,149 @@ fn encode_scts(ext: &pyo3::Bound<'_, pyo3::PyAny>) -> CryptographyResult Ok(asn1::write_single(&result.as_slice())?) } +fn encode_naming_authority<'a>( + py: pyo3::Python<'_>, + ka_str: &'a cryptography_keepalive::KeepAlive, + py_naming_authority: &pyo3::Bound<'a, pyo3::PyAny>, +) -> CryptographyResult> { + let py_oid = py_naming_authority.getattr(pyo3::intern!(py, "id"))?; + let id = if !py_oid.is_none() { + Some(py_oid_to_oid(py_oid)?) + } else { + None + }; + let py_url = py_naming_authority.getattr(pyo3::intern!(py, "url"))?; + let url = if !py_url.is_none() { + let py_url_str = ka_str.add(py_url.extract::()?); + match asn1::IA5String::new(py_url_str) { + Some(s) => Some(s), + None => { + return Err(CryptographyError::from( + pyo3::exceptions::PyValueError::new_err("url value must be a valid IA5String"), + )) + } + } + } else { + None + }; + let py_text = py_naming_authority.getattr(pyo3::intern!(py, "text"))?; + let text = if !py_text.is_none() { + let py_text_str = ka_str.add(py_text.extract::()?); + Some(extensions::DisplayText::Utf8String(asn1::Utf8String::new( + py_text_str, + ))) + } else { + None + }; + Ok(extensions::NamingAuthority { id, url, text }) +} + +fn encode_profession_info<'a>( + py: pyo3::Python<'_>, + ka_bytes: &'a cryptography_keepalive::KeepAlive, + ka_str: &'a cryptography_keepalive::KeepAlive, + py_info: &pyo3::Bound<'a, pyo3::PyAny>, +) -> CryptographyResult> { + let py_naming_authority = py_info.getattr(pyo3::intern!(py, "naming_authority"))?; + let naming_authority = if !py_naming_authority.is_none() { + Some(encode_naming_authority(py, ka_str, &py_naming_authority)?) + } else { + None + }; + let mut profession_items = vec![]; + let py_items = py_info.getattr(pyo3::intern!(py, "profession_items"))?; + for py_item in py_items.iter()? { + let py_item = py_item?; + let py_item_str = ka_str.add(py_item.extract::()?); + let item = extensions::DisplayText::Utf8String(asn1::Utf8String::new(py_item_str)); + profession_items.push(item); + } + let profession_items = + common::Asn1ReadableOrWritable::new_write(asn1::SequenceOfWriter::new(profession_items)); + let py_oids = py_info.getattr(pyo3::intern!(py, "profession_oids"))?; + let profession_oids = if !py_oids.is_none() { + let mut profession_oids = vec![]; + for py_oid in py_oids.iter()? { + let py_oid = py_oid?; + let oid = py_oid_to_oid(py_oid)?; + profession_oids.push(oid); + } + Some(common::Asn1ReadableOrWritable::new_write( + asn1::SequenceOfWriter::new(profession_oids), + )) + } else { + None + }; + let py_registration_number = py_info.getattr(pyo3::intern!(py, "registration_number"))?; + let registration_number = if !py_registration_number.is_none() { + let py_registration_number_str = + ka_str.add(py_registration_number.extract::()?); + match asn1::PrintableString::new(py_registration_number_str) { + Some(s) => Some(s), + None => { + return Err(CryptographyError::from( + pyo3::exceptions::PyValueError::new_err( + "registration_number value must be a valid PrintableString", + ), + )) + } + } + } else { + None + }; + let py_add_profession_info = py_info.getattr(pyo3::intern!(py, "add_profession_info"))?; + let add_profession_info = if !py_add_profession_info.is_none() { + Some(ka_bytes.add(py_add_profession_info.extract::()?)) + } else { + None + }; + Ok(extensions::ProfessionInfo { + naming_authority, + profession_items, + profession_oids, + registration_number, + add_profession_info, + }) +} + +fn encode_admission<'a>( + py: pyo3::Python<'_>, + ka_bytes: &'a cryptography_keepalive::KeepAlive, + ka_str: &'a cryptography_keepalive::KeepAlive, + py_admission: &pyo3::Bound<'a, pyo3::PyAny>, +) -> CryptographyResult> { + let py_admission_authority = py_admission.getattr(pyo3::intern!(py, "admission_authority"))?; + let admission_authority = if !py_admission_authority.is_none() { + Some(x509::common::encode_general_name( + py, + ka_bytes, + ka_str, + &py_admission_authority, + )?) + } else { + None + }; + let py_naming_authority = py_admission.getattr(pyo3::intern!(py, "naming_authority"))?; + let naming_authority = if !py_naming_authority.is_none() { + Some(encode_naming_authority(py, ka_str, &py_naming_authority)?) + } else { + None + }; + + let py_profession_infos = py_admission.getattr(pyo3::intern!(py, "profession_infos"))?; + let mut profession_infos = vec![]; + for py_info in py_profession_infos.iter()? { + profession_infos.push(encode_profession_info(py, ka_bytes, ka_str, &py_info?)?); + } + let profession_infos = + common::Asn1ReadableOrWritable::new_write(asn1::SequenceOfWriter::new(profession_infos)); + Ok(extensions::Admission { + admission_authority, + naming_authority, + profession_infos, + }) +} + pub(crate) fn encode_extension( py: pyo3::Python<'_>, oid: &asn1::ObjectIdentifier, @@ -563,6 +706,35 @@ pub(crate) fn encode_extension( }; Ok(Some(asn1::write_single(&mstpl)?)) } + &oid::ADMISSIONS_OID => { + let ka_bytes = cryptography_keepalive::KeepAlive::new(); + let ka_str = cryptography_keepalive::KeepAlive::new(); + let py_admission_authority = ext.getattr(pyo3::intern!(py, "authority"))?; + let admission_authority = if !py_admission_authority.is_none() { + Some(x509::common::encode_general_name( + py, + &ka_bytes, + &ka_str, + &py_admission_authority, + )?) + } else { + None + }; + let mut admissions = vec![]; + for py_admission in ext.iter()? { + let admission = encode_admission(py, &ka_bytes, &ka_str, &py_admission?)?; + admissions.push(admission); + } + + let contents_of_admissions = + common::Asn1ReadableOrWritable::new_write(asn1::SequenceOfWriter::new(admissions)); + + let admission = extensions::Admissions { + admission_authority, + contents_of_admissions, + }; + Ok(Some(asn1::write_single(&admission)?)) + } _ => Ok(None), } } diff --git a/tests/x509/test_x509_ext.py b/tests/x509/test_x509_ext.py index b29a45664484..f1a32b83c09a 100644 --- a/tests/x509/test_x509_ext.py +++ b/tests/x509/test_x509_ext.py @@ -7116,6 +7116,161 @@ def test_hash(self): assert hash(admissions1) != hash(admissions4) assert hash(admissions1) != hash(admissions5) + def test_public_bytes(self): + ext = x509.Admissions(None, []) + assert ext.public_bytes() == b"0\x020\x00" + + ext = x509.Admissions( + x509.UniformResourceIdentifier(value="https://www.example.com/"), + [], + ) + assert ( + ext.public_bytes() == b"0\x1c\x86\x18https://www.example.com/0\x00" + ) + + # test for encoding none values + ext = x509.Admissions( + None, + [ + x509.Admission( + None, + x509.NamingAuthority(None, None, None), + [x509.ProfessionInfo(None, [], [], None, None)], + ), + x509.Admission( + None, + None, + [ + x509.ProfessionInfo( + x509.NamingAuthority(None, None, None), + [], + [], + None, + None, + ) + ], + ), + ], + ) + assert ext.public_bytes() == ( + b"0\x1e0\x1c0\x0c\xa1\x020\x000\x060\x040\x000\x000\x0c0\n0\x08\xa0\x020\x000\x000\x00" + ) + + # example values taken from https://gemspec.gematik.de/downloads/gemSpec/gemSpec_OID/gemSpec_OID_V3.17.0.pdf + ext = x509.Admissions( + authority=x509.DirectoryName( + value=x509.Name( + [ + x509.NameAttribute( + x509.oid.NameOID.COUNTRY_NAME, "DE" + ), + x509.NameAttribute( + x509.NameOID.ORGANIZATIONAL_UNIT_NAME, + "Elektronisches Gesundheitsberuferegister", + ), + ] + ) + ), + admissions=[ + x509.Admission( + admission_authority=x509.DNSName("gematik.de"), + naming_authority=x509.NamingAuthority( + x509.ObjectIdentifier("1.2.276.0.76.3.1.91"), + "https://gematik.de/", + ( + "Gesellschaft für Telematikanwendungen " + "der Gesundheitskarte mbH" + ), + ), + profession_infos=[ + x509.ProfessionInfo( + naming_authority=x509.NamingAuthority( + x509.ObjectIdentifier("1.2.276.0.76.3.1.1"), + "https://www.kbv.de/", + "KBV Kassenärztliche Bundesvereinigung", + ), + registration_number="123456789", + profession_items=[ + "Ärztin/Arzt", + ( + "Orthopädieschuhmacher/-in " + "und Orthopädietechniker/-in" + ), + ], + profession_oids=[ + x509.ObjectIdentifier("1.2.276.0.76.4.30"), + x509.ObjectIdentifier("1.2.276.0.76.4.305"), + ], + # DER-encoded: + # `OtherName( + # type_id=ObjectIdentifier('1.2.276.0.76.4.60'), + # value=b'\x0c\x1dProbe-Client Broker-Betreiber' + # )` + add_profession_info=( + b"\xa0*\x06\x07*\x82\x14\x00L\x04<\xa0\x1f" + b"\x0c\x1dProbe-Client Broker-Betreiber" + ), + ) + ], + ), + ], + ) + assert ext.public_bytes() == ( + b"0\x82\x01\xa6\xa4B0@1\x0b0\t\x06\x03U\x04\x06\x13\x02DE110/\x06" + b"\x03U\x04\x0b\x0c(Elektronisches Gesundheitsberuferegister0\x82" + b"\x01^0\x82\x01Z\xa0\x0c\x82\ngematik.de\xa1b0`\x06\x08*\x82\x14" + b"\x00L\x03\x01[\x16\x13https://gematik.de/\x0c?Gesellschaft f\xc3" + b"\xbcr Telematikanwendungen der Gesundheitskarte mbH0\x81\xe50" + b"\x81\xe2\xa0I0G\x06\x08*\x82\x14\x00L\x03\x01\x01\x16\x13https://www." + b"kbv.de/\x0c&KBV Kassen\xc3\xa4rztliche Bundesvereinigung0G\x0c" + b"\x0c\xc3\x84rztin/Arzt\x0c7Orthop\xc3\xa4dieschuhmacher/-in und " + b"Orthop\xc3\xa4dietechniker/-in0\x13\x06\x07*\x82\x14\x00L\x04\x1e" + b"\x06\x08*\x82\x14\x00L\x04\x821\x13\t123456789\x04,\xa0*\x06" + b"\x07*\x82\x14\x00L\x04<\xa0\x1f\x0c\x1dProbe-Client Broker-" + b"Betreiber" + ) + + # test for non-ascii url value in naming authority + ext = x509.Admissions( + None, + [ + x509.Admission( + None, + x509.NamingAuthority(None, "😄", None), + [], + ), + ], + ) + with pytest.raises(ValueError): + ext.public_bytes() + + # test for non-ascii registration number value in profession info + ext = x509.Admissions( + None, + [ + x509.Admission( + None, + None, + [x509.ProfessionInfo(None, [], [], "\x00", None)], + ), + ], + ) + with pytest.raises(ValueError): + ext.public_bytes() + + # test that none passed for `profession_oids` is encoded as none + ext = x509.Admissions( + None, + [ + x509.Admission( + None, + None, + [x509.ProfessionInfo(None, [], None, None, None)], + ), + ], + ) + assert ext.public_bytes() == b"0\n0\x080\x060\x040\x020\x00" + def test_all_extension_oid_members_have_names_defined(): for oid in dir(ExtensionOID):