Skip to content

Commit

Permalink
Merge pull request #638 from dynamic-entropy/auto_approve_refactoring
Browse files Browse the repository at this point in the history
Auto approve refactoring
  • Loading branch information
ericvaandering authored Nov 1, 2023
2 parents 28b29c0 + 60d7fca commit 47c635d
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 119 deletions.
105 changes: 5 additions & 100 deletions src/policy/CMSRucioPolicy/algorithms/auto_approve.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
"""
Auto approve algorithm for CMS Rucio policy
"""
import logging
from configparser import NoOptionError, NoSectionError
from datetime import datetime

from rucio.common.config import config_get
from sqlalchemy.sql import func


def global_approval(rule, did, session) -> bool:
Expand All @@ -32,102 +26,13 @@ def global_approval(rule, did, session) -> bool:
:returns: True if the rule should be auto approved, False otherwise
"""

from rucio.core.account import has_account_attribute
from rucio.core.did import list_files
from rucio.core.rse_expression_parser import parse_expression
from rucio.core.rule import list_rules
from rucio.db.sqla.models import ReplicaLock, ReplicationRule

def _get_rule_size(rules):
rule_size = 0
for rule in rules:
scope = rule['scope']
name = rule['name']
rule_files = list_files(scope, name, session=session)
rule_size += sum([file['bytes'] for file in rule_files])
return rule_size

account = rule['account']
try:
global_usage_all_accounts = float(config_get(
'rules', 'global_usage_all_accounts', raise_exception=True, default=1e16))
except (NoOptionError, NoSectionError, RuntimeError):
global_usage_all_accounts = 1e16

try:
global_usage_per_account = float(config_get(
'rules', 'global_usage_per_account', raise_exception=True, default=1e15))
except (NoOptionError, NoSectionError, RuntimeError):
global_usage_per_account = 1e15

try:
rule_lifetime_threshold = int(config_get('rules', 'rule_lifetime_threshold',
raise_exception=True, default=2592000))
except (NoOptionError, NoSectionError, RuntimeError):
rule_lifetime_threshold = 2592000

try:
single_rse_rule_size_threshold = float(config_get(
'rules', 'single_rse_rule_size_threshold', raise_exception=True, default=50e12))
except (NoOptionError, NoSectionError, RuntimeError):
single_rse_rule_size_threshold = 50e12

auto_approve_activity = 'User AutoApprove'

# Check if the account is banned
if has_account_attribute(account, 'rule_banned', session=session):
return False

# All checks are performed at rule creation
# The approval conditions are defined in the _check_for_auto_approve_eligibility function in the permissions module
# Check activity is User AutoApprove
if rule['activity'] != auto_approve_activity:
return False

# Check if the rule is locked
if rule['locked']:
return False

if rule['expires_at'] is None:
return False

lifetime = (rule['expires_at'] - datetime.utcnow()).total_seconds()
if lifetime > rule_lifetime_threshold:
return False

size_of_rule = sum([file['bytes'] for file in list_files(did['scope'], did['name'], session=session)])

# Limit single RSE rules to 50 TB
# This does not mean that the total size of locks at an RSE is limited to 50 TB,
# as other rules that are spread over multiple RSEs may claim the same space.
# This is just a simple check to keep single-RSE rules from becoming too large.
rse_expression = rule['rse_expression']
rses = parse_expression(rse_expression, filter_={'availability_write': True}, session=session)
if len(rses) == 1:
this_rse_autoapprove_rules = list_rules(
filters={'account': account, 'activity': auto_approve_activity, 'rse_expression': rse_expression},
session=session)
this_rse_autoapprove_usage = _get_rule_size(this_rse_autoapprove_rules)
if this_rse_autoapprove_usage + size_of_rule > single_rse_rule_size_threshold:
logging.warning('Single RSE usage exceeded for auto approve rules for account %s and RSE %s',
account, rse_expression)
return False

# Check global usage of the account under this activity
all_auto_approve_rules_by_account = list_rules(
filters={'account': account, 'activity': auto_approve_activity}, session=session)
global_auto_approve_usage_by_account = _get_rule_size(all_auto_approve_rules_by_account)
if global_auto_approve_usage_by_account + size_of_rule > global_usage_per_account:
logging.warning('Global usage exceeded for auto approve rules for account %s', account)
return False
if rule['activity'] == auto_approve_activity:
return True

# Check global usage under the AutoApprove category by all accounts
query = session.query(
func.sum(ReplicaLock.bytes)).join(
ReplicationRule, ReplicaLock.rule_id == ReplicationRule.id).filter(
ReplicationRule.activity == 'User AutoApprove')
current_auto_approve_usage = query.scalar()
if current_auto_approve_usage is None:
current_auto_approve_usage = 0
if current_auto_approve_usage + size_of_rule > global_usage_all_accounts:
logging.warning('Global usage exceeded for auto approve rules')
return False
return True
return False
153 changes: 134 additions & 19 deletions src/policy/CMSRucioPolicy/permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from configparser import NoOptionError, NoSectionError
from datetime import datetime
from sqlalchemy.sql import func
from typing import TYPE_CHECKING

import rucio.core.scope
from rucio.common.config import config_get
from rucio.common.exception import InvalidRSEExpression
from rucio.common.types import InternalScope
from rucio.core.account import has_account_attribute
from rucio.core.did import list_files
from rucio.core.identity import exist_identity_account
from rucio.core.rse import list_rse_attributes, get_rse
from rucio.core.rse_expression_parser import parse_expression
from rucio.core.rule import get_rule
from rucio.core.rule import get_rule, list_rules
from rucio.db.sqla.constants import IdentityType
from rucio.db.sqla.models import ReplicaLock, ReplicationRule

if TYPE_CHECKING:
from typing import Optional
Expand Down Expand Up @@ -167,6 +174,128 @@ def perm_update_rse(issuer, kwargs, *, session: "Optional[Session]" = None):
return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session)


def _auto_approve_threshold(option, default, cast=float):
    """
    Read one numeric auto-approve threshold from the ``rules`` config section.

    :param option: Name of the option inside the ``rules`` section.
    :param default: Value returned when the option cannot be read.
    :param cast: Callable applied to the configured value (``float`` or ``int``).
    :returns: The configured value converted with ``cast``, or ``default`` unchanged.
    """
    try:
        return cast(config_get('rules', option, raise_exception=True, default=default))
    except (NoOptionError, NoSectionError, RuntimeError):
        return default


def _check_for_auto_approve_eligibility(issuer, rses, kwargs, session: "Optional[Session]" = None):
    """
    Decide whether a rule requested under the 'User AutoApprove' activity may be created.

    The request is rejected when any of the following holds:

    * ``ask_approval`` is not set,
    * one of the target RSEs matches ``tier=3|tier=0`` or ``rse_type=TAPE``,
    * the account carries the ``auto_approve_banned`` attribute,
    * the rule is locked, has no lifetime, or its lifetime exceeds
      ``rules/rule_lifetime_threshold``,
    * a requested DID would push the account's auto-approve usage for a
      single-RSE expression above ``rules/single_rse_rule_size_threshold``,
    * a requested DID would push the account's total auto-approve usage above
      ``rules/global_usage_per_account``,
    * a requested DID would push the auto-approve usage of all accounts above
      ``rules/global_usage_all_accounts``.

    :param issuer: Account issuing the request (currently not used by the checks).
    :param rses: RSEs the rule's expression resolved to; each item provides an ``'rse'`` entry.
    :param kwargs: Rule options; reads ``ask_approval``, ``account``, ``dids``, ``locked``,
                   ``lifetime`` and ``rse_expression``.
    :param session: The database session in use.
    :returns: True if the rule may be created under auto approval, False otherwise.
    """
    auto_approve_activity = 'User AutoApprove'

    def _get_rule_size(rules):
        """Sum the bytes of every file in the DIDs the given rules refer to."""
        return sum(
            file['bytes']
            for rule in rules
            for file in list_files(rule['scope'], rule['name'], session=session)
        )

    # prevent rule creation under 'User AutoApprove' for rules without ask_approval
    if not kwargs["ask_approval"]:
        return False

    # prevent rule creation to tape and Tier3 and Tier0 under the 'User AutoApprove' activity
    rule_rses = {rse['rse'] for rse in rses}
    excluded_rses = set()
    for expression in ("tier=3|tier=0", "rse_type=TAPE"):
        try:
            excluded_rses.update(rse['rse'] for rse in parse_expression(expression, session=session))
        except InvalidRSEExpression:
            # An expression that cannot be resolved contributes no excluded RSEs.
            pass
    if rule_rses & excluded_rses:
        return False

    account = kwargs['account']
    dids = kwargs['dids']

    global_usage_all_accounts = _auto_approve_threshold('global_usage_all_accounts', 1e16)
    global_usage_per_account = _auto_approve_threshold('global_usage_per_account', 1e15)
    # NOTE(review): the default suggests the lifetime is expressed in seconds (30 days) - confirm.
    rule_lifetime_threshold = _auto_approve_threshold('rule_lifetime_threshold', 30*24*3600, cast=int)
    single_rse_rule_size_threshold = _auto_approve_threshold('single_rse_rule_size_threshold', 50e12)

    # Check if the account is banned
    if has_account_attribute(account, 'auto_approve_banned', session=session):
        return False

    # Check if the rule is locked
    if kwargs['locked']:
        return False

    # Auto-approved rules must expire, and soon enough
    if kwargs['lifetime'] is None:
        return False
    if kwargs['lifetime'] > rule_lifetime_threshold:
        return False

    did_sizes = [
        sum(file['bytes'] for file in list_files(InternalScope(did['scope']), did['name'], session=session))
        for did in dids
    ]
    if not did_sizes:
        # Nothing was requested, so there is nothing the size checks could reject.
        return True

    # The usage baselines below do not depend on the DID being examined, so they are
    # computed once and compared against the largest requested DID: if that one fits,
    # every smaller one fits too.
    # NOTE(review): each DID is judged on its own; the combined size of all DIDs in
    # one request is not compared against the thresholds - confirm this is intended.
    size_of_rule = max(did_sizes)

    # Limit single RSE rules (50 TB unless configured otherwise).
    # This does not cap the total size of locks at an RSE, as other rules that are
    # spread over multiple RSEs may claim the same space. It is only a simple check
    # to keep single-RSE rules from becoming too large.
    rse_expression = kwargs['rse_expression']
    writable_rses = parse_expression(rse_expression, filter_={'availability_write': True}, session=session)
    if len(writable_rses) == 1:
        this_rse_autoapprove_rules = list_rules(
            filters={'account': account, 'activity': auto_approve_activity, 'rse_expression': rse_expression},
            session=session)
        this_rse_autoapprove_usage = _get_rule_size(this_rse_autoapprove_rules)
        if this_rse_autoapprove_usage + size_of_rule > single_rse_rule_size_threshold:
            logging.warning(
                'Single RSE usage exceeded for auto approve rules for account %s and RSE %s, '
                'this_rse_autoapprove_usage, size_of_rule, single_rse_rule_size_threshold: %s, %s, %s',
                account, rse_expression, this_rse_autoapprove_usage, size_of_rule,
                single_rse_rule_size_threshold)
            return False

    # Check global usage of the account under this activity
    all_auto_approve_rules_by_account = list_rules(
        filters={'account': account, 'activity': auto_approve_activity}, session=session)
    global_auto_approve_usage_by_account = _get_rule_size(all_auto_approve_rules_by_account)
    if global_auto_approve_usage_by_account + size_of_rule > global_usage_per_account:
        logging.warning(
            'Global usage exceeded for auto approve rules for account %s, '
            'current usage, size of rule, global_usage_per_account: %s, %s, %s',
            account, global_auto_approve_usage_by_account, size_of_rule, global_usage_per_account)
        return False

    # Check global usage under the AutoApprove category by all accounts
    query = session.query(
        func.sum(ReplicaLock.bytes)).join(
        ReplicationRule, ReplicaLock.rule_id == ReplicationRule.id).filter(
        ReplicationRule.activity == auto_approve_activity)
    # The aggregate yields None when no lock matches; treat that as zero usage.
    current_auto_approve_usage = query.scalar() or 0
    if current_auto_approve_usage + size_of_rule > global_usage_all_accounts:
        logging.warning('Global usage exceeded for auto approve rules, current usage, size of rule, '
                        'global_usage_all_accounts: %s, %s, %s', current_auto_approve_usage, size_of_rule,
                        global_usage_all_accounts)
        return False

    return True


def perm_add_rule(issuer, kwargs, *, session: "Optional[Session]" = None):
"""
Checks if an account can add a replication rule.
Expand All @@ -179,13 +308,6 @@ def perm_add_rule(issuer, kwargs, *, session: "Optional[Session]" = None):

rses = parse_expression(kwargs['rse_expression'], filter_={'vo': issuer.vo}, session=session)

# Keep while sync is running so it can make rules on all RSEs
if _is_root(issuer) and repr(kwargs['account']).startswith('sync_'):
return True

if isinstance(repr(issuer), str) and repr(issuer).startswith('sync_'): # noqa
return True

# If any of RSEs matching the expression needs approval, the rule cannot be created
if not kwargs['ask_approval']:
for rse in rses:
Expand All @@ -194,17 +316,7 @@ def perm_add_rule(issuer, kwargs, *, session: "Optional[Session]" = None):
return False

if kwargs["activity"] == "User AutoApprove":
# prevent rule creation under 'User AutoApprove' for rules without ask_approval
if not kwargs["ask_approval"]:
return False
# prevent rule creation to tape and Tier3 and Tier0 under the 'User AutoApprove' activity
rule_rses = {rse['rse'] for rse in rses}
t3_rses = {rse['rse'] for rse in parse_expression("tier=3|tier=0", filter_={'vo': issuer.vo}, session=session)}
tape_rses = {rse['rse'] for rse in parse_expression(
"rse_type=TAPE", filter_={'vo': issuer.vo}, session=session)}

if rule_rses.intersection(t3_rses) or rule_rses.intersection(tape_rses):
return False
return _check_for_auto_approve_eligibility(issuer, rses, kwargs, session=session)

# Anyone can use _Temp RSEs if a lifetime is set and under a month
all_temp = True
Expand All @@ -217,10 +329,13 @@ def perm_add_rule(issuer, kwargs, *, session: "Optional[Session]" = None):
if all_temp and kwargs['lifetime'] is not None and kwargs['lifetime'] < 31 * 24 * 60 * 60:
return True

# Non admin users can create rules without the ability to lock them
if kwargs['account'] == issuer and not kwargs['locked']:
return True

if _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session):
return True

return False


Expand Down

0 comments on commit 47c635d

Please sign in to comment.