Skip to content

Commit

Permalink
feat: additional watchevent types for metrics (#331)
Browse files Browse the repository at this point in the history
## Description

This adds `WatchEvent` types for consuming metrics from the informer.


Please look at [Watch
Config](https://github.com/defenseunicorns/kubernetes-fluent-client/blob/8f6bed408fc967fb4f68b60001cf8a8dc5f7bc5e/src/fluent/watch.ts#L49),
as some configuration options have been renamed.

## Related Issue

Fixes defenseunicorns/pepr#983

<!-- or -->

Relates to defenseunicorns/pepr#977

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Other (security config, docs update, etc)

## Checklist before merging

- [x] Tests, docs, and ADRs added or updated as needed
- [x] [Contributor Guide
Steps](https://docs.pepr.dev/main/contribute/#submitting-a-pull-request)
followed

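BREAKING CHANGE: This renames several `WatchCfg` options: `retryMax` is now `resyncFailureMax`, `retryDelaySec` is now `resyncDelaySec`, and
`resyncIntervalSec` is now `lastSeenLimitSeconds`. Look at the WatchCfg type and update any watch configuration that still uses the old names.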

---------

Signed-off-by: Case Wylie <[email protected]>
  • Loading branch information
cmwylie19 authored Jul 29, 2024
1 parent 8f6bed4 commit 6a936fd
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 28 deletions.
20 changes: 10 additions & 10 deletions src/fluent/watch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ describe("Watcher", () => {

it("should return the cache id", () => {
watcher = K8s(kind.Pod).Watch(evtMock, {
retryDelaySec: 1,
resyncDelaySec: 1,
});
expect(watcher.getCacheID()).toEqual("d69b75a611");
});

it("should handle the CONNECT event", done => {
watcher = K8s(kind.Pod).Watch(evtMock, {
retryDelaySec: 1,
resyncDelaySec: 1,
});
setupAndStartWatcher(WatchEvent.CONNECT, () => {
done();
Expand All @@ -154,7 +154,7 @@ describe("Watcher", () => {

it("should handle the DATA event", done => {
watcher = K8s(kind.Pod).Watch(evtMock, {
retryDelaySec: 1,
resyncDelaySec: 1,
});
setupAndStartWatcher(WatchEvent.DATA, (pod, phase) => {
expect(pod.metadata?.name).toEqual(`pod-0`);
Expand All @@ -181,7 +181,7 @@ describe("Watcher", () => {
.replyWithError("Something bad happened");

watcher = K8s(kind.Pod).Watch(evtMock, {
retryDelaySec: 1,
resyncDelaySec: 1,
});

setupAndStartWatcher(WatchEvent.NETWORK_ERROR, error => {
Expand Down Expand Up @@ -210,7 +210,7 @@ describe("Watcher", () => {
.replyWithError("Something bad happened");

watcher = K8s(kind.Pod).Watch(evtMock, {
retryDelaySec: 0.01,
resyncDelaySec: 0.01,
});

setupAndStartWatcher(WatchEvent.RECONNECT, count => {
Expand All @@ -221,8 +221,8 @@ describe("Watcher", () => {

it("should perform a resync after the resync interval", done => {
watcher = K8s(kind.Pod).Watch(evtMock, {
retryDelaySec: 0.01,
resyncIntervalSec: 0.01,
resyncDelaySec: 0.01,
lastSeenLimitSeconds: 0.01,
});

setupAndStartWatcher(WatchEvent.RECONNECT, count => {
Expand All @@ -249,9 +249,9 @@ describe("Watcher", () => {
.replyWithError("Something bad happened");

watcher = K8s(kind.Pod).Watch(evtMock, {
retryMax: 1,
retryDelaySec: 0.01,
resyncIntervalSec: 1,
resyncFailureMax: 1,
resyncDelaySec: 0.01,
lastSeenLimitSeconds: 1,
});

setupAndStartWatcher(WatchEvent.GIVE_UP, error => {
Expand Down
62 changes: 44 additions & 18 deletions src/fluent/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,24 @@ export enum WatchEvent {
LIST = "list",
/** List operation error */
LIST_ERROR = "list_error",
/** Cache Misses */
CACHE_MISS = "cache_miss",
/** Increment resync failure count */
INC_RESYNC_FAILURE_COUNT = "inc_resync_failure_count",
/** Initialize a relist window */
INIT_CACHE_MISS = "init_cache_miss",
}

/**
 * Configuration for the watch function.
 *
 * All fields are optional. The Watcher constructor fills in defaults for
 * `resyncDelaySec`, `relistIntervalSec` and `lastSeenLimitSeconds` when they are
 * not provided, and adds up to one second of random jitter to the timers it
 * derives from the delay/interval values.
 *
 * NOTE(review): this listing is a rendered diff, so it shows both the old option
 * names (`retryMax`, `retryDelaySec`, `resyncIntervalSec`) and the names that
 * replace them (`resyncFailureMax`, `resyncDelaySec`, `lastSeenLimitSeconds`).
 */
export type WatchCfg = {
/** The maximum number of times to retry the watch, the retry count is reset on success. Unlimited retries if not specified. */
retryMax?: number;
/** Seconds between each retry check. Defaults to 5. */
retryDelaySec?: number;
/**
 * The maximum number of times to retry (resync) the watch before giving up.
 * The failure count is reset to 0 after a successful watch response.
 * Unlimited retries if not specified.
 */
resyncFailureMax?: number;
/** Seconds between each resync check. Defaults to 5. */
resyncDelaySec?: number;
/** Amount of seconds to wait before relisting the watch list. Defaults to 1800 (30 minutes). */
relistIntervalSec?: number;
/** Amount of seconds to wait before a forced-resyncing of the watch list. Defaults to 600 (10 minutes). */
resyncIntervalSec?: number;
/** Max amount of seconds to go without receiving an event before reconciliation starts. Defaults to 600 (10 minutes). */
lastSeenLimitSeconds?: number;
};

const NONE = 50;
Expand All @@ -58,6 +64,7 @@ export class Watcher<T extends GenericClass> {
#filters: Filters;
#callback: WatchAction<T>;
#watchCfg: WatchCfg;
#latestRelistWindow: string = "";

// Track the last time data was received
#lastSeenTime = NONE;
Expand All @@ -67,7 +74,7 @@ export class Watcher<T extends GenericClass> {
#abortController: AbortController;

// Track the number of retries
#retryCount = 0;
#resyncFailureCount = 0;

// Create a stream to read the response body
#stream?: byline.LineStream;
Expand Down Expand Up @@ -104,25 +111,35 @@ export class Watcher<T extends GenericClass> {
*/
constructor(model: T, filters: Filters, callback: WatchAction<T>, watchCfg: WatchCfg = {}) {
// Set the retry delay to 5 seconds if not specified
watchCfg.retryDelaySec ??= 5;
watchCfg.resyncDelaySec ??= 5;

// Set the relist interval to 30 minutes if not specified
watchCfg.relistIntervalSec ??= 1800;

// Set the resync interval to 10 minutes if not specified
watchCfg.resyncIntervalSec ??= 600;
watchCfg.lastSeenLimitSeconds ??= 600;

// Set the last seen limit to the resync interval
this.#lastSeenLimit = watchCfg.resyncIntervalSec * 1000;
this.#lastSeenLimit = watchCfg.lastSeenLimitSeconds * 1000;

// Set the latest relist interval to now
this.#latestRelistWindow = new Date().toISOString();

// Add random jitter to the relist/resync intervals (up to 1 second)
const jitter = Math.floor(Math.random() * 1000);

// Check every relist interval for cache staleness
this.$relistTimer = setInterval(this.#list, watchCfg.relistIntervalSec * 1000 + jitter);

// Rebuild the watch every retry delay interval
this.#resyncTimer = setInterval(this.#checkResync, watchCfg.retryDelaySec * 1000 + jitter);
this.$relistTimer = setInterval(
() => {
this.#latestRelistWindow = new Date().toISOString();
this.#events.emit(WatchEvent.INIT_CACHE_MISS, this.#latestRelistWindow);
this.#list;
},
watchCfg.relistIntervalSec * 1000 + jitter,
);

// Rebuild the watch every resync delay interval
this.#resyncTimer = setInterval(this.#checkResync, watchCfg.resyncDelaySec * 1000 + jitter);

// Bind class properties
this.#model = model;
Expand All @@ -140,6 +157,7 @@ export class Watcher<T extends GenericClass> {
* @returns The AbortController for the watch.
*/
public async start(): Promise<AbortController> {
this.#events.emit(WatchEvent.INIT_CACHE_MISS, this.#latestRelistWindow);
await this.#watch();
return this.#abortController;
}
Expand Down Expand Up @@ -267,6 +285,7 @@ export class Watcher<T extends GenericClass> {

// If the item does not exist, it is new and should be added
if (!alreadyExists) {
this.#events.emit(WatchEvent.CACHE_MISS, this.#latestRelistWindow);
// Send added event. Use void here because we don't care about the result (no consequences here if it fails)
void this.#process(item, WatchPhase.Added);
continue;
Expand All @@ -278,6 +297,7 @@ export class Watcher<T extends GenericClass> {

// Check if the resource version is newer than the cached version
if (itemRV > cachedRV) {
this.#events.emit(WatchEvent.CACHE_MISS, this.#latestRelistWindow);
// Send a modified event if the resource version has changed
void this.#process(item, WatchPhase.Modified);
}
Expand All @@ -291,6 +311,7 @@ export class Watcher<T extends GenericClass> {
} else {
// Otherwise, process the removed items
for (const item of removedItems.values()) {
this.#events.emit(WatchEvent.CACHE_MISS, this.#latestRelistWindow);
void this.#process(item, WatchPhase.Deleted);
}
}
Expand Down Expand Up @@ -362,7 +383,8 @@ export class Watcher<T extends GenericClass> {
const { body } = response;

// Reset the retry count
this.#retryCount = 0;
this.#resyncFailureCount = 0;
this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount);

// Listen for events and call the callback function
this.#stream.on("data", async line => {
Expand Down Expand Up @@ -430,16 +452,20 @@ export class Watcher<T extends GenericClass> {
this.#lastSeenTime = now;

// If there are more attempts, retry the watch (undefined is unlimited retries)
if (this.#watchCfg.retryMax === undefined || this.#watchCfg.retryMax > this.#retryCount) {
if (
this.#watchCfg.resyncFailureMax === undefined ||
this.#watchCfg.resyncFailureMax > this.#resyncFailureCount
) {
// Increment the retry count
this.#retryCount++;
this.#resyncFailureCount++;
this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount);

if (this.#pendingReconnect) {
// wait for the connection to be re-established
this.#events.emit(WatchEvent.RECONNECT_PENDING);
} else {
this.#pendingReconnect = true;
this.#events.emit(WatchEvent.RECONNECT, this.#retryCount);
this.#events.emit(WatchEvent.RECONNECT, this.#resyncFailureCount);
this.#streamCleanup();

void this.#watch();
Expand All @@ -448,7 +474,7 @@ export class Watcher<T extends GenericClass> {
// Otherwise, call the finally function if it exists
this.#events.emit(
WatchEvent.GIVE_UP,
new Error(`Retry limit (${this.#watchCfg.retryMax}) exceeded, giving up`),
new Error(`Retry limit (${this.#watchCfg.resyncFailureMax}) exceeded, giving up`),
);
this.close();
}
Expand Down

0 comments on commit 6a936fd

Please sign in to comment.