Skip to content

Commit

Permalink
Merge pull request flux-framework#6149 from grondo/issue#6141
Browse files Browse the repository at this point in the history
cmd: support multiple queues in `flux jobs`, `pgrep`, and `pkill` `-q, --queue` option
  • Loading branch information
mergify[bot] authored Jul 30, 2024
2 parents 3dc3207 + 14c2571 commit f9bcdbe
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 42 deletions.
5 changes: 3 additions & 2 deletions doc/man1/flux-jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ OPTIONS

List jobs with a specific job name.

.. option:: -q, --queue=[QUEUE]
.. option:: -q, --queue=QUEUE[,...]

List jobs in a specific queue.
List jobs in a specific queue or queues. Multiple queues may be separated
by a comma or by using the :option:`-q, --queue` option multiple times.

.. option:: -c, --count=N

Expand Down
6 changes: 4 additions & 2 deletions doc/man1/flux-pgrep.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ OPTIONS
results can be listed separated by comma. See the JOB STATUS section
of the :man1:`flux-jobs` manual for more detail.

.. option:: -q, --queue=QUEUE
.. option:: -q, --queue=QUEUE[,...]

Only include jobs in the named queue *QUEUE*.
Only include jobs in the named queue *QUEUE*. Multiple queues may be
specified as a comma-separated list, or by using the :option:`--queue`
option multiple times.

.. option:: -c, --count=N

Expand Down
9 changes: 7 additions & 2 deletions src/bindings/python/flux/job/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import errno
import os
import pwd
from collections.abc import Iterable

import flux.constants
from flux.future import WaitAllFuture
Expand Down Expand Up @@ -58,7 +59,11 @@ def job_list(
if name:
constraint["and"].append({"name": [name]})
if queue:
constraint["and"].append({"queue": [queue]})
if isinstance(queue, str):
queue = [queue]
if not isinstance(queue, Iterable):
raise ValueError("queue parameter must be a string or iterable")
constraint["and"].append({"queue": list(queue)})
if states and results:
tmp = {"or": []}
tmp["or"].append({"states": [states]})
Expand Down Expand Up @@ -201,7 +206,7 @@ class JobList:
:max_entries: Maximum number of jobs to return
:since: Limit jobs to those that have been active since a given timestamp.
:name: Limit jobs to those with a specific name.
:queue: Limit jobs to those submitted to a specific queue.
:queue: Limit jobs to those submitted to a specific queue or queues
"""

# pylint: disable=too-many-instance-attributes
Expand Down
96 changes: 67 additions & 29 deletions src/bindings/python/flux/job/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import itertools

from flux.rpc import RPC


Expand All @@ -33,44 +35,80 @@ class JobStats:
"""

states = (
"depend",
"priority",
"sched",
"run",
"cleanup",
"inactive",
"total",
)
stats = (
"successful",
"failed",
"timeout",
"canceled",
"inactive_purged",
)
derived_stats = (
"pending",
"running",
"active",
)

class QueueStats:
"""Container for a set of per-queue stats"""

def __init__(self, stats=None):
if stats is None:
self.queue_name = ""
for stat in itertools.chain(JobStats.states, JobStats.stats):
setattr(self, stat, 0)
return

self.queue_name = stats["name"] if "name" in stats else "all"
# Move all stats to top-level attributes of this object
for state in JobStats.states:
setattr(self, state, stats["job_states"][state])
for stat in JobStats.stats:
setattr(self, stat, stats[stat])

def __iadd__(self, other):
self.queue_name += "," + other.queue_name
for stat in itertools.chain(JobStats.states, JobStats.stats):
setattr(self, stat, getattr(self, stat) + getattr(other, stat))
return self

def __init__(self, handle, queue=None):
"""Initialize a JobStats object with Flux handle ``handle``"""
self.handle = handle
self.queue = queue
self.queues = []
# Accept queue as str or iterable
if queue is not None:
self.queues.extend([queue] if isinstance(queue, str) else queue)
self.callback = None
self.cb_kwargs = {}
for attr in [
"depend",
"priority",
"sched",
"run",
"cleanup",
"inactive",
"successful",
"failed",
"timeout",
"canceled",
"inactive_purged",
"pending",
"running",
"active",
]:
for attr in itertools.chain(
JobStats.states, JobStats.stats, JobStats.derived_stats
):
setattr(self, attr, -1)

def _update_cb(self, rpc):
resp = rpc.get()
if self.queue:
tmpstat = None
if resp["queues"]:
tmpstat = [x for x in resp["queues"] if x["name"] == self.queue]
if not tmpstat:
raise ValueError(f"no stats available for queue {self.queue}")
resp = tmpstat[0]

for state, count in resp["job_states"].items():
setattr(self, state, count)
for state in ["successful", "failed", "timeout", "canceled", "inactive_purged"]:
setattr(self, state, resp[state])
queues = {x["name"]: self.QueueStats(x) for x in resp["queues"]}
if self.queues:
qstat = self.QueueStats()
for queue in self.queues:
try:
qstat += queues[queue]
except KeyError:
raise ValueError(f"no stats available for queue {queue}")
else:
qstat = self.QueueStats(resp)

for attr in itertools.chain(JobStats.states, JobStats.stats):
setattr(self, attr, getattr(qstat, attr))

# Compute some stats for convenience:
# pylint: disable=attribute-defined-outside-init
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/flux-jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ def parse_args():
parser.add_argument(
"-q",
"--queue",
action=FilterAction,
type=str,
metavar="QUEUE",
help="Limit output to specific queue",
action=FilterActionSetUpdate,
default=set(),
metavar="QUEUE,...",
help="Limit output to specific queue or queues",
)
parser.add_argument(
"-o",
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/flux-pgrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ def parse_args():
parser.add_argument(
"-q",
"--queue",
type=str,
metavar="QUEUE",
help="Limit output to specific queue",
type=FilterActionSetUpdate,
metavar="QUEUE,...",
help="Limit output to specific queue or queues",
)
parser.add_argument(
"-c",
Expand Down
4 changes: 4 additions & 0 deletions t/t2800-jobs-cmd.t
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ test_expect_success 'flux-jobs --queue works' '
test_debug "flux jobs -an --queue=foobar" &&
test $(flux jobs -an --queue=foobar | wc -l) -eq 0
'
test_expect_success 'flux-jobs --queue accepts multiple queues' '
test $(flux jobs -anq queue1,queue2 | wc -l) \
-eq $(job_list_state_count completed sched run)
'

# Recall pending = depend | priority | sched, running = run | cleanup,
# active = pending | running
Expand Down

0 comments on commit f9bcdbe

Please sign in to comment.