From bb7e331a064755afbe24e7da02a319bdfb0798be Mon Sep 17 00:00:00 2001 From: Andrew Walker Date: Thu, 9 Jan 2025 08:41:36 -0600 Subject: [PATCH] Add ability for administrator to create onetime passwords This commit adds an API endpoint for administrator with ACCOUNT_WRITE privileges to create single-use passwords for user accounts that allow the user to authenticate to the truenas server. This is needed for STIG environments to allow administrators to help with creating new user accounts so that the user can authenticate to the TrueNAS server in order to configure two-factor authentication. The single-use password may be used with the auth.login_ex mechanism PASSWORD_PLAIN. --- .../middlewared/api/v25_04_0/auth.py | 11 +- src/middlewared/middlewared/auth.py | 11 ++ src/middlewared/middlewared/plugins/auth.py | 64 +++++++++-- .../middlewared/plugins/auth_/authenticate.py | 57 ++++++++- src/middlewared/middlewared/utils/auth.py | 85 +++++++++++++- tests/api2/test_030_activedirectory.py | 108 +++++++++++------- tests/api2/test_auth_onetime.py | 63 ++++++++++ .../test_authenticator_assurance_level.py | 30 +++++ tests/unit/test_otpw_manager.py | 51 +++++++++ 9 files changed, 421 insertions(+), 59 deletions(-) create mode 100644 tests/api2/test_auth_onetime.py create mode 100644 tests/unit/test_otpw_manager.py diff --git a/src/middlewared/middlewared/api/v25_04_0/auth.py b/src/middlewared/middlewared/api/v25_04_0/auth.py index cd4943fa7df14..1b5c259fcd7d4 100644 --- a/src/middlewared/middlewared/api/v25_04_0/auth.py +++ b/src/middlewared/middlewared/api/v25_04_0/auth.py @@ -1,4 +1,4 @@ -from middlewared.api.base import BaseModel, single_argument_result +from middlewared.api.base import BaseModel, single_argument_args, single_argument_result from middlewared.utils.auth import AuthMech, AuthResp from datetime import datetime from pydantic import Field, Secret @@ -211,3 +211,12 @@ class AuthSetAttributeArgs(BaseModel): class AuthSetAttributeResult(BaseModel): result: Literal[None] + + +@single_argument_args('generate_single_use_password') +class AuthGenerateOnetimePasswordArgs(BaseModel): + username: str + + +class AuthGenerateOnetimePasswordResult(BaseModel): + result: str diff --git a/src/middlewared/middlewared/auth.py b/src/middlewared/middlewared/auth.py index 6a03fa6e5c3e5..b224618b3030b 100644 --- a/src/middlewared/middlewared/auth.py +++ b/src/middlewared/middlewared/auth.py @@ -11,6 +11,7 @@ class SessionManagerCredentials: is_user_session = False + may_create_auth_token = True allowlist = None @classmethod @@ -71,6 +72,7 @@ def __init__(self, user: dict, assurance: AuthenticatorAssuranceLevel | None): self.last_used_at = now if assurance: + self.may_create_auth_token = AuthMech.TOKEN_PLAIN in assurance.mechanisms self.expiry = now + self.assurance.max_session_age self.inactivity_timeout = self.assurance.max_inactivity @@ -148,6 +150,15 @@ class LoginTwofactorSessionManagerCredentials(LoginPasswordSessionManagerCredent pass +class LoginOnetimePasswordSessionManagerCredentials(UserSessionManagerCredentials): + """ Credentials for a specific user account on TrueNAS + Authenticated by username + onetime password ccombination + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.may_create_auth_token = False + + class TokenSessionManagerCredentials(SessionManagerCredentials): def __init__(self, token_manager, token): self.root_credentials = token.root_credentials() diff --git a/src/middlewared/middlewared/plugins/auth.py b/src/middlewared/middlewared/plugins/auth.py index 7d15e5c393a63..6932aae34970c 100644 --- a/src/middlewared/middlewared/plugins/auth.py +++ b/src/middlewared/middlewared/plugins/auth.py @@ -19,22 +19,23 @@ AuthSetAttributeArgs, AuthSetAttributeResult, AuthTerminateSessionArgs, AuthTerminateSessionResult, AuthTerminateOtherSessionsArgs, AuthTerminateOtherSessionsResult, + AuthGenerateOnetimePasswordArgs, AuthGenerateOnetimePasswordResult, ) from middlewared.auth import (UserSessionManagerCredentials, UnixSocketSessionManagerCredentials, ApiKeySessionManagerCredentials, LoginPasswordSessionManagerCredentials, LoginTwofactorSessionManagerCredentials, AuthenticationContext, TruenasNodeSessionManagerCredentials, TokenSessionManagerCredentials, - dump_credentials) + LoginOnetimePasswordSessionManagerCredentials, dump_credentials) from middlewared.plugins.account_.constants import MIDDLEWARE_PAM_SERVICE, MIDDLEWARE_PAM_API_KEY_SERVICE from middlewared.service import ( Service, filterable_api_method, filter_list, pass_app, private, cli_private, CallError, ) -from middlewared.service_exception import MatchNotFound +from middlewared.service_exception import MatchNotFound, ValidationError import middlewared.sqlalchemy as sa from middlewared.utils.auth import ( aal_auth_mechanism_check, AuthMech, AuthResp, AuthenticatorAssuranceLevel, AA_LEVEL1, - AA_LEVEL2, AA_LEVEL3, CURRENT_AAL, MAX_OTP_ATTEMPTS, + AA_LEVEL2, AA_LEVEL3, CURRENT_AAL, MAX_OTP_ATTEMPTS, OTPW_MANAGER, ) from middlewared.utils.crypto import generate_token from middlewared.utils.time_utils import utc_now @@ -302,6 +303,25 @@ async def terminate_other_sessions(self, app): return True + @api_method( + AuthGenerateOnetimePasswordArgs, AuthGenerateOnetimePasswordResult, + roles=['ACCOUNT_WRITE'], + audit='Generate onetime password for user' + ) + def generate_onetime_password(self, data): + """ + Generate a password for the specified username that may be used only a single time to authenticate + to TrueNAS. This may be used by server administrators to allow users authenticate and then set + a proper password and two-factor authentication token. + """ + username = data['username'] + user_data = self.middleware.call_sync('user.query', [['username', '=', username]]) + if not user_data: + raise ValidationError('auth.generate_onetime_password.username', f'{username}: user does not exist.') + + passwd = OTPW_MANAGER.generate_for_uid(user_data[0]['uid']) + return passwd + @api_method(AuthGenerateTokenArgs, AuthGenerateTokenResult, authorization_required=False) @pass_app(rest=True) def generate_token(self, app, ttl, attrs, match_origin): @@ -324,6 +344,13 @@ def generate_token(self, app, ttl, attrs, match_origin): errno.EOPNOTSUPP ) + if app and not app.authenticated_credentials.may_create_auth_token: + raise CallError( + f'{app.authenticated_credentials.class_name()}: the current session type does ' + 'not support creation of authentication tokens.', + errno.EOPNOTSUPP + ) + if ttl is None: ttl = 600 @@ -638,12 +665,14 @@ async def login_ex(self, app, data): case AuthMech.PASSWORD_PLAIN: # Both of these mechanisms are de-factor username + password # combinations and pass through libpam. + cred_type = 'LOGIN_PASSWORD' resp = await self.get_login_user( app, data['username'], data['password'], mechanism ) + if resp['otp_required']: # A one-time password is required for this user account and so # we should request it from API client. @@ -653,6 +682,8 @@ async def login_ex(self, app, data): 'response_type': AuthResp.OTP_REQUIRED, 'username': resp['user_data']['username'] } + elif resp['otpw_used']: + cred_type = 'ONETIME_PASSWORD' elif CURRENT_AAL.level.otp_mandatory: if resp['pam_response'] == 'SUCCESS': # Insert a failure delay so that we don't leak information about @@ -665,12 +696,16 @@ async def login_ex(self, app, data): match resp['pam_response']['code']: case pam.PAM_SUCCESS: - cred = LoginPasswordSessionManagerCredentials(resp['user_data'], CURRENT_AAL.level) + if cred_type == 'ONETIME_PASSWORD': + cred = LoginOnetimePasswordSessionManagerCredentials(resp['user_data'], CURRENT_AAL.level) + else: + cred = LoginPasswordSessionManagerCredentials(resp['user_data'], CURRENT_AAL.level) + await login_fn(app, cred) case pam.PAM_AUTH_ERR: await self.middleware.log_audit_message(app, 'AUTHENTICATION', { 'credentials': { - 'credentials': 'LOGIN_PASSWORD', + 'credentials': cred_type, 'credentials_data': {'username': data['username']}, }, 'error': 'Bad username or password' @@ -678,10 +713,10 @@ async def login_ex(self, app, data): case _: await self.middleware.log_audit_message(app, 'AUTHENTICATION', { 'credentials': { - 'credentials': 'LOGIN_PASSWORD', + 'credentials': cred_type, 'credentials_data': {'username': data['username']}, }, - 'error': resp['pam_response']['reason'] + 'error': resp['pam_response']['reason'] or resp['pam_response']['otpw_response'] }, False) case AuthMech.API_KEY_PLAIN: @@ -890,6 +925,8 @@ async def get_login_user(self, app, username, password, mechanism): combination and returns user information and whether additional OTP is required. """ otp_required = False + otpw_used = False + resp = await self.middleware.call( 'auth.authenticate_plain', username, password, @@ -897,11 +934,14 @@ async def get_login_user(self, app, username, password, mechanism): app=app ) if mechanism == AuthMech.PASSWORD_PLAIN and resp['pam_response']['code'] == pam.PAM_SUCCESS: - twofactor_auth = await self.middleware.call('auth.twofactor.config') - if twofactor_auth['enabled'] and '2FA' in resp['user_data']['account_attributes']: - otp_required = True + if 'OTPW' in resp['user_data']['account_attributes']: + otpw_used = True + else: + twofactor_auth = await self.middleware.call('auth.twofactor.config') + if twofactor_auth['enabled'] and '2FA' in resp['user_data']['account_attributes']: + otp_required = True - return resp | {'otp_required': otp_required} + return resp | {'otp_required': otp_required, 'otpw_used': otpw_used} @cli_private @api_method(AuthLegacyApiKeyLoginArgs, AuthLoginResult, authentication_required=False) @@ -1055,7 +1095,7 @@ async def check_permission(middleware, app): query = {'uid': origin.uid} user_info = {'id': None, 'uid': None, 'local': False} - user = await middleware.call('auth.authenticate_user', query, user_info, False) + user = await middleware.call('auth.authenticate_user', query, user_info, None) if user is None: return diff --git a/src/middlewared/middlewared/plugins/auth_/authenticate.py b/src/middlewared/middlewared/plugins/auth_/authenticate.py index 0b009537bba20..231b6ac6c955c 100644 --- a/src/middlewared/middlewared/plugins/auth_/authenticate.py +++ b/src/middlewared/middlewared/plugins/auth_/authenticate.py @@ -8,6 +8,7 @@ ) from middlewared.service import Service, pass_app, private from middlewared.service_exception import CallError +from middlewared.utils.auth import OTPW_MANAGER, OTPWResponse from middlewared.utils.crypto import check_unixhash PAM_SERVICES = {MIDDLEWARE_PAM_SERVICE, MIDDLEWARE_PAM_API_KEY_SERVICE} @@ -46,7 +47,11 @@ async def authenticate_plain(self, app, username, password, is_api_key=False): # # In all failure cases libpam_authenticate is called so that timing # is consistent with pam_fail_delay - if not is_api_key and username == 'root' and await self.middleware.call('privilege.always_has_root_password_enabled'): + if ( + not is_api_key and + username == 'root' and + await self.middleware.call('privilege.always_has_root_password_enabled') + ): if not unixhash_is_valid(unixhash): await self.middleware.call('auth.libpam_authenticate', username, password) elif await self.middleware.run_in_thread(check_unixhash, password, unixhash): @@ -57,8 +62,32 @@ async def authenticate_plain(self, app, username, password, is_api_key=False): else: pam_resp = await self.middleware.call('auth.libpam_authenticate', username, password, pam_svc, app=app) + # TODO: this needs better integration with PAM. We may have a onetime password (not TOTP token) + # We perform after failed PAM authentication so that we properly evaluate whether account is disabled + # This unfortunately means that authentication via onetime password always has pam error delay. + if pam_resp['code'] == pam.PAM_AUTH_ERR and not is_api_key: + # Check for non-local user + if (uid := user_info['uid']) is None: + # query directly by name bypasses middleware cache and performs NSS lookup + ds_user = await self.middleware.call('user.query', [['username', '=', username]]) + if ds_user: + uid = ds_user[0]['uid'] + + if uid is not None: + # we can try to authenticate via onetime password preserving orginal + # pam response if password isn't a known OTP + if (resp := await self.middleware.call('auth.onetime_password_authenticate', uid, password)) is not None: + pam_resp = resp + if pam_resp['code'] == pam.PAM_SUCCESS: - user_token = await self.authenticate_user({'username': username}, user_info, is_api_key) + if is_api_key: + cred_tag = 'API_KEY' + elif 'otpw_response' in pam_resp: + cred_tag = 'OTPW' + else: + cred_tag = None + + user_token = await self.authenticate_user({'username': username}, user_info, cred_tag) if user_token is None: # Some error occurred when trying to generate our user token pam_resp['code'] = pam.PAM_AUTH_ERR @@ -66,6 +95,24 @@ async def authenticate_plain(self, app, username, password, is_api_key=False): return {'pam_response': pam_resp, 'user_data': user_token} + @private + def onetime_password_authenticate(self, uid, password): + resp = {'code': pam.PAM_AUTH_ERR, 'reason': 'Authentication failure', 'otpw_response': None} + otpw_resp = OTPW_MANAGER.authenticate(uid, password) + match otpw_resp: + case OTPWResponse.SUCCESS: + resp = {'code': pam.PAM_SUCCESS, 'reason': '', 'otpw_response': otpw_resp} + case OTPWResponse.EXPIRED: + resp = {'code': pam.PAM_CRED_EXPIRED, 'otpw_response': 'Onetime password is expired'} + case OTPWResponse.NO_KEY: + # This onetime password doesn't exist. Returning None + # will fallback to original pam response + resp = None + case _: + resp['otpw_response'] = f'Onetime password authentication failed: {otpw_resp}' + + return resp + @private @pass_app() def libpam_authenticate(self, app, username, password, pam_service=MIDDLEWARE_PAM_SERVICE): @@ -116,7 +163,7 @@ def libpam_authenticate(self, app, username, password, pam_service=MIDDLEWARE_PA return pam_resp @private - async def authenticate_user(self, query, user_info, is_api_key): + async def authenticate_user(self, query, user_info, cred_tag=None): try: user = await self.middleware.call('user.get_user_obj', { **query, 'get_groups': True, @@ -181,8 +228,8 @@ async def authenticate_user(self, query, user_info, is_api_key): if twofactor_enabled: account_flags.append('2FA') - if is_api_key: - account_flags.append('API_KEY') + if cred_tag: + account_flags.append(cred_tag) if user['pw_uid'] in (0, ADMIN_UID): if not user['local']: diff --git a/src/middlewared/middlewared/utils/auth.py b/src/middlewared/middlewared/utils/auth.py index 1c5588010619d..9c0ee461beddd 100644 --- a/src/middlewared/middlewared/utils/auth.py +++ b/src/middlewared/middlewared/utils/auth.py @@ -1,5 +1,8 @@ import enum +import threading from dataclasses import dataclass +from time import monotonic +from .crypto import generate_string, sha512_crypt, check_unixhash LEGACY_API_KEY_USERNAME = 'LEGACY_API_KEY' MAX_OTP_ATTEMPTS = 3 @@ -58,7 +61,11 @@ def aal_auth_mechanism_check(mechanism_str: str, aal: AuthenticatorAssuranceLeve AA_LEVEL1 = AuthenticatorAssuranceLevel( max_session_age=86400 * 30, max_inactivity=None, - mechanisms=(AuthMech.API_KEY_PLAIN, AuthMech.TOKEN_PLAIN, AuthMech.PASSWORD_PLAIN), + mechanisms=( + AuthMech.API_KEY_PLAIN, + AuthMech.TOKEN_PLAIN, + AuthMech.PASSWORD_PLAIN, + ), otp_mandatory=False ) @@ -91,3 +98,79 @@ def aal_auth_mechanism_check(mechanism_str: str, aal: AuthenticatorAssuranceLeve ) CURRENT_AAL = ServerAAL(AA_LEVEL1) + + +class OTPWResponse(enum.StrEnum): + SUCCESS = 'SUCCESS' + EXPIRED = 'EXPIRED' + NO_KEY = 'NO_KEY' + ALREADY_USED = 'ALREADY_USED' + WRONG_USER = 'WRONG_USER' + BAD_PASSKEY = 'BAD_PASSKEY' + + +@dataclass(slots=True) +class UserOnetimePassword: + uid: int # UID of related user + expires: int # expiration time (monotonic) + keyhash: str # hash of onetime password + used: bool = False # whether password has been used for authentication + + +class OnetimePasswordManager: + """ + This class stores passkeys that may be used precisely once to authenticate + to the TrueNAS server as a particular user. This is to provide a mechanism + for a system administrator to provision a temporary password for a user + that may be used to set two-factor authentication and user password. + """ + otpasswd = {} + lock = threading.Lock() + cnt = 0 + + def generate_for_uid(self, uid: int) -> str: + """ + Generate a passkey for the given UID. + + Format is "_" + We store a sha512 hash of the plaintext for authentication purposes + """ + with self.lock: + plaintext = generate_string(string_size=24) + keyhash = sha512_crypt(plaintext) + expires = monotonic() + 86400 + + entry = UserOnetimePassword(uid=uid, expires=expires, keyhash=keyhash) + self.cnt += 1 + self.otpasswd[str(self.cnt)] = entry + return f'{self.cnt}_{plaintext}' + + def authenticate(self, uid: int, plaintext: str) -> OTPWResponse: + """ Check passkey matches plaintext string. """ + try: + idx, passwd = plaintext.split('_') + except Exception: + return OTPWResponse.NO_KEY + + if (entry := self.otpasswd.get(idx)) is None: + return OTPWResponse.NO_KEY + + with self.lock: + if entry.uid != uid: + return OTPWResponse.WRONG_USER + + if entry.used: + return OTPWResponse.ALREADY_USED + + if monotonic() > entry.expires: + return OTPWResponse.EXPIRED + + if not check_unixhash(passwd, entry.keyhash): + return OTPWResponse.BAD_PASSKEY + + entry.used = True + + return OTPWResponse.SUCCESS + + +OTPW_MANAGER = OnetimePasswordManager() diff --git a/tests/api2/test_030_activedirectory.py b/tests/api2/test_030_activedirectory.py index e6b24578b452c..39a578f9062b6 100644 --- a/tests/api2/test_030_activedirectory.py +++ b/tests/api2/test_030_activedirectory.py @@ -107,6 +107,16 @@ def set_product_type(request): yield +@pytest.fixture(scope="function") +def enable_ds_auth(set_product_type): + call("system.general.update", {"ds_auth": True}) + + try: + yield + finally: + call("system.general.update", {"ds_auth": False}) + + @pytest.fixture(scope="function") def set_ad_nameserver(request): with override_nameservers() as ns: @@ -329,54 +339,72 @@ def test_activedirectory_smb_ops(): assert acl['trivial'] is False, str(acl) -def test_account_privilege_authentication(set_product_type): +def test_account_privilege_authentication(enable_ds_auth): reset_systemd_svcs('winbind smbd') with active_directory(dns_timeout=15): - call("system.general.update", {"ds_auth": True}) nusers = call("user.query", [["local", "=", False]], {"count": True}) assert nusers > 0 ngroups = call("group.query", [["local", "=", False]], {"count": True}) assert ngroups > 0 - try: - # RID 513 is constant for "Domain Users" - domain_sid = call("idmap.domain_info", AD_DOMAIN.split(".")[0])['sid'] - with privilege({ - "name": "AD privilege", - "local_groups": [], - "ds_groups": [f"{domain_sid}-513"], - "roles": ["READONLY_ADMIN"], - "web_shell": False, - }): - with client(auth=(f"limiteduser@{AD_DOMAIN}", ADPASSWORD)) as c: - methods = c.call("core.get_methods") - me = c.call("auth.me") - - assert 'DIRECTORY_SERVICE' in me['account_attributes'] - assert 'ACTIVE_DIRECTORY' in me['account_attributes'] - - assert len(c.call("user.query", [["local", "=", False]])) == nusers - assert len(c.call("group.query", [["local", "=", False]])) == ngroups - - assert "system.info" in methods - assert "pool.create" not in methods - - # ADUSERNAME is member of domain admins and will have - # all privileges - with client(auth=(f"{ADUSERNAME}@{AD_DOMAIN}", ADPASSWORD)) as c: - methods = c.call("core.get_methods") - - assert "pool.create" in methods - - # Alternative formatting for user name \. - # this should also work for auth - with client(auth=(AD_USER, ADPASSWORD)) as c: - methods = c.call("core.get_methods") - - assert "pool.create" in methods - finally: - call("system.general.update", {"ds_auth": False}) + # RID 513 is constant for "Domain Users" + domain_sid = call("idmap.domain_info", AD_DOMAIN.split(".")[0])['sid'] + with privilege({ + "name": "AD privilege", + "local_groups": [], + "ds_groups": [f"{domain_sid}-513"], + "roles": ["READONLY_ADMIN"], + "web_shell": False, + }): + with client(auth=(f'limiteduser@{AD_DOMAIN}', ADPASSWORD)) as c: + methods = c.call("core.get_methods") + me = c.call("auth.me") + + assert 'DIRECTORY_SERVICE' in me['account_attributes'] + assert 'ACTIVE_DIRECTORY' in me['account_attributes'] + + assert len(c.call("user.query", [["local", "=", False]])) == nusers + assert len(c.call("group.query", [["local", "=", False]])) == ngroups + + assert "system.info" in methods + assert "pool.create" not in methods + + # Verify that onetime password for AD users works + # and that second call fails + username = r'AD02\limiteduser' + otpw = call('auth.generate_onetime_password', {'username': username}) + with client(auth=None) as c: + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': username, + 'password': otpw + }) + + assert resp['response_type'] == 'SUCCESS' + assert resp['user_info']['pw_name'] == username + + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': username, + 'password': otpw + }) + + assert resp['response_type'] == 'AUTH_ERR' + + # ADUSERNAME is member of domain admins and will have + # all privileges + with client(auth=(f"{ADUSERNAME}@{AD_DOMAIN}", ADPASSWORD)) as c: + methods = c.call("core.get_methods") + + assert "pool.create" in methods + + # Alternative formatting for user name \. + # this should also work for auth + with client(auth=(AD_USER, ADPASSWORD)) as c: + methods = c.call("core.get_methods") + + assert "pool.create" in methods def test_secrets_restore(): diff --git a/tests/api2/test_auth_onetime.py b/tests/api2/test_auth_onetime.py new file mode 100644 index 0000000000000..79defed7072a0 --- /dev/null +++ b/tests/api2/test_auth_onetime.py @@ -0,0 +1,63 @@ +import errno +import pytest + +from middlewared.service_exception import CallError +from middlewared.test.integration.utils import client, call + + +@pytest.fixture(scope='module') +def onetime_password_user(unprivileged_user_fixture): + yield unprivileged_user_fixture + + +@pytest.fixture(scope='function') +def onetime_password(onetime_password_user): + otpw = call('auth.generate_onetime_password', {'username': onetime_password_user.username}) + yield (onetime_password_user, otpw) + + +def test_basic_onetime_password_auth(onetime_password): + user, otpw = onetime_password + with client(auth=None) as c: + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': user.username, + 'password': otpw + }) + assert resp['response_type'] == 'SUCCESS' + assert 'OTPW' in resp['user_info']['account_attributes'] + + +def test_onetime_password_auth_reuse_fail(onetime_password): + user, otpw = onetime_password + with client(auth=None) as c: + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': user.username, + 'password': otpw + }) + assert resp['response_type'] == 'SUCCESS' + assert 'OTPW' in resp['user_info']['account_attributes'] + + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': user.username, + 'password': otpw + }) + assert resp['response_type'] == 'AUTH_ERR' + + +def test_onetime_password_generate_token_fail(onetime_password): + user, otpw = onetime_password + with client(auth=None) as c: + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': user.username, + 'password': otpw + }) + assert resp['response_type'] == 'SUCCESS' + + with pytest.raises(CallError) as ce: + c.call('auth.generate_token') + + assert ce.value.errno == errno.EOPNOTSUPP diff --git a/tests/api2/test_authenticator_assurance_level.py b/tests/api2/test_authenticator_assurance_level.py index 26ddc392f0c9f..b2fcbce87165e 100644 --- a/tests/api2/test_authenticator_assurance_level.py +++ b/tests/api2/test_authenticator_assurance_level.py @@ -100,3 +100,33 @@ def test_level2_password_with_otp(sharing_admin_user): c.call('auth.generate_token') assert ce.value.errno == errno.EOPNOTSUPP + + +def test_level2_onetime_password(sharing_admin_user): + onetime_password = call('auth.generate_onetime_password', {'username': sharing_admin_user.username}) + + with authenticator_assurance_level('LEVEL_2'): + with client(auth=None) as c: + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': sharing_admin_user.username, + 'password': onetime_password + }) + assert resp['response_type'] == 'SUCCESS' + assert resp['authenticator'] == 'LEVEL_2' + + me = c.call('auth.me') + assert me['pw_name'] == sharing_admin_user.username + assert 'SHARING_ADMIN' in me['privilege']['roles'] + assert 'OTPW' in me['account_attributes'] + + # attempt to reuse the onetime password should fail with EOPNOTSUPP + # because we don't want to leak info about onetime password status. + with pytest.raises(CallError) as ce: + c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': sharing_admin_user.username, + 'password': onetime_password + }) + + assert ce.value.errno == errno.EOPNOTSUPP diff --git a/tests/unit/test_otpw_manager.py b/tests/unit/test_otpw_manager.py new file mode 100644 index 0000000000000..0e784805fce17 --- /dev/null +++ b/tests/unit/test_otpw_manager.py @@ -0,0 +1,51 @@ +from middlewared.utils.auth import OTPW_MANAGER, OTPWResponse + + +def test__auth_success(): + passwd = OTPW_MANAGER.generate_for_uid(1000) + resp = OTPW_MANAGER.authenticate(1000, passwd) + assert resp is OTPWResponse.SUCCESS + + +def test__auth_used(): + passwd = OTPW_MANAGER.generate_for_uid(1000) + resp = OTPW_MANAGER.authenticate(1000, passwd) + assert resp is OTPWResponse.SUCCESS + + resp = OTPW_MANAGER.authenticate(1000, passwd) + assert resp is OTPWResponse.ALREADY_USED + + +def test__auth_nokey(): + resp = OTPW_MANAGER.authenticate(1000, '80000_canary') + assert resp is OTPWResponse.NO_KEY + + +def test__auth_bad_passkey(): + passwd = OTPW_MANAGER.generate_for_uid(1000) + resp = OTPW_MANAGER.authenticate(1000, passwd + 'bad') + assert resp is OTPWResponse.BAD_PASSKEY + + # This shouldn't prevent using correct passkey + resp = OTPW_MANAGER.authenticate(1000, passwd) + assert resp is OTPWResponse.SUCCESS + + +def test__auth_wrong_user(): + passwd = OTPW_MANAGER.generate_for_uid(1000) + resp = OTPW_MANAGER.authenticate(1001, passwd) + assert resp is OTPWResponse.WRONG_USER + + # This shouldn't prevent correct user + resp = OTPW_MANAGER.authenticate(1000, passwd) + assert resp is OTPWResponse.SUCCESS + + +def test__auth_expired(): + passwd = OTPW_MANAGER.generate_for_uid(1000) + idx, plaintext = passwd.split('_') + + OTPW_MANAGER.otpasswd[idx].expires = 1 + + resp = OTPW_MANAGER.authenticate(1000, passwd) + assert resp is OTPWResponse.EXPIRED