Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nomad Job Event Handling #682

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 54 additions & 15 deletions internal/nomad/event_stream_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (a *APIClient) initializeAllocations(environmentID dto.EnvironmentID) {
continue
case stub.ClientStatus == structs.AllocClientStatusPending || stub.ClientStatus == structs.AllocClientStatusRunning:
log.WithField("jobID", stub.JobID).WithField("status", stub.ClientStatus).Debug("Recovered Allocation")
a.jobAllocationMapping.Add(stub.JobID, stub.ID)
a.allocations.Add(stub.ID, &allocationData{
allocClientStatus: stub.ClientStatus,
allocDesiredStatus: stub.DesiredStatus,
Expand All @@ -90,7 +91,9 @@ func (a *APIClient) WatchEventStream(ctx context.Context, callbacks *AllocationP
case nomadApi.TopicEvaluation:
return false, handleEvaluationEvent(a.evaluations, event)
case nomadApi.TopicAllocation:
return false, handleAllocationEvent(ctx, startTime, a.allocations, event, callbacks)
return false, handleAllocationEvent(ctx, startTime, a.jobAllocationMapping, a.allocations, event, callbacks)
case nomadApi.TopicJob:
return false, handleJobEvent(ctx, a.jobAllocationMapping, a.allocations, event, callbacks)
default:
return false, nil
}
Expand Down Expand Up @@ -211,8 +214,8 @@ func checkEvaluation(eval *nomadApi.Evaluation) (err error) {
// If a new allocation is received, onNewAllocation is called. If an allocation is deleted, onDeletedAllocation
// is called. The allocations storage is used to track pending and running allocations. Using the
// storage the state is persisted between multiple calls of this function.
func handleAllocationEvent(ctx context.Context, startTime int64, allocations storage.Storage[*allocationData],
event *nomadApi.Event, callbacks *AllocationProcessing,
func handleAllocationEvent(ctx context.Context, startTime int64, jobMapping storage.Storage[string],
allocations storage.Storage[*allocationData], event *nomadApi.Event, callbacks *AllocationProcessing,
) error {
alloc, err := event.Allocation()
if err != nil {
Expand Down Expand Up @@ -243,21 +246,40 @@ func handleAllocationEvent(ctx context.Context, startTime int64, allocations sto

switch alloc.ClientStatus {
case structs.AllocClientStatusPending:
handlePendingAllocationEvent(ctx, alloc, allocData, allocations, callbacks)
handlePendingAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks)
case structs.AllocClientStatusRunning:
handleRunningAllocationEvent(ctx, alloc, allocData, allocations, callbacks)
handleRunningAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks)
case structs.AllocClientStatusComplete:
handleCompleteAllocationEvent(ctx, alloc, allocData, allocations, callbacks)
case structs.AllocClientStatusFailed:
handleFailedAllocationEvent(ctx, alloc, allocData, allocations, callbacks)
handleFailedAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks)
case structs.AllocClientStatusLost:
handleLostAllocationEvent(ctx, alloc, allocData, allocations, callbacks)
handleLostAllocationEvent(ctx, alloc, allocData, jobMapping, allocations, callbacks)
default:
log.WithField("alloc", alloc).Warn("Other Client Status")
}
return nil
}

// handleJobEvent is an event handler that processes job events.
// On a JobDeregistered event, onDeletedAllocation is called.
func handleJobEvent(ctx context.Context, jobMapping storage.Storage[string],
allocations storage.Storage[*allocationData], event *nomadApi.Event, callbacks *AllocationProcessing,
) error {
// At this point, we do not filter out events that happened before the subscription to the event stream
// (and are left in Nomad's buffer) because the job events do only have index information instead of time information.
// As we currently only handle `JobDeregistered` events for tracked allocations, the handling of old events does not
// change the behavior.

switch event.Type {
case structs.TypeJobDeregistered:
return handleDeregisteredJobEvent(ctx, event.Key, jobMapping, allocations, callbacks)
default:
log.WithField("event", event).Trace("Ignored Job Event")
}
return nil
}

// filterDuplicateEvents identifies duplicate events or events of unknown allocations.
func filterDuplicateEvents(alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData]) (valid bool) {
newAllocationExpected := alloc.ClientStatus == structs.AllocClientStatusPending &&
Expand Down Expand Up @@ -299,7 +321,7 @@ func updateAllocationData(
// handlePendingAllocationEvent manages allocation that are currently pending.
// This allows the handling of startups and re-placements of allocations.
func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
) {
var stopExpected bool
switch alloc.DesiredStatus {
Expand All @@ -315,12 +337,14 @@ func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio
// Handle Runner (/Container) re-allocations.
if prevData, ok := allocations.Get(alloc.PreviousAllocation); ok {
stopExpected = callbacks.OnDeleted(ctx, prevData.jobID, ErrAllocationRescheduled)
jobMapping.Delete(alloc.JobID)
allocations.Delete(alloc.PreviousAllocation)
} else {
log.WithField("alloc", alloc).Warn("Previous Allocation not found")
}
}

jobMapping.Add(alloc.JobID, alloc.ID)
// Store Pending Allocation - Allocation gets started, wait until it runs.
allocations.Add(alloc.ID, &allocationData{
allocClientStatus: alloc.ClientStatus,
Expand All @@ -332,6 +356,7 @@ func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio
})
case structs.AllocDesiredStatusStop:
// As this allocation was still pending, we don't have to propagate its deletion.
jobMapping.Delete(alloc.JobID)
allocations.Delete(alloc.ID)
// Anyway, we want to monitor the occurrences.
if !allocData.stopExpected {
Expand All @@ -347,20 +372,33 @@ func handlePendingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocatio

// handleRunningAllocationEvent calls the passed AllocationProcessor filtering similar events.
func handleRunningAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
) {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusRun:
startupDuration := time.Since(allocData.start)
callbacks.OnNew(ctx, alloc, startupDuration)
case structs.AllocDesiredStatusStop:
callbacks.OnDeleted(ctx, alloc.JobID, ErrAllocationCompleted)
jobMapping.Delete(alloc.JobID)
allocations.Delete(alloc.ID)
default:
log.WithField("alloc", alloc).Warn("Other Desired Status")
}
}

func handleDeregisteredJobEvent(ctx context.Context, jobID string,
jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
) error {
if allocID, ok := jobMapping.Pop(jobID); ok {
// If the allocData is already removed, another event has already handled the Deletion (or the allocation was not tracked by Poseidon).
if _, ok = allocations.Pop(allocID); ok {
callbacks.OnDeleted(ctx, jobID, ErrJobDeregistered)
}
}
return nil
}

// handleCompleteAllocationEvent handles allocations that stopped.
func handleCompleteAllocationEvent(_ context.Context, alloc *nomadApi.Allocation, _ *allocationData,
allocations storage.Storage[*allocationData], _ *AllocationProcessing,
Expand All @@ -381,24 +419,24 @@ func handleCompleteAllocationEvent(_ context.Context, alloc *nomadApi.Allocation

// handleFailedAllocationEvent logs only the last of the multiple failure events.
func handleFailedAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
) {
// The stop is expected when the allocation desired to stop even before it failed.
reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop
handleStoppingAllocationEvent(ctx, alloc, allocations, callbacks, reschedulingExpected)
handleStoppingAllocationEvent(ctx, alloc, jobMapping, allocations, callbacks, reschedulingExpected)
}

// handleLostAllocationEvent logs only the last of the multiple lost events.
func handleLostAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocData *allocationData,
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
jobMapping storage.Storage[string], allocations storage.Storage[*allocationData], callbacks *AllocationProcessing,
) {
// The stop is expected when the allocation desired to stop even before it got lost.
reschedulingExpected := allocData.allocDesiredStatus != structs.AllocDesiredStatusStop
handleStoppingAllocationEvent(ctx, alloc, allocations, callbacks, reschedulingExpected)
handleStoppingAllocationEvent(ctx, alloc, jobMapping, allocations, callbacks, reschedulingExpected)
}

func handleStoppingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, allocations storage.Storage[*allocationData],
callbacks *AllocationProcessing, reschedulingExpected bool,
func handleStoppingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocation, jobMapping storage.Storage[string],
allocations storage.Storage[*allocationData], callbacks *AllocationProcessing, reschedulingExpected bool,
) {
replacementAllocationScheduled := alloc.NextAllocation != ""
correctRescheduling := reschedulingExpected == replacementAllocationScheduled
Expand All @@ -412,6 +450,7 @@ func handleStoppingAllocationEvent(ctx context.Context, alloc *nomadApi.Allocati
reason = ErrAllocationRescheduledUnexpectedly
}
removedByPoseidon = callbacks.OnDeleted(ctx, alloc.JobID, reason)
jobMapping.Delete(alloc.JobID)
allocations.Delete(alloc.ID)
}

Expand Down
62 changes: 49 additions & 13 deletions internal/nomad/event_stream_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ func (s *MainTestSuite) TestApiClient_MonitorEvaluationReturnsErrorWhenStreamRet
apiMock := &apiQuerierMock{}
apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")).
Return(nil, tests.ErrDefault)
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
apiClient := &APIClient{
apiMock, storage.NewLocalStorage[chan error](),
storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false,
}
err := apiClient.MonitorEvaluation(context.Background(), "id")
s.ErrorIs(err, tests.ErrDefault)
}
Expand Down Expand Up @@ -316,6 +319,29 @@ func (s *MainTestSuite) TestApiClient_WatchAllocationsUsesCallbacksForEvents() {
[]*nomadApi.Allocation{startedAllocation, startedAllocation}, []string{startedAllocation.JobID})
})

deregisteredEvent := nomadApi.Event{Topic: nomadApi.TopicJob, Type: structs.TypeJobDeregistered, Key: tests.DefaultRunnerID}
deregisteredEvents := nomadApi.Events{Events: []nomadApi.Event{deregisteredEvent}}
pendingStartDeregisteredEvents := nomadApi.Events{Events: []nomadApi.Event{
eventForAllocation(s.T(), pendingAllocation),
eventForAllocation(s.T(), startedAllocation),
deregisteredEvent,
}}

s.Run("JobDeregistered behaves like Allocation stopping", func() {
assertWatchAllocation(s, []*nomadApi.Events{&pendingStartDeregisteredEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID})
})

s.Run("onDelete Handler is not called twice on duplicate JobDeregistered", func() {
assertWatchAllocation(s, []*nomadApi.Events{&pendingStartDeregisteredEvents, &deregisteredEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID})
})

s.Run("onDelete Handler is not called twice on JobDeregistered and Allocation stopping", func() {
assertWatchAllocation(s, []*nomadApi.Events{&pendingStartDeregisteredEvents, &stoppingEvents},
[]*nomadApi.Allocation{startedAllocation}, []string{startedAllocation.JobID})
})

rescheduleAllocation := createRecentAllocation(structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
rescheduleAllocation.ID = tests.AnotherUUID
rescheduleAllocation.PreviousAllocation = pendingAllocation.ID
Expand Down Expand Up @@ -374,7 +400,7 @@ func (s *MainTestSuite) TestHandleAllocationEventBuffersPendingAllocation() {

allocations := storage.NewLocalStorage[*allocationData]()
err := handleAllocationEvent(s.TestCtx,
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing)
time.Now().UnixNano(), storage.NewLocalStorage[string](), allocations, &newPendingEvent, noopAllocationProcessing)
s.Require().NoError(err)

_, ok := allocations.Get(newPendingAllocation.ID)
Expand All @@ -387,7 +413,7 @@ func (s *MainTestSuite) TestHandleAllocationEventBuffersPendingAllocation() {

allocations := storage.NewLocalStorage[*allocationData]()
err := handleAllocationEvent(s.TestCtx,
time.Now().UnixNano(), allocations, &newPendingEvent, noopAllocationProcessing)
time.Now().UnixNano(), storage.NewLocalStorage[string](), allocations, &newPendingEvent, noopAllocationProcessing)
s.Require().NoError(err)

_, ok := allocations.Get(newPendingAllocation.ID)
Expand Down Expand Up @@ -451,21 +477,25 @@ func (s *MainTestSuite) TestHandleAllocationEvent_ReportsOOMKilledStatus() {
allocations.Add(restartedAllocation.ID, &allocationData{jobID: restartedAllocation.JobID})

var reason error
err := handleAllocationEvent(s.TestCtx, time.Now().UnixNano(), allocations, &restartedEvent, &AllocationProcessing{
OnNew: func(_ context.Context, _ *nomadApi.Allocation, _ time.Duration) {},
OnDeleted: func(_ context.Context, _ string, r error) bool {
reason = r
return true
},
})
err := handleAllocationEvent(s.TestCtx, time.Now().UnixNano(), storage.NewLocalStorage[string](),
allocations, &restartedEvent, &AllocationProcessing{
OnNew: func(_ context.Context, _ *nomadApi.Allocation, _ time.Duration) {},
OnDeleted: func(_ context.Context, _ string, r error) bool {
reason = r
return true
},
})
s.Require().NoError(err)
s.ErrorIs(reason, ErrOOMKilled)
}

func (s *MainTestSuite) TestAPIClient_WatchAllocationsReturnsErrorWhenAllocationStreamCannotBeRetrieved() {
apiMock := &apiQuerierMock{}
apiMock.On("EventStream", mock.Anything).Return(nil, tests.ErrDefault)
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
apiClient := &APIClient{
apiMock, storage.NewLocalStorage[chan error](),
storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false,
}

err := apiClient.WatchEventStream(context.Background(), noopAllocationProcessing)
s.ErrorIs(err, tests.ErrDefault)
Expand Down Expand Up @@ -618,7 +648,10 @@ func asynchronouslyWatchAllocations(stream chan *nomadApi.Events, callbacks *All

apiMock := &apiQuerierMock{}
apiMock.On("EventStream", ctx).Return(readOnlyStream, nil)
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
apiClient := &APIClient{
apiMock, storage.NewLocalStorage[chan error](),
storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false,
}

errChan := make(chan error)
go func() {
Expand Down Expand Up @@ -733,7 +766,10 @@ func asynchronouslyMonitorEvaluation(stream <-chan *nomadApi.Events) chan error
apiMock := &apiQuerierMock{}
apiMock.On("EventStream", mock.AnythingOfType("*context.cancelCtx")).
Return(readOnlyStream, nil)
apiClient := &APIClient{apiMock, storage.NewLocalStorage[chan error](), storage.NewLocalStorage[*allocationData](), false}
apiClient := &APIClient{
apiMock, storage.NewLocalStorage[chan error](),
storage.NewLocalStorage[string](), storage.NewLocalStorage[*allocationData](), false,
}

errChan := make(chan error)
go func() {
Expand Down
7 changes: 7 additions & 0 deletions internal/nomad/nomad.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
// We do not consider it as an error but add it anyway for a complete reporting.
// It is a ErrLocalDestruction because another allocation might be replacing the allocation in the same job.
ErrAllocationCompleted RunnerDeletedReason = fmt.Errorf("the allocation completed: %w", ErrLocalDestruction)
ErrJobDeregistered RunnerDeletedReason = fmt.Errorf("the job got deregistered: %w", ErrLocalDestruction)
)

type RunnerDeletedReason error
Expand Down Expand Up @@ -88,6 +89,8 @@ type ExecutorAPI interface {
type APIClient struct {
apiQuerier
evaluations storage.Storage[chan error]
// jobAllocationMapping maps a Job ID to the most recent Allocation ID.
jobAllocationMapping storage.Storage[string]
// allocations contain management data for all pending and running allocations.
allocations storage.Storage[*allocationData]
isListening bool
Expand All @@ -99,6 +102,10 @@ func NewExecutorAPI(ctx context.Context, nomadConfig *config.Nomad) (ExecutorAPI
client := &APIClient{
apiQuerier: &nomadAPIClient{},
evaluations: storage.NewLocalStorage[chan error](),
jobAllocationMapping: storage.NewMonitoredLocalStorage[string](ctx, monitoring.MeasurementNomadJobs,
func(p *write.Point, allocationID string, _ storage.EventType) {
p.AddTag(monitoring.InfluxKeyAllocationID, allocationID)
}, 0),
allocations: storage.NewMonitoredLocalStorage[*allocationData](ctx, monitoring.MeasurementNomadAllocations,
func(p *write.Point, object *allocationData, _ storage.EventType) {
p.AddTag(monitoring.InfluxKeyJobID, object.jobID)
Expand Down
2 changes: 2 additions & 0 deletions pkg/monitoring/influxdb2_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
measurementPrefix = "poseidon_"
measurementPoolSize = measurementPrefix + "poolsize"
MeasurementNomadEvents = measurementPrefix + "nomad_events"
MeasurementNomadJobs = measurementPrefix + "nomad_jobs"
MeasurementNomadAllocations = measurementPrefix + "nomad_allocations"
MeasurementIdleRunnerNomad = measurementPrefix + "nomad_idle_runners"
MeasurementExecutionsAWS = measurementPrefix + "aws_executions"
Expand All @@ -38,6 +39,7 @@ const (
InfluxKeyRunnerID = dto.KeyRunnerID
InfluxKeyEnvironmentID = dto.KeyEnvironmentID
InfluxKeyJobID = "job_id"
InfluxKeyAllocationID = "allocation_id"
InfluxKeyClientStatus = "client_status"
InfluxKeyNomadNode = "nomad_agent"
InfluxKeyActualContentLength = "actual_length"
Expand Down
Loading