Skip to content

Commit

Permalink
Add ability for administrator to create onetime passwords
Browse files Browse the repository at this point in the history
This commit adds an API endpoint for administrator with ACCOUNT_WRITE
privileges to create single-use passwords for user accounts that
allow the user to authenticate to the truenas server. This is
needed for STIG environments to allow administrators to help with
creating new user accounts so that the user can authenticate to
the TrueNAS server in order to configure two-factor authentication.

The single-use password may be used with the auth.login_ex mechanism
PASSWORD_PLAIN.
  • Loading branch information
anodos325 committed Jan 14, 2025
1 parent 06cd9cc commit 7067905
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 19 deletions.
11 changes: 10 additions & 1 deletion src/middlewared/middlewared/api/v25_04_0/auth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from middlewared.api.base import BaseModel, single_argument_result
from middlewared.api.base import BaseModel, single_argument_args, single_argument_result
from middlewared.utils.auth import AuthMech, AuthResp
from datetime import datetime
from pydantic import Field, Secret
Expand Down Expand Up @@ -211,3 +211,12 @@ class AuthSetAttributeArgs(BaseModel):

class AuthSetAttributeResult(BaseModel):
result: Literal[None]


@single_argument_args('generate_single_use_password')
class AuthGenerateOnetimePasswordArgs(BaseModel):
username: str


class AuthGenerateOnetimePasswordResult(BaseModel):
result: str
11 changes: 11 additions & 0 deletions src/middlewared/middlewared/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

class SessionManagerCredentials:
is_user_session = False
may_create_auth_token = True
allowlist = None

@classmethod
Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, user: dict, assurance: AuthenticatorAssuranceLevel | None):
self.last_used_at = now

if assurance:
self.may_create_auth_token = AuthMech.TOKEN_PLAIN in assurance.mechanisms
self.expiry = now + self.assurance.max_session_age
self.inactivity_timeout = self.assurance.max_inactivity

Expand Down Expand Up @@ -148,6 +150,15 @@ class LoginTwofactorSessionManagerCredentials(LoginPasswordSessionManagerCredent
pass


class LoginOnetimePasswordSessionManagerCredentials(UserSessionManagerCredentials):
""" Credentials for a specific user account on TrueNAS
Authenticated by username + onetime password ccombination
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.may_create_auth_token = False


class TokenSessionManagerCredentials(SessionManagerCredentials):
def __init__(self, token_manager, token):
self.root_credentials = token.root_credentials()
Expand Down
63 changes: 51 additions & 12 deletions src/middlewared/middlewared/plugins/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@
AuthSetAttributeArgs, AuthSetAttributeResult,
AuthTerminateSessionArgs, AuthTerminateSessionResult,
AuthTerminateOtherSessionsArgs, AuthTerminateOtherSessionsResult,
AuthGenerateOnetimePasswordArgs, AuthGenerateOnetimePasswordResult,
)
from middlewared.auth import (UserSessionManagerCredentials, UnixSocketSessionManagerCredentials,
ApiKeySessionManagerCredentials, LoginPasswordSessionManagerCredentials,
LoginTwofactorSessionManagerCredentials, AuthenticationContext,
TruenasNodeSessionManagerCredentials, TokenSessionManagerCredentials,
dump_credentials)
LoginOnetimePasswordSessionManagerCredentials, dump_credentials)
from middlewared.plugins.account_.constants import MIDDLEWARE_PAM_SERVICE, MIDDLEWARE_PAM_API_KEY_SERVICE
from middlewared.service import (
Service, filterable_api_method, filter_list,
pass_app, private, cli_private, CallError,
)
from middlewared.service_exception import MatchNotFound
from middlewared.service_exception import MatchNotFound, ValidationError
import middlewared.sqlalchemy as sa
from middlewared.utils.auth import (
aal_auth_mechanism_check, AuthMech, AuthResp, AuthenticatorAssuranceLevel, AA_LEVEL1,
AA_LEVEL2, AA_LEVEL3, CURRENT_AAL, MAX_OTP_ATTEMPTS,
AA_LEVEL2, AA_LEVEL3, CURRENT_AAL, MAX_OTP_ATTEMPTS, OTPW_MANAGER,
)
from middlewared.utils.crypto import generate_token
from middlewared.utils.time_utils import utc_now
Expand Down Expand Up @@ -302,6 +303,25 @@ async def terminate_other_sessions(self, app):

return True

@api_method(
AuthGenerateOnetimePasswordArgs, AuthGenerateOnetimePasswordResult,
roles=['ACCOUNT_WRITE'],
audit='Generate onetime password for user'
)
def generate_onetime_password(self, data):
"""
Generate a password for the specified username that may be used only a single time to authenticate
to TrueNAS. This may be used by server administrators to allow users authenticate and then set
a proper password and two-factor authentication token.
"""
username = data['username']
user_data = self.middleware.call_sync('user.query', [['username', '=', username]])
if not user_data:
raise ValidationError('auth.generate_onetime_password.username', f'{username}: user does not exist.')

passwd = OTPW_MANAGER.generate_for_uid(user_data[0]['uid'])
return passwd

@api_method(AuthGenerateTokenArgs, AuthGenerateTokenResult, authorization_required=False)
@pass_app(rest=True)
def generate_token(self, app, ttl, attrs, match_origin):
Expand All @@ -324,6 +344,12 @@ def generate_token(self, app, ttl, attrs, match_origin):
errno.EOPNOTSUPP
)

if app and not app.authenticated_credentials.may_create_auth_token:
raise CallError(
f'{app.authenticated_credentials.class_name()}: the current session type does '
'not support creation of authentication tokens.'
)

if ttl is None:
ttl = 600

Expand Down Expand Up @@ -638,12 +664,14 @@ async def login_ex(self, app, data):
case AuthMech.PASSWORD_PLAIN:
# Both of these mechanisms are de-factor username + password
# combinations and pass through libpam.
cred_type = 'LOGIN_PASSWORD'
resp = await self.get_login_user(
app,
data['username'],
data['password'],
mechanism
)

if resp['otp_required']:
# A one-time password is required for this user account and so
# we should request it from API client.
Expand All @@ -653,6 +681,8 @@ async def login_ex(self, app, data):
'response_type': AuthResp.OTP_REQUIRED,
'username': resp['user_data']['username']
}
elif resp['otpw_used']:
cred_type = 'ONETIME_PASSWORD'
elif CURRENT_AAL.level.otp_mandatory:
if resp['pam_response'] == 'SUCCESS':
# Insert a failure delay so that we don't leak information about
Expand All @@ -665,23 +695,27 @@ async def login_ex(self, app, data):

match resp['pam_response']['code']:
case pam.PAM_SUCCESS:
cred = LoginPasswordSessionManagerCredentials(resp['user_data'], CURRENT_AAL.level)
if cred_type == 'ONETIME_PASSWORD':
cred = LoginOnetimePasswordSessionManagerCredentials(resp['user_data'], CURRENT_AAL.level)
else:
cred = LoginPasswordSessionManagerCredentials(resp['user_data'], CURRENT_AAL.level)

await login_fn(app, cred)
case pam.PAM_AUTH_ERR:
await self.middleware.log_audit_message(app, 'AUTHENTICATION', {
'credentials': {
'credentials': 'LOGIN_PASSWORD',
'credentials': cred_type,
'credentials_data': {'username': data['username']},
},
'error': 'Bad username or password'
}, False)
case _:
await self.middleware.log_audit_message(app, 'AUTHENTICATION', {
'credentials': {
'credentials': 'LOGIN_PASSWORD',
'credentials': cred_type,
'credentials_data': {'username': data['username']},
},
'error': resp['pam_response']['reason']
'error': resp['pam_response']['reason'] or resp['pam_response']['otpw_response']
}, False)

case AuthMech.API_KEY_PLAIN:
Expand Down Expand Up @@ -890,18 +924,23 @@ async def get_login_user(self, app, username, password, mechanism):
combination and returns user information and whether additional OTP is required.
"""
otp_required = False
otpw_used = False

resp = await self.middleware.call(
'auth.authenticate_plain',
username, password,
mechanism == AuthMech.API_KEY_PLAIN,
app=app
)
if mechanism == AuthMech.PASSWORD_PLAIN and resp['pam_response']['code'] == pam.PAM_SUCCESS:
twofactor_auth = await self.middleware.call('auth.twofactor.config')
if twofactor_auth['enabled'] and '2FA' in resp['user_data']['account_attributes']:
otp_required = True
if 'OTPW' in resp['user_data']['account_attributes']:
otpw_used = True
else:
twofactor_auth = await self.middleware.call('auth.twofactor.config')
if twofactor_auth['enabled'] and '2FA' in resp['user_data']['account_attributes']:
otp_required = True

return resp | {'otp_required': otp_required}
return resp | {'otp_required': otp_required, 'otpw_used': otpw_used}

@cli_private
@api_method(AuthLegacyApiKeyLoginArgs, AuthLoginResult, authentication_required=False)
Expand Down Expand Up @@ -1055,7 +1094,7 @@ async def check_permission(middleware, app):
query = {'uid': origin.uid}
user_info = {'id': None, 'uid': None, 'local': False}

user = await middleware.call('auth.authenticate_user', query, user_info, False)
user = await middleware.call('auth.authenticate_user', query, user_info, None)
if user is None:
return

Expand Down
57 changes: 52 additions & 5 deletions src/middlewared/middlewared/plugins/auth_/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)
from middlewared.service import Service, pass_app, private
from middlewared.service_exception import CallError
from middlewared.utils.auth import OTPW_MANAGER, OTPWResponse
from middlewared.utils.crypto import check_unixhash

PAM_SERVICES = {MIDDLEWARE_PAM_SERVICE, MIDDLEWARE_PAM_API_KEY_SERVICE}
Expand Down Expand Up @@ -46,7 +47,11 @@ async def authenticate_plain(self, app, username, password, is_api_key=False):
#
# In all failure cases libpam_authenticate is called so that timing
# is consistent with pam_fail_delay
if not is_api_key and username == 'root' and await self.middleware.call('privilege.always_has_root_password_enabled'):
if (
not is_api_key and
username == 'root' and
await self.middleware.call('privilege.always_has_root_password_enabled')
):
if not unixhash_is_valid(unixhash):
await self.middleware.call('auth.libpam_authenticate', username, password)
elif await self.middleware.run_in_thread(check_unixhash, password, unixhash):
Expand All @@ -57,15 +62,57 @@ async def authenticate_plain(self, app, username, password, is_api_key=False):
else:
pam_resp = await self.middleware.call('auth.libpam_authenticate', username, password, pam_svc, app=app)

# TODO: this needs better integration with PAM. We may have a onetime password (not TOTP token)
# We perform after failed PAM authentication so that we properly evaluate whether account is disabled
# This unfortunately means that authentication via onetime password always has pam error delay.
if pam_resp['code'] == pam.PAM_AUTH_ERR and not is_api_key:
# Check for non-local user
if (uid := user_info['uid']) is None:
# query directly by name bypasses middleware cache and performs NSS lookup
ds_user = await self.middleware.cal('user.query', [['username', '=', username]])
if ds_user:
uid = ds_user[0]['uid']

if uid is not None:
# we can try to authenticate via onetime password preserving orginal
# pam response if password isn't a known OTP
if (resp := await self.middleware.call('auth.onetime_password_authenticate', uid, password)) is not None:
pam_resp = resp

if pam_resp['code'] == pam.PAM_SUCCESS:
user_token = await self.authenticate_user({'username': username}, user_info, is_api_key)
if is_api_key:
cred_tag = 'API_KEY'
elif 'otpw_response' in pam_resp:
cred_tag = 'OTPW'
else:
cred_tag = None

user_token = await self.authenticate_user({'username': username}, user_info, cred_tag)
if user_token is None:
# Some error occurred when trying to generate our user token
pam_resp['code'] = pam.PAM_AUTH_ERR
pam_resp['reason'] = 'Failed to generate user token'

return {'pam_response': pam_resp, 'user_data': user_token}

@private
def onetime_password_authenticate(self, uid, password):
resp = {'code': pam.PAM_AUTH_ERR, 'reason': 'Authentication failure', 'otpw_response': None}
otpw_resp = OTPW_MANAGER.authenticate(uid, password)
match otpw_resp:
case OTPWResponse.SUCCESS:
resp = {'code': pam.PAM_SUCCESS, 'reason': '', 'otpw_response': otpw_resp}
case OTPWResponse.EXPIRED:
resp = {'code': pam.PAM_CRED_EXPIRED, 'otpw_response': 'Onetime password is expired'}
case OTPWResponse.NO_KEY:
# This onetime password doesn't exist. Returning None
# will fallback to original pam response
resp = None
case _:
resp['otpw_response'] = f'Onetime password authentication failed: {otpw_resp}'

return resp

@private
@pass_app()
def libpam_authenticate(self, app, username, password, pam_service=MIDDLEWARE_PAM_SERVICE):
Expand Down Expand Up @@ -116,7 +163,7 @@ def libpam_authenticate(self, app, username, password, pam_service=MIDDLEWARE_PA
return pam_resp

@private
async def authenticate_user(self, query, user_info, is_api_key):
async def authenticate_user(self, query, user_info, cred_tag=None):
try:
user = await self.middleware.call('user.get_user_obj', {
**query, 'get_groups': True,
Expand Down Expand Up @@ -181,8 +228,8 @@ async def authenticate_user(self, query, user_info, is_api_key):
if twofactor_enabled:
account_flags.append('2FA')

if is_api_key:
account_flags.append('API_KEY')
if cred_tag:
account_flags.append(cred_tag)

if user['pw_uid'] in (0, ADMIN_UID):
if not user['local']:
Expand Down
Loading

0 comments on commit 7067905

Please sign in to comment.