From 8e9a03d1df4c74e0ad8d649bfb6f55f758564bc7 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Wed, 21 Dec 2022 13:48:23 -0800 Subject: [PATCH 1/6] python: add assumeFuture argument to flux.util.parse_datetime Problem: The parse_datetime() utility function is statically configured to assume future dates when a term like "Friday" is given. That is, instead of giving the date for the previous Friday, the function will return then next Friday instead. This behavior should be configurable. Add an assumeFuture parameter to the function which defaults to True. If set to False, then parse_datetime() will assume dates in the past instead. --- src/bindings/python/flux/util.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/flux/util.py b/src/bindings/python/flux/util.py index 6f561f1add26..0ead970814b2 100644 --- a/src/bindings/python/flux/util.py +++ b/src/bindings/python/flux/util.py @@ -331,7 +331,7 @@ def parse_fsd(fsd_string): return seconds -def parse_datetime(string, now=None): +def parse_datetime(string, now=None, assumeFuture=True): """Parse a possibly human readable datetime string or offset If string starts with `+` or `-`, then the remainder of the string @@ -369,6 +369,9 @@ def parse_datetime(string, now=None): cal = Calendar() cal.ptc.StartHour = 0 + if not assumeFuture: + cal.ptc.DOWParseStyle = 0 + cal.ptc.YearParseStyle = 0 time_struct, status = cal.parse(string, sourceTime=now.timetuple()) if status == 0: raise ValueError(f'Invalid datetime: "{string}"') From 95386e5d905a8f62c10d764452a74fc686e07abb Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sun, 7 Jan 2024 09:15:19 -0800 Subject: [PATCH 2/6] python: add convert_values member to ConstraintParser class Problem: Values are always returned as strings by the ConstraintParser parser, but there are times when another type may be more suitable. Additionally, there is currently no way to reduce a set of values to a single element if this is required or beneficial in the result. Add a convert_values mapping to the ConstraintParser class. If an operator is in this dictionary, then the values of a term are passed to the provided callable, which should return a new list of values as a result. --- src/bindings/python/flux/constraint/parser.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/bindings/python/flux/constraint/parser.py b/src/bindings/python/flux/constraint/parser.py index ca299dd524c7..d97ae0e1dc00 100644 --- a/src/bindings/python/flux/constraint/parser.py +++ b/src/bindings/python/flux/constraint/parser.py @@ -231,6 +231,16 @@ class default operator may optionally be substituted, e.g. "operand" to split values of that operator. For instance ``{"op": ","}`` would autosplit operator ``op`` values on comma. + convert_values (dict): A mapping of operator name to callable which + should take a single list argument containing the values from + the obtained from the current term for the operator after the + operator_map and split_values operations have been applied. The + callable should return a new list of values. This can be used + to convert values to a new type or to combine multiple values + into a single element, e.g. :: + + convert_values = {"ints": lambda args: [int(x) for x in args]} + combined_terms (set): A set of operator terms whose values can be combined when joined with the AND logical operator. E.g. if "test" is in ``combined_terms``, then @@ -284,6 +294,10 @@ class MyConstraintParser(ConstraintParser): # Combined terms combined_terms = set() + # Mapping of operator name to value conversion function. + # E.g. { "integer": lambda args: [ int(x) for x in args ] } + convert_values = {} + def __init__( self, lexer=None, optimize=True, debug=False, write_tables=False, **kw_args ): @@ -408,10 +422,12 @@ def p_expression_token(self, p): f"invalid character '{invalid}' in operator '{op}:'" ) + values = [value] if op in self.split_values: - p[0] = {op: value.split(self.split_values[op])} - else: - p[0] = {op: [value]} + values = value.split(self.split_values[op]) + if op in self.convert_values: + values = self.convert_values[op](values) + p[0] = {op: values} def p_quoted_token(self, p): """ From 2ff0ec975367f7586bb427e8153137568717ee6c Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 11 Jan 2024 15:57:07 -0800 Subject: [PATCH 3/6] python: add flux.job.info.strtostate and strtoresult Problem: flux_job_strtostate(3) and flux_job_strtoresult(3) are not exposed in the Python API. Add strtostate() and strtoresult() to flux.job.info. --- src/bindings/python/flux/job/info.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/flux/job/info.py b/src/bindings/python/flux/job/info.py index 376f965e345a..9ecb61637ab1 100644 --- a/src/bindings/python/flux/job/info.py +++ b/src/bindings/python/flux/job/info.py @@ -19,7 +19,7 @@ from itertools import chain import flux.constants -from flux.core.inner import raw +from flux.core.inner import ffi, raw from flux.job.JobID import JobID from flux.job.stats import JobStats from flux.memoized_property import memoized_property @@ -42,6 +42,12 @@ def statetostr(stateid, fmt="L"): return raw.flux_job_statetostr(stateid, fmt).decode("utf-8") +def strtostate(state): + result = ffi.new("flux_job_state_t [1]") + raw.flux_job_strtostate(state, result) + return int(result[0]) + + def statetoemoji(stateid): statestr = raw.flux_job_statetostr(stateid, "S").decode("utf-8") if statestr == "N": @@ -81,6 +87,12 @@ def resulttostr(resultid, fmt="L"): return raw.flux_job_resulttostr(resultid, fmt).decode("utf-8") +def strtoresult(arg): + result = ffi.new("flux_job_result_t [1]") + raw.flux_job_strtoresult(arg, result) + return int(result[0]) + + def resulttoemoji(resultid): if resultid != "": resultstr = raw.flux_job_resulttostr(resultid, "S").decode("utf-8") From 95a817dfb7f2132fd1db2476ef183f5b847a829b Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Mon, 22 Jan 2024 08:49:53 -0800 Subject: [PATCH 4/6] python: add JobListConstraintParser Problem: There is no user friendly way to create constraint objects that can be used with the job-list service. Add a JobListConstraintParser class which can be used to create RFC 31 constraing objects suitable for sending to the job-list service. --- src/bindings/python/flux/job/list.py | 180 ++++++++++++++++++++++++++- 1 file changed, 179 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/flux/job/list.py b/src/bindings/python/flux/job/list.py index 73f342f40bbf..6c4f8518e3e8 100644 --- a/src/bindings/python/flux/job/list.py +++ b/src/bindings/python/flux/job/list.py @@ -10,13 +10,18 @@ import errno import os import pwd +import sys from collections.abc import Iterable +from datetime import datetime +from functools import reduce import flux.constants +from flux.constraint.parser import ConstraintLexer, ConstraintParser from flux.future import WaitAllFuture from flux.job import JobID -from flux.job.info import JobInfo +from flux.job.info import JobInfo, strtoresult, strtostate from flux.rpc import RPC +from flux.util import parse_datetime class JobListRPC(RPC): @@ -341,3 +346,176 @@ def jobs(self): if hasattr(rpc, "errors"): self.errors = rpc.errors return [JobInfo(job) for job in jobs] + + +def job_list_filter_to_mask(args, conv): + """ + Convert all job state or result strings with conv() and combine into + a single state or result mask as accepted by the job-list constraints. + + This is a convenience function for the JobListConstraintParser class. + + Args: + args (list): list of values to convert + conv (callable): function to call on each arg to convert to a state + or result mask. + """ + return reduce(lambda x, y: x | y, map(conv, args)) + + +class JobListConstraintParser(ConstraintParser): + operator_map = { + None: "filter", + "id": "jobid", + "host": "hostlist", + "hosts": "hostlist", + "rank": "ranks", + } + split_values = {"states": ",", "results": ",", "userid": ","} + convert_values = { + "userid": lambda args: [int(x) for x in args], + "states": lambda args: [job_list_filter_to_mask(args, strtostate)], + "results": lambda args: [job_list_filter_to_mask(args, strtoresult)], + } + valid_states = ( + "depend", + "priority", + "sched", + "run", + "cleanup", + "inactive", + "pending", + "running", + "active", + ) + valid_results = ("completed", "failed", "canceled", "timeout") + + def convert_filter(self, arg): + # + # This is a generic state/result filter for backwards compat with + # --filter=. Split into separate states and results operators and + # return the new term(s) (joined by 'or' since that preserves the + # behavior of `--filter`). + # + states = [] + results = [] + for name in arg.split(","): + name = name.lower() + if name in self.valid_states: + states.append(name) + elif name in self.valid_results: + results.append(name) + else: + raise ValueError(f"Invalid filter specified: {name}") + arg = "" + if states: + arg += "states:" + ",".join(states) + " " + if results: + arg += "or " + if results: + arg += "results:" + ",".join(results) + return arg.rstrip() + + @staticmethod + def convert_user(arg): + op, _, arg = arg.partition(":") + users = [] + for user in arg.split(","): + try: + users.append(str(int(user))) + except ValueError: + users.append(str(pwd.getpwnam(user).pw_uid)) + return "userid:" + ",".join(users) + + @staticmethod + def convert_datetime(dt): + if isinstance(dt, (float, int)): + if dt == 0: + # A datetime of zero indicates unset, or an arbitrary time + # in the future. Return 12 months from now. + return parse_datetime("+12m") + dt = datetime.fromtimestamp(dt).astimezone() + else: + dt = parse_datetime(dt, assumeFuture=False) + return dt.timestamp() + + def convert_range(self, arg): + arg = arg[1:] + if ".." in arg: + start, end = arg.split("..") + arg = "(not (" + if start: + dt = self.convert_datetime(start) + arg += f"'t_cleanup:<{dt}'" + if start and end: + arg += " or " + if end: + dt = self.convert_datetime(end) + arg += f"'t_run:>{dt}'" + arg += "))" + else: + dt = self.convert_datetime(arg) + arg = f"(t_run:'<={dt}' and t_cleanup:'>={dt}')" + return arg + + def convert_timeop(self, arg): + op, _, arg = arg.partition(":") + prefix = "" + if arg[0] in (">", "<"): + if arg[1] == "=": + prefix = arg[:2] + arg = arg[2:] + else: + prefix = arg[0] + arg = arg[1:] + arg = self.convert_datetime(arg) + return f"'{op}:{prefix}{arg}'" + + def convert_token(self, arg): + if arg.startswith("@"): + return self.convert_range(arg) + if arg.startswith("t_"): + return self.convert_timeop(arg) + if arg.startswith("user:"): + return self.convert_user(arg) + if ":" not in arg: + return self.convert_filter(arg) + return f"'{arg}'" + + def parse(self, string, debug=False): + # First pass: traverse all tokens and apply convenience conversions + expression = "" + lexer = ConstraintLexer() + lexer.input(str(string)) + if debug: + print(f"input: {string}", file=sys.stderr) + + # Get all tokens first so we can do lookahead in the next step for + # proper use of whitespace: + tokens = [] + while True: + tok = lexer.token() + if tok is None: + break + tokens.append(tok) + + # Reconstruct expression while converting tokens: + for i, tok in enumerate(tokens): + next_tok = None + if i < len(tokens) - 1: + next_tok = tokens[i + 1] + if debug: + print(tok, file=sys.stderr) + if tok.type != "TOKEN": + expression += tok.value + else: + expression += self.convert_token(tok.value) + if tok.type not in ("LPAREN", "NEGATE") and ( + next_tok and next_tok.type not in ("RPAREN") + ): + expression += " " + + if debug: + print(f"expression: '{expression}'", file=sys.stderr) + + return super().parse(expression) From 12d87a8b5067270638b493d8f37ba735b0854acf Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 25 Jan 2024 16:13:28 -0800 Subject: [PATCH 5/6] python: add flux.util.FilterActionConcatenate Problem: The flux-jobs(1) -f, --filter option is being updated to take a query string, but there is no "filter" based argparse action to handle this case. Add flux.util.FilterActionConcatenate which concatenates multiple --filter strings together, separated by space. --- src/bindings/python/flux/util.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/bindings/python/flux/util.py b/src/bindings/python/flux/util.py index 0ead970814b2..9c20f08cd609 100644 --- a/src/bindings/python/flux/util.py +++ b/src/bindings/python/flux/util.py @@ -226,6 +226,17 @@ def __call__(self, parser, namespace, values, option_string=None): getattr(namespace, self.dest).update(values) +class FilterActionConcatenate(argparse.Action): + """Concatenate filter arguments separated with space""" + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, "filtered", True) + current = getattr(namespace, self.dest) + if current is not None: + values = current + " " + values + setattr(namespace, self.dest, values) + + # pylint: disable=redefined-builtin class FilterTrueAction(argparse.Action): def __init__( From 635a4ccd29df2885bee1c2f4570f87883145025c Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 25 Jan 2024 16:23:12 -0800 Subject: [PATCH 6/6] flux-jobs: support constraint query string with `-f, --filter` Problem: There is no way to provide an arbitrary constraint to flux-jobs(1). Update the `-f, --filter` option of flux-jobs(1) to take a query string that will be passed to the JobList constraint parameter instead of the filter paramter. --- src/cmd/flux-jobs.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/cmd/flux-jobs.py b/src/cmd/flux-jobs.py index 91b66914c4a9..53126658b477 100755 --- a/src/cmd/flux-jobs.py +++ b/src/cmd/flux-jobs.py @@ -20,9 +20,11 @@ from flux.hostlist import Hostlist from flux.idset import IDset from flux.job import JobID, JobInfo, JobInfoFormat, JobList, job_fields_to_attrs +from flux.job.list import JobListConstraintParser from flux.job.stats import JobStats from flux.util import ( FilterAction, + FilterActionConcatenate, FilterActionSetUpdate, FilterTrueAction, UtilConfig, @@ -153,18 +155,17 @@ def fetch_jobs_flux(args, fields, flux_handle=None): if args.filter: LOGGER.warning("Both -a and --filter specified, ignoring -a") else: - args.filter.update(["pending", "running", "inactive"]) + args.filter = "pending,running,inactive" if not args.filter: - args.filter = {"pending", "running"} + args.filter = "pending,running" - constraint = None if args.include: try: - constraint = {"ranks": [IDset(args.include).encode()]} + args.filter += " ranks:" + IDset(args.include).encode() except ValueError: try: - constraint = {"hostlist": [Hostlist(args.include).encode()]} + args.filter += " host:" + Hostlist(args.include).encode() except ValueError: raise ValueError(f"-i/--include: invalid targets: {args.include}") @@ -172,13 +173,12 @@ def fetch_jobs_flux(args, fields, flux_handle=None): flux_handle, ids=args.jobids, attrs=attrs, - filters=args.filter, user=args.user, max_entries=args.count, since=since, name=args.name, queue=args.queue, - constraint=constraint, + constraint=JobListConstraintParser().parse(args.filter), ) jobs = jobs_rpc.jobs() @@ -231,10 +231,9 @@ def parse_args(): parser.add_argument( "-f", "--filter", - action=FilterActionSetUpdate, - metavar="STATE|RESULT", - default=set(), - help="List jobs with specific job state or result", + action=FilterActionConcatenate, + metavar="QUERY", + help="Restrict jobs using a constraint query string", ) parser.add_argument( "--since",