Skip to content

Commit

Permalink
Add support for WebAuthN authentication with U2F security keys
Browse files Browse the repository at this point in the history
This commit adds support for a new WebAuthN signature algorithm for
U2F security keys, allowing non-admin Windows users to use these
keys for authentication. Previously, U2F security keys worked on
Windows, but only when the user was an Administrator.

This new WebAuthN signature algorithm will be selected automatically
when running as a non-admin on Windows. It can also be requested
explicitly. For it to work, though, the server needs to be configured
to accept this signature algorithm. At the moment, it is disabled by
default on OpenSSH. Here's the necessary config to enable it:

PubkeyAcceptedAlgorithms=+webauthn-sk-ecdsa-sha2-nistp256@openssh.com
  • Loading branch information
ronf committed Nov 3, 2024
1 parent 51b7846 commit e83874a
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 77 deletions.
3 changes: 3 additions & 0 deletions asyncssh/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3463,6 +3463,9 @@ def _choose_signature_alg(self, keypair: _ClientHostKey) -> bool:

if self._server_sig_algs:
for alg in keypair.sig_algorithms:
if keypair.use_webauthn and not alg.startswith(b'webauthn-'):
continue

if alg in self._sig_algs and alg in self._server_sig_algs:
keypair.set_sig_algorithm(alg)
return True
Expand Down
11 changes: 7 additions & 4 deletions asyncssh/public_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class SSHKey:
pem_name: bytes = b''
pkcs8_oid: Optional[ObjectIdentifier] = None
use_executor: bool = False
use_webauthn: bool = False

def __init__(self, key: Optional[CryptoKey] = None):
self._key = key
Expand Down Expand Up @@ -2058,7 +2059,8 @@ def __init__(self, algorithm: bytes, sig_algorithm: bytes,
public_data: bytes, comment: _Comment,
cert: Optional[SSHCertificate] = None,
filename: Optional[bytes] = None,
use_executor: bool = False):
use_executor: bool = False,
use_webauthn: bool = False):
self.key_algorithm = algorithm
self.key_public_data = public_data

Expand All @@ -2067,6 +2069,7 @@ def __init__(self, algorithm: bytes, sig_algorithm: bytes,
self._filename = filename

self.use_executor = use_executor
self.use_webauthn = use_webauthn

if cert:
if cert.key.public_data != self.key_public_data:
Expand Down Expand Up @@ -2250,8 +2253,9 @@ def __init__(self, key: SSHKey, pubkey: Optional[SSHKey] = None,
comment = None

super().__init__(key.algorithm, key.algorithm, key.sig_algorithms,
key.sig_algorithms, key.public_data, comment, cert,
key.get_filename(), key.use_executor)
key.sig_algorithms, key.public_data, comment,
cert, key.get_filename(), key.use_executor,
key.use_webauthn)

self._key = key

Expand Down Expand Up @@ -3856,7 +3860,6 @@ def load_resident_keys(pin: str, *, application: str = 'ssh:',
"""

application = application.encode('utf-8')
flags = SSH_SK_USER_PRESENCE_REQD if touch_required else 0
reserved = b''

Expand Down
108 changes: 89 additions & 19 deletions asyncssh/sk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022 by Ron Frederick <[email protected]> and others.
# Copyright (c) 2019-2024 by Ron Frederick <[email protected]> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
Expand All @@ -20,6 +20,8 @@

"""U2F security key handler"""

from base64 import urlsafe_b64encode
import ctypes
from hashlib import sha256
import hmac
import time
Expand Down Expand Up @@ -54,6 +56,12 @@ def _decode_public_key(alg: int, public_key: Mapping[int, object]) -> bytes:
return b'\x04' + result + cast(bytes, public_key[-3])


def _verify_rp_id(_rp_id: str, _origin: str):
"""Allow any relying party name -- SSH encodes the application here"""

return True


def _ctap1_poll(poll_interval: float, func: Callable[..., _PollResult],
*args: object) -> _PollResult:
"""Poll until a CTAP1 response is received"""
Expand All @@ -69,29 +77,28 @@ def _ctap1_poll(poll_interval: float, func: Callable[..., _PollResult],


def _ctap1_enroll(dev: 'CtapHidDevice', alg: int,
application: bytes) -> Tuple[bytes, bytes]:
application: str) -> Tuple[bytes, bytes]:
"""Enroll a new security key using CTAP version 1"""

ctap1 = Ctap1(dev)

if alg != SSH_SK_ECDSA:
raise ValueError('Unsupported algorithm')

app_hash = sha256(application).digest()
app_hash = sha256(application.encode('utf-8')).digest()
registration = _ctap1_poll(_CTAP1_POLL_INTERVAL, ctap1.register,
_dummy_hash, app_hash)

return registration.public_key, registration.key_handle


def _ctap2_enroll(dev: 'CtapHidDevice', alg: int, application: bytes,
def _ctap2_enroll(dev: 'CtapHidDevice', alg: int, application: str,
user: str, pin: Optional[str],
resident: bool) -> Tuple[bytes, bytes]:
"""Enroll a new security key using CTAP version 2"""

ctap2 = Ctap2(dev)

application = application.decode('utf-8')
rp = {'id': application, 'name': application}
user_cred = {'id': user.encode('utf-8'), 'name': user}
key_params = [{'type': 'public-key', 'alg': alg}]
Expand All @@ -118,13 +125,31 @@ def _ctap2_enroll(dev: 'CtapHidDevice', alg: int, application: bytes,
return _decode_public_key(alg, cdata.public_key), cdata.credential_id


def _ctap1_sign(dev: 'CtapHidDevice', message_hash: bytes, application: bytes,
def _win_enroll(alg: int, application: str, user: str) -> Tuple[bytes, bytes]:
"""Enroll a new security key using Windows WebAuthn API"""

client = WindowsClient(application, verify=_verify_rp_id)

rp = {'id': application, 'name': application}
user_cred = {'id': user.encode('utf-8'), 'name': user}
key_params = [{'type': 'public-key', 'alg': alg}]
options = {'rp': rp, 'user': user_cred, 'challenge': b'',
'pubKeyCredParams': key_params}

result = client.make_credential(options)
cdata = result.attestation_object.auth_data.credential_data

# pylint: disable=no-member
return _decode_public_key(alg, cdata.public_key), cdata.credential_id


def _ctap1_sign(dev: 'CtapHidDevice', message_hash: bytes, application: str,
key_handle: bytes) -> Tuple[int, int, bytes]:
"""Sign a message with a security key using CTAP version 1"""

ctap1 = Ctap1(dev)

app_hash = sha256(application).digest()
app_hash = sha256(application.encode('utf-8')).digest()

auth_response = _ctap1_poll(_CTAP1_POLL_INTERVAL, ctap1.authenticate,
message_hash, app_hash, key_handle)
Expand All @@ -137,13 +162,12 @@ def _ctap1_sign(dev: 'CtapHidDevice', message_hash: bytes, application: bytes,


def _ctap2_sign(dev: 'CtapHidDevice', message_hash: bytes,
application: bytes, key_handle: bytes,
application: str, key_handle: bytes,
touch_required: bool) -> Tuple[int, int, bytes]:
"""Sign a message with a security key using CTAP version 2"""

ctap2 = Ctap2(dev)

application = application.decode('utf-8')
allow_creds = [{'type': 'public-key', 'id': key_handle}]
options = {'up': touch_required}

Expand All @@ -160,10 +184,38 @@ def _ctap2_sign(dev: 'CtapHidDevice', message_hash: bytes,
return auth_data.flags, auth_data.counter, assertion.signature


def sk_enroll(alg: int, application: bytes, user: str,
pin: Optional[str], resident: bool) -> Tuple[bytes, bytes]:
def _win_sign(data: bytes, application: str,
key_handle: bytes) -> Tuple[int, int, bytes, bytes]:
"""Sign a message with a security key using Windows WebAuthn API"""

client = WindowsClient(application, verify=_verify_rp_id)

creds = [{'type': 'public-key', 'id': key_handle}]
options = {'challenge': data, 'rpId': application,
'allowCredentials': creds}

result = client.get_assertion(options).get_response(0)
auth_data = result.authenticator_data

return auth_data.flags, auth_data.counter, \
result.signature, bytes(result.client_data)


def sk_webauthn_prefix(data: bytes, application: str) -> bytes:
"""Calculate a WebAuthn request prefix"""

return b'{"type":"webauthn.get","challenge":"' + \
urlsafe_b64encode(data).rstrip(b'=') + b'","origin":"' + \
application.encode('utf-8') + b'"'


def sk_enroll(alg: int, application: str, user: str, pin: Optional[str],
resident: bool) -> Tuple[bytes, bytes]:
"""Enroll a new security key"""

if sk_use_webauthn:
return _win_enroll(alg, application, user)

try:
dev = next(CtapHidDevice.list_devices())
except StopIteration:
Expand All @@ -187,22 +239,35 @@ def sk_enroll(alg: int, application: bytes, user: str,
dev.close()


def sk_sign(message_hash: bytes, application: bytes, key_handle: bytes,
flags: int) -> Tuple[int, int, bytes]:
def sk_sign(data: bytes, application: str, key_handle: bytes, flags: int,
is_webauthn: bool = False) -> Tuple[int, int, bytes, bytes]:
"""Sign a message with a security key"""

touch_required = bool(flags & SSH_SK_USER_PRESENCE_REQD)

if is_webauthn and sk_use_webauthn:
return _win_sign(data, application, key_handle)

if is_webauthn:
data = sk_webauthn_prefix(data, application) + b'}'

message_hash = sha256(data).digest()

for dev in CtapHidDevice.list_devices():
try:
return _ctap2_sign(dev, message_hash, application,
key_handle, touch_required)
flags, counter, sig = _ctap2_sign(dev, message_hash, application,
key_handle, touch_required)

return flags, counter, sig, data
except CtapError as exc:
if exc.code != CtapError.ERR.NO_CREDENTIALS:
raise ValueError(str(exc)) from None
except ValueError:
try:
return _ctap1_sign(dev, message_hash, application, key_handle)
flags, counter, sig = _ctap1_sign(dev, message_hash,
application, key_handle)

return flags, counter, sig, data
except ApduError as exc:
if exc.code != APDU.WRONG_DATA:
raise ValueError(str(exc)) from None
Expand All @@ -212,11 +277,11 @@ def sk_sign(message_hash: bytes, application: bytes, key_handle: bytes,
raise ValueError('Security key credential not found')


def sk_get_resident(application: bytes, user: Optional[str],
def sk_get_resident(application: str, user: Optional[str],
pin: str) -> Sequence[_SKResidentKey]:
"""Get keys resident on a security key"""

app_hash = sha256(application).digest()
app_hash = sha256(application.encode('utf-8')).digest()
result: List[_SKResidentKey] = []

for dev in CtapHidDevice.list_devices():
Expand Down Expand Up @@ -262,13 +327,18 @@ def sk_get_resident(application: bytes, user: Optional[str],


try:
from fido2.hid import CtapHidDevice
from fido2.client import WindowsClient
from fido2.ctap import CtapError
from fido2.ctap1 import Ctap1, APDU, ApduError
from fido2.ctap2 import Ctap2, ClientPin, PinProtocolV1
from fido2.ctap2 import CredentialManagement
from fido2.hid import CtapHidDevice

sk_available = True

sk_use_webauthn = WindowsClient.is_available() and \
hasattr(ctypes, 'windll') and \
not ctypes.windll.shell32.IsUserAnAdmin()
except (ImportError, OSError, AttributeError): # pragma: no cover
sk_available = False

Expand Down
Loading

0 comments on commit e83874a

Please sign in to comment.