feat: add prototype/experiment of testing multiple applications per pod
combined with the fluxion scheduler as a service,
this could be a pretty cool idea. I am not sold on this
being a good idea for production, but I think it will
afford interesting experiments and workflow designs.

Signed-off-by: vsoch <[email protected]>
vsoch committed Jun 27, 2024
1 parent f8254dc commit b55a632
Showing 23 changed files with 296 additions and 91 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.20 AS builder

WORKDIR /workspace

29 changes: 23 additions & 6 deletions api/v1alpha2/minicluster_types.go
@@ -50,6 +50,11 @@ type MiniClusterSpec struct {
// +optional
Interactive bool `json:"interactive"`

// Allow >1 Flux running (oversubscribing resources)
// +kubebuilder:default=false
// +optional
Oversubscribe bool `json:"oversubscribe"`

// Flux options for the broker, shared across cluster
// +optional
Flux FluxSpec `json:"flux"`
@@ -821,13 +826,16 @@ func (f *MiniCluster) Validate() bool {
	// Count the FluxRunners
	if container.RunFlux {
		fluxRunners += 1

		// Give all flux containers a name, if not provided
		if container.Name == "" {

			// Maintain previous behavior to have name == main flux runner
			if i == 0 {
				container.Name = f.Name
			}
			container.Name = fmt.Sprintf("%s-%d", container.Name, i)
		}

		// Non flux-runners are required to have a name
	} else {
		if container.Name == "" {
			fmt.Printf("😥️ %s is missing a name\n", name)
			return false
		}
	}

// If a custom script is provided AND a command, no go
@@ -836,7 +844,16 @@ func (f *MiniCluster) Validate() bool {
return false
}
}
	// If we have more than one flux runner, must explicitly oversubscribe
	if fluxRunners > 1 && !f.Spec.Oversubscribe {
		fmt.Printf("😥️ More than one flux runner requires oversubscribe: true\n")
		valid = false
	}

	// More than one container can run Flux (and the brokers see the same resources)
	// But we need at least one!
	if fluxRunners < 1 {
		valid = false
	}
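Taken together, these checks mean multiple "runFlux" containers are rejected unless the user opts in. A minimal sketch of what that implies for callers, assuming the import path matches the operator module and that the rest of the spec passes its other checks:

```go
package main

import (
	"fmt"

	api "github.com/flux-framework/flux-operator/api/v1alpha2"
)

func main() {
	// Two runFlux containers, but no explicit oversubscribe: invalid
	cluster := &api.MiniCluster{}
	cluster.Spec.Containers = []api.MiniClusterContainer{
		{Image: "rockylinux:9", Name: "app-one", RunFlux: true},
		{Image: "rockylinux:9", Name: "app-two", RunFlux: true},
	}
	fmt.Println(cluster.Validate()) // false: requires oversubscribe: true

	// Opting in satisfies the new guard (other checks still apply)
	cluster.Spec.Oversubscribe = true
	fmt.Println(cluster.Validate())
}
```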

5 changes: 5 additions & 0 deletions api/v1alpha2/swagger.json
@@ -622,6 +622,11 @@
"default": {},
"$ref": "#/definitions/Network"
},
"oversubscribe": {
"description": "Allow \u003e1 Flux running (oversubscribing resources)",
"type": "boolean",
"default": false
},
"pod": {
"description": "Pod spec details",
"default": {},
8 changes: 8 additions & 0 deletions api/v1alpha2/zz_generated.openapi.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions chart/templates/minicluster-crd.yaml
@@ -520,6 +520,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
4 changes: 4 additions & 0 deletions config/crd/bases/flux-framework.org_miniclusters.yaml
@@ -523,6 +523,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
1 change: 0 additions & 1 deletion config/manager/kustomization.yaml
@@ -13,4 +13,3 @@ kind: Kustomization
images:
- name: controller
newName: ghcr.io/flux-framework/flux-operator
newTag: test
6 changes: 4 additions & 2 deletions controllers/flux/containers.go
@@ -53,7 +53,7 @@ func getFluxContainer(

func getContainers(
specs []api.MiniClusterContainer,
customName string,
mounts []corev1.VolumeMount,
serviceContainer bool,
) ([]corev1.Container, error) {
@@ -82,9 +82,11 @@
// wait.sh path corresponds to container identifier
waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
command = []string{"/bin/bash", waitScript}
}

if customName != "" {
containerName = customName
}
// A container not running flux can only have pre/post sections
// in a custom script if we know the entrypoint.
if container.GenerateEntrypoint() && !serviceContainer {
4 changes: 3 additions & 1 deletion controllers/flux/job.go
@@ -98,9 +98,11 @@ func NewMiniClusterJob(cluster *api.MiniCluster) (*batchv1.Job, error) {
}

// Prepare listing of containers for the MiniCluster
// We don't provide a default name because defaults are provided in Validate()
// Only service containers have a custom name here
containers, err := getContainers(
cluster.Spec.Containers,
"",
mounts,
false,
)
10 changes: 10 additions & 0 deletions docs/getting_started/custom-resource-definition.md
@@ -92,6 +92,16 @@
This would be equivalent to giving a start command of `sleep infinity`, however one difference is that
(e.g., if there is a flux shutdown from within the Flux instance) the sleep command would
not exit with a failed code.

### oversubscribe

By default, we treat your single application container (or the one container in your MiniCluster pod designated to "runFlux") as the only Flux broker. When oversubscribe is set to true, you are allowed to define more than one "runFlux" container, meaning that multiple brokers will share the same resources.

```yaml
oversubscribe: true
```

We created this use case with the intention of having a service container running fluxion alongside the MiniCluster to orchestrate the N containers. This is considered an advanced use case and you should use it with caution!
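For example, a complete spec with two "runFlux" containers might look like the following sketch (the images and commands here are placeholders, not part of this commit):

```yaml
apiVersion: flux-framework.org/v1alpha2
kind: MiniCluster
metadata:
  name: flux-sample
spec:
  size: 2

  # Required to allow more than one runFlux container
  oversubscribe: true

  containers:
    # Each of these containers runs its own Flux broker,
    # and the two brokers share the same resources
    - image: ghcr.io/example/app-one:latest
      name: app-one
      command: app-one-entrypoint
      runFlux: true

    - image: ghcr.io/example/app-two:latest
      name: app-two
      command: app-two-entrypoint
      runFlux: true
```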

### launcher

If you are using an executor that launches Flux Jobs (e.g., workflow managers such as Snakemake and Nextflow do!)
1 change: 1 addition & 0 deletions docs/tutorials/index.md
@@ -56,6 +56,7 @@ The following tutorials are provided from their respective directories (and are

These examples show how to interact with your flux queue from a sidecar container (that has access to the flux broker of the pod):

- [multiple-applications-per-pod](https://github.com/flux-framework/flux-operator/tree/main/examples/experimental/multiple-applications-per-pod): Run multiple applications, each with its own Flux broker, in the same MiniCluster pods
- [flux-sidecar](https://github.com/flux-framework/flux-operator/blob/main/examples/tests/flux-sidecar) to see a sleep job in the main application queue

### Services
4 changes: 4 additions & 0 deletions examples/dist/flux-operator-arm.yaml
@@ -529,6 +529,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
4 changes: 4 additions & 0 deletions examples/dist/flux-operator.yaml
@@ -529,6 +529,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
@@ -0,0 +1,72 @@
apiVersion: flux-framework.org/v1alpha2
kind: MiniCluster
metadata:
name: flux-sample
spec:
size: 4
interactive: true

# shell in, and then:
# source /mnt/flux/flux-view.sh
# flux proxy $fluxsocket bash
# flux resource list

# 1. Create resource graph independent of what hwloc does.
# 2. Will need to pass JGF into fluxion when it starts (try this out for R and starting the broker)
# 3. Create a resource graph with double the number of resources we want. If we want N brokers, increase by a factor of N.
# 4. Each broker needs to get 1/N of that resource graph
# 5. When we submit a jobspec, we need to submit with the entire resource graph (request 4x the number of resources we want)
#    the entire resource graph is allocated to that job
#    for the executable we have to specify which part of the graph we are executing.
#    all brokers at node level

# Note that:
# 1. all the containers here have an ubuntu 20.04 base!
# 2. The non-flux runners also need a name.
# 3. Since we control the logic of the sidecars, we need to add
# an entrypoint that keeps them running. Otherwise, it jumps to
# "not ready"
# 4. The issue we will run into is that the job won't complete when
# the main flux runner shuts down. It will need to be deleted.
flux:
container:
image: ghcr.io/converged-computing/flux-view-ubuntu:tag-focal

# This ensures that fluxion is running as a service to the MiniCluster
services:
- image: ghcr.io/converged-computing/fluxion:latest
command: /code/bin/server --host 0.0.0.0
name: fluxion

# This starts the flux broker without a command (interactive)
interactive: true

# A required marker from the user that they want multiple runFlux
# to work. This is considered an advanced use case.
oversubscribe: true

containers:

# This is a faux "queue only" broker container. It will be
# the interface to which we submit jobs. We don't run Flux
# but we will orchestrate running things.
- image: rockylinux:9
name: queue

# TODO we will need to allow the other ones to still see flux
- image: ghcr.io/rse-ops/lammps-matrix:mpich-ubuntu-20.04-amd64
name: lammps
workingDir: /opt/lammps/examples/reaxff/HNS
command: run_interactive_cluster
runFlux: true

# command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
- image: ghcr.io/converged-computing/metric-ior:latest
name: ior
command: run_interactive_cluster
runFlux: true

- image: ghcr.io/converged-computing/metric-chatterbug:latest
name: chatterbug
command: run_interactive_cluster
runFlux: true
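Once this MiniCluster is running, the comments at the top of the file suggest a flow like the following for inspecting one of the brokers (a sketch; the pod name suffix here is illustrative, and the `fluxsocket` variable comes from the flux-view base image):

```bash
# Shell into the pod and pick one of the runFlux containers
kubectl exec -it flux-sample-0-xxxx -c lammps -- bash

# Load the Flux environment provided by the mounted view
source /mnt/flux/flux-view.sh

# Connect to this container's broker and list what it sees
flux proxy $fluxsocket bash
flux resource list
```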
8 changes: 7 additions & 1 deletion pkg/flux/config.go
@@ -34,7 +34,13 @@ func getRandomToken(requested string) string {
}

// generateHostlist for a specific size given the cluster namespace and a size
// Note that we don't customize on the level of the container, but I'm
// generating them separately anticipating wanting slightly different setups.
func generateHostlist(
cluster *api.MiniCluster,
container api.MiniClusterContainer,
size int32,
) string {

var hosts string
if cluster.Spec.Flux.Bursting.Hostlist != "" {
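For intuition, a per-container hostlist can fold the container name into the generated hosts. A hypothetical sketch of the idea (not the commit's implementation, which also handles the bursting hostlist shown above):

```go
package main

import (
	"fmt"
	"strings"
)

// sketchHostlist joins pod hostnames derived from a name prefix.
// Hypothetical helper for illustration, not the operator's code.
func sketchHostlist(prefix string, size int32) string {
	hosts := []string{}
	for i := int32(0); i < size; i++ {
		hosts = append(hosts, fmt.Sprintf("%s-%d", prefix, i))
	}
	return strings.Join(hosts, ",")
}

func main() {
	// With the container in the mix, each runFlux container could get
	// its own variant, e.g. "lammps-0,lammps-1,lammps-2,lammps-3"
	fmt.Println(sketchHostlist("lammps", 4))
}
```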
30 changes: 18 additions & 12 deletions pkg/flux/entrypoint.go
@@ -37,7 +37,7 @@ func GenerateEntrypoints(cluster *api.MiniCluster) (map[string]string, error) {
// Custom logic for a sidecar container alongside flux
if container.GenerateEntrypoint() {
startScriptID := fmt.Sprintf("start-%d", i)
startScript, err := generateServiceEntrypoint(cluster, container, i)
if err != nil {
return data, err
}
@@ -58,11 +58,16 @@
}

// generateServiceEntrypoint generates an entrypoint for a service container
func generateServiceEntrypoint(
cluster *api.MiniCluster,
container api.MiniClusterContainer,
containerIndex int) (string, error) {

st := ServiceTemplate{
ViewBase: cluster.Spec.Flux.Container.MountPath,
Container: container,
ContainerIndex: containerIndex,
Spec: cluster.Spec,
}

// Wrap the named template to identify it later
@@ -88,7 +93,7 @@ func generateEntrypointScript(
) (string, error) {

container := cluster.Spec.Containers[containerIndex]
mainHost := fmt.Sprintf("%s-0", container.Name)

// Ensure if we have a batch command, it gets split up
batchCommand := strings.Split(container.Command, "\n")
@@ -99,12 +104,13 @@

// The token uuid is the same across images
wt := WaitTemplate{
RequiredRanks: requiredRanks,
ViewBase: cluster.Spec.Flux.Container.MountPath,
ContainerIndex: containerIndex,
Container: container,
MainHost: mainHost,
Spec: cluster.Spec,
Batch: batchCommand,
}

// Wrap the named template to identify it later
12 changes: 8 additions & 4 deletions pkg/flux/scripts.go
@@ -29,9 +29,10 @@ var startComponents string

// ServiceTemplate is for a separate service container
type ServiceTemplate struct {
ViewBase string // Where the mounted view with flux is expected to be
Container api.MiniClusterContainer
ContainerIndex int
Spec api.MiniClusterSpec
}

// WaitTemplate populates wait.sh for an application container entrypoint
@@ -40,7 +41,10 @@ type WaitTemplate struct {
MainHost string // Main host identifier
FluxToken string // Token to log into the UI, should be consistent across containers
Container api.MiniClusterContainer

// Index for container, for generation of unique socket path
ContainerIndex int
Spec api.MiniClusterSpec

// Broker initial quorum that must be online to start
// This is used if the cluster MaxSize > Size
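The ContainerIndex field above hints at how brokers sharing a pod avoid colliding: the wait.sh template can key sockets and similar paths by index. A hypothetical rendering sketch with made-up paths (the real templates are embedded from the package and are not shown in this diff):

```go
package main

import (
	"os"
	"text/template"
)

// Minimal stand-in for the real WaitTemplate fields used below
type WaitTemplate struct {
	MainHost       string
	ContainerIndex int
}

// A hypothetical fragment: each broker gets its own socket path,
// keyed by the container index, so brokers in one pod do not collide.
const fragment = `#!/bin/bash
viewroot=/mnt/flux
socket=local://$viewroot/run/flux/local-{{ .ContainerIndex }}
echo "Broker {{ .ContainerIndex }} leads from {{ .MainHost }} on $socket"
`

func main() {
	t := template.Must(template.New("wait").Parse(fragment))
	// Render for the second runFlux container (index 1)
	_ = t.Execute(os.Stdout, WaitTemplate{MainHost: "lammps-0", ContainerIndex: 1})
}
```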

