WIP: Experimental: Add JobList constraint parser and support constraint query strings in flux jobs -f #5711

Open · wants to merge 6 commits into base: master
22 changes: 19 additions & 3 deletions src/bindings/python/flux/constraint/parser.py
@@ -231,6 +231,16 @@ class default operator may optionally be substituted, e.g. "operand"
to split values of that operator. For instance ``{"op": ","}``
would autosplit operator ``op`` values on comma.

convert_values (dict): A mapping of operator name to a callable which
should take a single list argument containing the values obtained
from the current term for the operator after the operator_map and
split_values operations have been applied. The callable should
return a new list of values. This can be used to convert values to
a new type or to combine multiple values into a single element,
e.g. ::

convert_values = {"ints": lambda args: [int(x) for x in args]}

combined_terms (set): A set of operator terms whose values can be
combined when joined with the AND logical operator. E.g. if
"test" is in ``combined_terms``, then
@@ -284,6 +294,10 @@ class MyConstraintParser(ConstraintParser):
# Combined terms
combined_terms = set()

# Mapping of operator name to value conversion function.
# E.g. { "integer": lambda args: [ int(x) for x in args ] }
convert_values = {}

def __init__(
self, lexer=None, optimize=True, debug=False, write_tables=False, **kw_args
):
@@ -408,10 +422,12 @@ def p_expression_token(self, p):
f"invalid character '{invalid}' in operator '{op}:'"
)

values = [value]
if op in self.split_values:
p[0] = {op: value.split(self.split_values[op])}
else:
p[0] = {op: [value]}
values = value.split(self.split_values[op])
if op in self.convert_values:
values = self.convert_values[op](values)
p[0] = {op: values}

def p_quoted_token(self, p):
"""
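With this change, an operator's value is first split via ``split_values``
(when configured) and the resulting list is then passed through
``convert_values``, so a term like ``userid:100,200`` can become
``{"userid": [100, 200]}``. A minimal sketch of a ``ConstraintParser``
subclass using the new hook (the ``IntParser`` name is invented for
illustration; the ``ints`` operator mirrors the docstring example above)::

    from flux.constraint.parser import ConstraintParser

    class IntParser(ConstraintParser):
        # Split "ints:" values on comma, then convert each one to int
        split_values = {"ints": ","}
        convert_values = {"ints": lambda args: [int(x) for x in args]}

    # A term such as "ints:1,2,3" should now yield integer values,
    # e.g. a constraint containing {"ints": [1, 2, 3]}
    print(IntParser().parse("ints:1,2,3"))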
14 changes: 13 additions & 1 deletion src/bindings/python/flux/job/info.py
@@ -19,7 +19,7 @@
from itertools import chain

import flux.constants
from flux.core.inner import raw
from flux.core.inner import ffi, raw
from flux.job.JobID import JobID
from flux.job.stats import JobStats
from flux.memoized_property import memoized_property
@@ -42,6 +42,12 @@ def statetostr(stateid, fmt="L"):
return raw.flux_job_statetostr(stateid, fmt).decode("utf-8")


def strtostate(state):
    result = ffi.new("flux_job_state_t [1]")
    raw.flux_job_strtostate(state, result)
    return int(result[0])


def statetoemoji(stateid):
statestr = raw.flux_job_statetostr(stateid, "S").decode("utf-8")
if statestr == "N":
@@ -81,6 +87,12 @@ def resulttostr(resultid, fmt="L"):
return raw.flux_job_resulttostr(resultid, fmt).decode("utf-8")


def strtoresult(arg):
    result = ffi.new("flux_job_result_t [1]")
    raw.flux_job_strtoresult(arg, result)
    return int(result[0])


def resulttoemoji(resultid):
if resultid != "":
resultstr = raw.flux_job_resulttostr(resultid, "S").decode("utf-8")
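The new ``strtostate`` and ``strtoresult`` helpers are thin wrappers around
the corresponding libflux conversion functions. A rough usage sketch (the
lowercase names follow those used by the constraint parser below)::

    from flux.job.info import strtostate, strtoresult

    # Convert textual names into the numeric masks used by job-list
    state_mask = strtostate("run")
    result_mask = strtoresult("failed")
    print(state_mask, result_mask)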
180 changes: 179 additions & 1 deletion src/bindings/python/flux/job/list.py
@@ -10,13 +10,18 @@
import errno
import os
import pwd
import sys
from collections.abc import Iterable
from datetime import datetime
from functools import reduce

import flux.constants
from flux.constraint.parser import ConstraintLexer, ConstraintParser
from flux.future import WaitAllFuture
from flux.job import JobID
from flux.job.info import JobInfo
from flux.job.info import JobInfo, strtoresult, strtostate
from flux.rpc import RPC
from flux.util import parse_datetime


class JobListRPC(RPC):
@@ -341,3 +346,176 @@
if hasattr(rpc, "errors"):
self.errors = rpc.errors
return [JobInfo(job) for job in jobs]


def job_list_filter_to_mask(args, conv):
    """
    Convert all job state or result strings with conv() and combine into
    a single state or result mask as accepted by the job-list constraints.

    This is a convenience function for the JobListConstraintParser class.

    Args:
        args (list): list of values to convert
        conv (callable): function to call on each arg to convert to a state
            or result mask.
    """
    return reduce(lambda x, y: x | y, map(conv, args))
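# Illustration: combining several state names reduces to a single OR'd
# bitmask, e.g. job_list_filter_to_mask(["depend", "run"], strtostate)
# is equivalent to strtostate("depend") | strtostate("run").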


class JobListConstraintParser(ConstraintParser):
    operator_map = {
        None: "filter",
        "id": "jobid",
        "host": "hostlist",
        "hosts": "hostlist",
        "rank": "ranks",
    }
    split_values = {"states": ",", "results": ",", "userid": ","}
    convert_values = {
        "userid": lambda args: [int(x) for x in args],
        "states": lambda args: [job_list_filter_to_mask(args, strtostate)],
        "results": lambda args: [job_list_filter_to_mask(args, strtoresult)],
    }
    valid_states = (
        "depend",
        "priority",
        "sched",
        "run",
        "cleanup",
        "inactive",
        "pending",
        "running",
        "active",
    )
    valid_results = ("completed", "failed", "canceled", "timeout")

    def convert_filter(self, arg):
        #
        # This is a generic state/result filter for backwards compat with
        # --filter=. Split into separate states and results operators and
        # return the new term(s) (joined by 'or' since that preserves the
        # behavior of `--filter`).
        #
        states = []
        results = []
        for name in arg.split(","):
            name = name.lower()
            if name in self.valid_states:
                states.append(name)
            elif name in self.valid_results:
                results.append(name)
            else:
                raise ValueError(f"Invalid filter specified: {name}")
        arg = ""
        if states:
            arg += "states:" + ",".join(states) + " "
            if results:
                arg += "or "
        if results:
            arg += "results:" + ",".join(results)
        return arg.rstrip()

    @staticmethod
    def convert_user(arg):
        op, _, arg = arg.partition(":")
        users = []
        for user in arg.split(","):
            try:
                users.append(str(int(user)))
            except ValueError:
                users.append(str(pwd.getpwnam(user).pw_uid))
        return "userid:" + ",".join(users)

    @staticmethod
    def convert_datetime(dt):
        if isinstance(dt, (float, int)):
            if dt == 0:
                # A datetime of zero indicates unset, or an arbitrary time
                # in the future. Return 12 months from now.
                return parse_datetime("+12m")
            dt = datetime.fromtimestamp(dt).astimezone()
        else:
            dt = parse_datetime(dt, assumeFuture=False)
        return dt.timestamp()

    def convert_range(self, arg):
        arg = arg[1:]
        if ".." in arg:
            start, end = arg.split("..")
            arg = "(not ("
            if start:
                dt = self.convert_datetime(start)
                arg += f"'t_cleanup:<{dt}'"
            if start and end:
                arg += " or "
            if end:
                dt = self.convert_datetime(end)
                arg += f"'t_run:>{dt}'"
            arg += "))"
        else:
            dt = self.convert_datetime(arg)
            arg = f"(t_run:'<={dt}' and t_cleanup:'>={dt}')"
        return arg

    def convert_timeop(self, arg):
        op, _, arg = arg.partition(":")
        prefix = ""
        if arg[0] in (">", "<"):
            if arg[1] == "=":
                prefix = arg[:2]
                arg = arg[2:]
            else:
                prefix = arg[0]
                arg = arg[1:]
        arg = self.convert_datetime(arg)
        return f"'{op}:{prefix}{arg}'"

    def convert_token(self, arg):
        if arg.startswith("@"):
            return self.convert_range(arg)
        if arg.startswith("t_"):
            return self.convert_timeop(arg)
        if arg.startswith("user:"):
            return self.convert_user(arg)
        if ":" not in arg:
            return self.convert_filter(arg)
        return f"'{arg}'"

    def parse(self, string, debug=False):
        # First pass: traverse all tokens and apply convenience conversions
        expression = ""
        lexer = ConstraintLexer()
        lexer.input(str(string))
        if debug:
            print(f"input: {string}", file=sys.stderr)

        # Get all tokens first so we can do lookahead in the next step for
        # proper use of whitespace:
        tokens = []
        while True:
            tok = lexer.token()
            if tok is None:
                break
            tokens.append(tok)

        # Reconstruct expression while converting tokens:
        for i, tok in enumerate(tokens):
            next_tok = None
            if i < len(tokens) - 1:
                next_tok = tokens[i + 1]
            if debug:
                print(tok, file=sys.stderr)
            if tok.type != "TOKEN":
                expression += tok.value
            else:
                expression += self.convert_token(tok.value)
            if tok.type not in ("LPAREN", "NEGATE") and (
                next_tok and next_tok.type not in ("RPAREN")
            ):
                expression += " "

        if debug:
            print(f"expression: '{expression}'", file=sys.stderr)

        return super().parse(expression)
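A hedged usage sketch of the parser (the query strings are examples; the
return value is a job-list constraint dictionary suitable for the
``constraint`` argument of ``JobList``)::

    from flux.job.list import JobListConstraintParser

    parser = JobListConstraintParser()

    # Legacy --filter style names are still accepted and are expanded into
    # states/results terms joined with "or":
    print(parser.parse("running,failed"))

    # Operator-style terms; a numeric userid avoids a passwd lookup here:
    print(parser.parse("user:1000 and ranks:0-1"))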
16 changes: 15 additions & 1 deletion src/bindings/python/flux/util.py
@@ -226,6 +226,17 @@
getattr(namespace, self.dest).update(values)


class FilterActionConcatenate(argparse.Action):
    """Concatenate filter arguments separated with space"""

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, "filtered", True)
        current = getattr(namespace, self.dest)
        if current is not None:
            values = current + " " + values
        setattr(namespace, self.dest, values)


# pylint: disable=redefined-builtin
class FilterTrueAction(argparse.Action):
def __init__(
@@ -331,7 +342,7 @@
return seconds


def parse_datetime(string, now=None):
def parse_datetime(string, now=None, assumeFuture=True):
"""Parse a possibly human readable datetime string or offset

If string starts with `+` or `-`, then the remainder of the string
@@ -369,6 +380,9 @@

cal = Calendar()
cal.ptc.StartHour = 0
if not assumeFuture:
cal.ptc.DOWParseStyle = 0
cal.ptc.YearParseStyle = 0

time_struct, status = cal.parse(string, sourceTime=now.timetuple())
if status == 0:
raise ValueError(f'Invalid datetime: "{string}"')
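A sketch of how the new argparse action is expected to behave (the option
name mirrors its use in flux-jobs.py below)::

    import argparse

    from flux.util import FilterActionConcatenate

    ap = argparse.ArgumentParser()
    ap.add_argument("-f", "--filter", action=FilterActionConcatenate, metavar="QUERY")
    args = ap.parse_args(["-f", "running", "-f", "user:1000"])

    # Repeated -f options are joined with a space, so the constraint parser
    # receives a single combined query string:
    print(args.filter)  # running user:1000

The ``assumeFuture=False`` flag added to ``parse_datetime()`` is used by
``convert_datetime()`` above so that dates appearing in time-based filter
terms are not assumed to lie in the future.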
21 changes: 10 additions & 11 deletions src/cmd/flux-jobs.py
@@ -20,9 +20,11 @@
from flux.hostlist import Hostlist
from flux.idset import IDset
from flux.job import JobID, JobInfo, JobInfoFormat, JobList, job_fields_to_attrs
from flux.job.list import JobListConstraintParser
from flux.job.stats import JobStats
from flux.util import (
FilterAction,
FilterActionConcatenate,
FilterActionSetUpdate,
FilterTrueAction,
UtilConfig,
@@ -153,32 +155,30 @@ def fetch_jobs_flux(args, fields, flux_handle=None):
if args.filter:
LOGGER.warning("Both -a and --filter specified, ignoring -a")
else:
args.filter.update(["pending", "running", "inactive"])
args.filter = "pending,running,inactive"

if not args.filter:
args.filter = {"pending", "running"}
args.filter = "pending,running"

constraint = None
if args.include:
try:
constraint = {"ranks": [IDset(args.include).encode()]}
args.filter += " ranks:" + IDset(args.include).encode()
except ValueError:
try:
constraint = {"hostlist": [Hostlist(args.include).encode()]}
args.filter += " host:" + Hostlist(args.include).encode()
except ValueError:
raise ValueError(f"-i/--include: invalid targets: {args.include}")

jobs_rpc = JobList(
flux_handle,
ids=args.jobids,
attrs=attrs,
filters=args.filter,
user=args.user,
max_entries=args.count,
since=since,
name=args.name,
queue=args.queue,
constraint=constraint,
constraint=JobListConstraintParser().parse(args.filter),
)

jobs = jobs_rpc.jobs()
@@ -231,10 +231,9 @@ def parse_args():
parser.add_argument(
"-f",
"--filter",
action=FilterActionSetUpdate,
metavar="STATE|RESULT",
default=set(),
help="List jobs with specific job state or result",
action=FilterActionConcatenate,
metavar="QUERY",
help="Restrict jobs using a constraint query string",
)
parser.add_argument(
"--since",
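With these changes, ``flux jobs -f`` accepts a constraint query string rather
than a comma-separated list of states and results. Illustrative invocations
(the user and node names are placeholders; the legacy form still works because
bare names are routed through ``convert_filter``)::

    flux jobs -f 'pending and user:alice'
    flux jobs -f 'running and host:node1'
    flux jobs -f running,failed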