Skip to content

Commit

Permalink
Fix issues related to supporting multiple trusted security keys
Browse files Browse the repository at this point in the history
This commit better handles a use case where a client might be capable
of authenticating with one of  multiple security keys, all of which
are accepted by the server but only some of which are plugged into the
client system.

With this change, keys in client_keys which correspond to security keys
that aren't plugged into the system are ignored. Only keys which are
connected, present in client_keys, and trusted by the server will
actually be used for signing, possibly triggering a touch requirement.

Thanks go to GitHub user zanda8893 for reporting the issue and helping
to work out the details of the problem!
  • Loading branch information
ronf committed Oct 20, 2024
1 parent bfa04aa commit 8593f28
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 28 deletions.
32 changes: 18 additions & 14 deletions asyncssh/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def _start(self) -> None:
await self.send_request(key=self._conn.get_gss_context(),
trivial=False)
else:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)


class _ClientGSSMICAuth(ClientAuth):
Expand All @@ -184,7 +184,7 @@ async def _start(self) -> None:
mechs = b''.join(String(mech) for mech in self._gss.mechs)
await self.send_request(UInt32(len(self._gss.mechs)), mechs)
else:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)

def _finish(self) -> None:
"""Finish client GSS MIC authentication"""
Expand Down Expand Up @@ -224,7 +224,7 @@ async def _process_response(self, _pkttype: int, _pktid: int,
if exc.token:
self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token))

self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)

async def _process_token(self, _pkttype: int, _pktid: int,
packet: SSHPacket) -> None:
Expand All @@ -247,7 +247,7 @@ async def _process_token(self, _pkttype: int, _pktid: int,
if exc.token:
self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token))

self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)

def _process_error(self, _pkttype: int, _pktid: int,
packet: SSHPacket) -> None:
Expand Down Expand Up @@ -295,7 +295,7 @@ async def _start(self) -> None:
await self._conn.host_based_auth_requested()

if keypair is None:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)
return

self.logger.debug1('Trying host based auth of user %s on host %s '
Expand Down Expand Up @@ -323,7 +323,7 @@ async def _start(self) -> None:
self._keypair = await self._conn.public_key_auth_requested()

if self._keypair is None:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)
return

self.logger.debug1('Trying public key auth with %s key',
Expand All @@ -341,10 +341,14 @@ async def _send_signed_request(self) -> None:
self.logger.debug1('Signing request with %s key',
self._keypair.algorithm)

await self.send_request(Boolean(True),
String(self._keypair.algorithm),
String(self._keypair.public_data),
key=self._keypair, trivial=False)
try:
await self.send_request(Boolean(True),
String(self._keypair.algorithm),
String(self._keypair.public_data),
key=self._keypair, trivial=False)
except ValueError as exc:
self.logger.debug1('Public key auth failed: %s', str(exc))
self._conn.try_next_auth()

def _process_public_key_ok(self, _pkttype: int, _pktid: int,
packet: SSHPacket) -> None:
Expand Down Expand Up @@ -378,7 +382,7 @@ async def _start(self) -> None:
submethods = await self._conn.kbdint_auth_requested()

if submethods is None:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)
return

self.logger.debug1('Trying keyboard-interactive auth')
Expand All @@ -394,7 +398,7 @@ async def _receive_challenge(self, name: str, instruction: str, lang: str,
lang, prompts)

if responses is None:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)
return

self.send_packet(MSG_USERAUTH_INFO_RESPONSE, UInt32(len(responses)),
Expand Down Expand Up @@ -455,7 +459,7 @@ async def _start(self) -> None:
password = await self._conn.password_auth_requested()

if password is None:
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)
return

self.logger.debug1('Trying password auth')
Expand All @@ -470,7 +474,7 @@ async def _change_password(self, prompt: str, lang: str) -> None:

if result == NotImplemented:
# Password change not supported - move on to the next auth method
self._conn.try_next_auth()
self._conn.try_next_auth(next_method=True)
return

self.logger.debug1('Trying to chsnge password')
Expand Down
11 changes: 7 additions & 4 deletions asyncssh/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3513,21 +3513,24 @@ def get_server_auth_methods(self) -> Sequence[str]:

return [method.decode('ascii') for method in self._auth_methods]

def try_next_auth(self) -> None:
def try_next_auth(self, *, next_method: bool = False) -> None:
"""Attempt client authentication using the next compatible method"""

if self._auth:
self._auth.cancel()
self._auth = None

while self._auth_methods:
method = self._auth_methods.pop(0)
if next_method:
self._auth_methods.pop(0)

self._auth = lookup_client_auth(self, method)
while self._auth_methods:
self._auth = lookup_client_auth(self, self._auth_methods[0])

if self._auth:
return

self._auth_methods.pop(0)

self.logger.info('Auth failed for user %s', self._username)

self._force_close(PermissionDenied('Permission denied for user '
Expand Down
5 changes: 5 additions & 0 deletions asyncssh/sk.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ def _ctap2_sign(dev: 'CtapHidDevice', message_hash: bytes,
allow_creds = [{'type': 'public-key', 'id': key_handle}]
options = {'up': touch_required}

# See if key handle exists before requiring touch
if touch_required:
ctap2.get_assertions(application, message_hash, allow_creds,
options={'up': False})

assertion = ctap2.get_assertions(application, message_hash, allow_creds,
options=options)[0]

Expand Down
4 changes: 3 additions & 1 deletion tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ async def get_auth_result(self):

return await self._auth_waiter

def try_next_auth(self):
def try_next_auth(self, *, next_method=False):
"""Handle a request to move to another form of auth"""

# pylint: disable=unused-argument

# Report that the current auth attempt failed
self._auth_waiter.set_result((False, self._password_changed))
self._auth = None
Expand Down
4 changes: 2 additions & 2 deletions tests/test_connection_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,11 @@ async def validate_kbdint_response(self, username, responses):
class _UnknownAuthClientConnection(asyncssh.connection.SSHClientConnection):
"""Test getting back an unknown auth method from the SSH server"""

def try_next_auth(self):
def try_next_auth(self, *, next_method=False):
"""Attempt client authentication using an unknown method"""

self._auth_methods = [b'unknown'] + self._auth_methods
super().try_next_auth()
super().try_next_auth(next_method=next_method)


class _TestNullAuth(ServerTestCase):
Expand Down
17 changes: 10 additions & 7 deletions tests/test_sk.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ async def test_auth_ctap1_error(self):
"""Test security key returning a CTAP 1 error"""

with sk_error('err'):
with self.assertRaises(ValueError):
await self.connect(username='ckey', client_keys=[self._privkey])
with self.assertRaises(asyncssh.PermissionDenied):
await self.connect(username='ckey',
client_keys=[self._privkey])


@unittest.skipUnless(sk_available, 'security key support not available')
Expand Down Expand Up @@ -169,8 +170,9 @@ async def test_auth_ctap2_error(self):
"""Test security key returning a CTAP 2 error"""

with sk_error('err'):
with self.assertRaises(ValueError):
await self.connect(username='ckey', client_keys=[self._privkey])
with self.assertRaises(asyncssh.PermissionDenied):
await self.connect(username='ckey',
client_keys=[self._privkey])

@asynctest
async def test_enroll_pin_invalid(self):
Expand Down Expand Up @@ -201,8 +203,9 @@ async def test_auth_cred_not_found(self):
"""Test authenticating with security credential not found"""

with sk_error('nocred'):
with self.assertRaises(ValueError):
await self.connect(username='ckey', client_keys=[self._privkey])
with self.assertRaises(asyncssh.PermissionDenied):
await self.connect(username='ckey',
client_keys=[self._privkey])


@unittest.skipUnless(sk_available, 'security key support not available')
Expand Down Expand Up @@ -255,7 +258,7 @@ async def test_load_resident_ctap2_error(self):
"""Test getting resident keys returning a CTAP 2 error"""

with sk_error('err'):
with self.assertRaises(ValueError):
with self.assertRaises(asyncssh.KeyImportError):
asyncssh.load_resident_keys(b'123456')

@asynctest
Expand Down

0 comments on commit 8593f28

Please sign in to comment.