diff --git a/asyncssh/auth.py b/asyncssh/auth.py index 2457689..84c4a1f 100644 --- a/asyncssh/auth.py +++ b/asyncssh/auth.py @@ -159,7 +159,7 @@ async def _start(self) -> None: await self.send_request(key=self._conn.get_gss_context(), trivial=False) else: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) class _ClientGSSMICAuth(ClientAuth): @@ -184,7 +184,7 @@ async def _start(self) -> None: mechs = b''.join(String(mech) for mech in self._gss.mechs) await self.send_request(UInt32(len(self._gss.mechs)), mechs) else: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) def _finish(self) -> None: """Finish client GSS MIC authentication""" @@ -224,7 +224,7 @@ async def _process_response(self, _pkttype: int, _pktid: int, if exc.token: self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token)) - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) async def _process_token(self, _pkttype: int, _pktid: int, packet: SSHPacket) -> None: @@ -247,7 +247,7 @@ async def _process_token(self, _pkttype: int, _pktid: int, if exc.token: self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token)) - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) def _process_error(self, _pkttype: int, _pktid: int, packet: SSHPacket) -> None: @@ -295,7 +295,7 @@ async def _start(self) -> None: await self._conn.host_based_auth_requested() if keypair is None: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) return self.logger.debug1('Trying host based auth of user %s on host %s ' @@ -323,7 +323,7 @@ async def _start(self) -> None: self._keypair = await self._conn.public_key_auth_requested() if self._keypair is None: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) return self.logger.debug1('Trying public key auth with %s key', @@ -341,10 +341,14 @@ async def _send_signed_request(self) -> None: self.logger.debug1('Signing request with %s key', self._keypair.algorithm) - await self.send_request(Boolean(True), - String(self._keypair.algorithm), - String(self._keypair.public_data), - key=self._keypair, trivial=False) + try: + await self.send_request(Boolean(True), + String(self._keypair.algorithm), + String(self._keypair.public_data), + key=self._keypair, trivial=False) + except ValueError as exc: + self.logger.debug1('Public key auth failed: %s', str(exc)) + self._conn.try_next_auth() def _process_public_key_ok(self, _pkttype: int, _pktid: int, packet: SSHPacket) -> None: @@ -378,7 +382,7 @@ async def _start(self) -> None: submethods = await self._conn.kbdint_auth_requested() if submethods is None: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) return self.logger.debug1('Trying keyboard-interactive auth') @@ -394,7 +398,7 @@ async def _receive_challenge(self, name: str, instruction: str, lang: str, lang, prompts) if responses is None: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) return self.send_packet(MSG_USERAUTH_INFO_RESPONSE, UInt32(len(responses)), @@ -455,7 +459,7 @@ async def _start(self) -> None: password = await self._conn.password_auth_requested() if password is None: - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) return self.logger.debug1('Trying password auth') @@ -470,7 +474,7 @@ async def _change_password(self, prompt: str, lang: str) -> None: if result == NotImplemented: # Password change not supported - move on to the next auth method - self._conn.try_next_auth() + self._conn.try_next_auth(next_method=True) return self.logger.debug1('Trying to chsnge password') diff --git a/asyncssh/connection.py b/asyncssh/connection.py index b920c28..8c5b7a3 100644 --- a/asyncssh/connection.py +++ b/asyncssh/connection.py @@ -3513,21 +3513,24 @@ def get_server_auth_methods(self) -> Sequence[str]: return [method.decode('ascii') for method in self._auth_methods] - def try_next_auth(self) -> None: + def try_next_auth(self, *, next_method: bool = False) -> None: """Attempt client authentication using the next compatible method""" if self._auth: self._auth.cancel() self._auth = None - while self._auth_methods: - method = self._auth_methods.pop(0) + if next_method: + self._auth_methods.pop(0) - self._auth = lookup_client_auth(self, method) + while self._auth_methods: + self._auth = lookup_client_auth(self, self._auth_methods[0]) if self._auth: return + self._auth_methods.pop(0) + self.logger.info('Auth failed for user %s', self._username) self._force_close(PermissionDenied('Permission denied for user ' diff --git a/asyncssh/sk.py b/asyncssh/sk.py index 2c008c6..cc807bc 100644 --- a/asyncssh/sk.py +++ b/asyncssh/sk.py @@ -147,6 +147,11 @@ def _ctap2_sign(dev: 'CtapHidDevice', message_hash: bytes, allow_creds = [{'type': 'public-key', 'id': key_handle}] options = {'up': touch_required} + # See if key handle exists before requiring touch + if touch_required: + ctap2.get_assertions(application, message_hash, allow_creds, + options={'up': False}) + assertion = ctap2.get_assertions(application, message_hash, allow_creds, options=options)[0] diff --git a/tests/test_auth.py b/tests/test_auth.py index 8612581..66bb89e 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -165,9 +165,11 @@ async def get_auth_result(self): return await self._auth_waiter - def try_next_auth(self): + def try_next_auth(self, *, next_method=False): """Handle a request to move to another form of auth""" + # pylint: disable=unused-argument + # Report that the current auth attempt failed self._auth_waiter.set_result((False, self._password_changed)) self._auth = None diff --git a/tests/test_connection_auth.py b/tests/test_connection_auth.py index a93a21b..a5b43e6 100644 --- a/tests/test_connection_auth.py +++ b/tests/test_connection_auth.py @@ -414,11 +414,11 @@ async def validate_kbdint_response(self, username, responses): class _UnknownAuthClientConnection(asyncssh.connection.SSHClientConnection): """Test getting back an unknown auth method from the SSH server""" - def try_next_auth(self): + def try_next_auth(self, *, next_method=False): """Attempt client authentication using an unknown method""" self._auth_methods = [b'unknown'] + self._auth_methods - super().try_next_auth() + super().try_next_auth(next_method=next_method) class _TestNullAuth(ServerTestCase): diff --git a/tests/test_sk.py b/tests/test_sk.py index 7c2c05d..06b492c 100644 --- a/tests/test_sk.py +++ b/tests/test_sk.py @@ -123,8 +123,9 @@ async def test_auth_ctap1_error(self): """Test security key returning a CTAP 1 error""" with sk_error('err'): - with self.assertRaises(ValueError): - await self.connect(username='ckey', client_keys=[self._privkey]) + with self.assertRaises(asyncssh.PermissionDenied): + await self.connect(username='ckey', + client_keys=[self._privkey]) @unittest.skipUnless(sk_available, 'security key support not available') @@ -169,8 +170,9 @@ async def test_auth_ctap2_error(self): """Test security key returning a CTAP 2 error""" with sk_error('err'): - with self.assertRaises(ValueError): - await self.connect(username='ckey', client_keys=[self._privkey]) + with self.assertRaises(asyncssh.PermissionDenied): + await self.connect(username='ckey', + client_keys=[self._privkey]) @asynctest async def test_enroll_pin_invalid(self): @@ -201,8 +203,9 @@ async def test_auth_cred_not_found(self): """Test authenticating with security credential not found""" with sk_error('nocred'): - with self.assertRaises(ValueError): - await self.connect(username='ckey', client_keys=[self._privkey]) + with self.assertRaises(asyncssh.PermissionDenied): + await self.connect(username='ckey', + client_keys=[self._privkey]) @unittest.skipUnless(sk_available, 'security key support not available') @@ -255,7 +258,7 @@ async def test_load_resident_ctap2_error(self): """Test getting resident keys returning a CTAP 2 error""" with sk_error('err'): - with self.assertRaises(ValueError): + with self.assertRaises(asyncssh.KeyImportError): asyncssh.load_resident_keys(b'123456') @asynctest