Skip to content

Commit

Permalink
Consumer group reset offset
Browse files Browse the repository at this point in the history
Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg committed Sep 11, 2024
1 parent 4b4c454 commit 519c637
Show file tree
Hide file tree
Showing 22 changed files with 3,656 additions and 65 deletions.
92 changes: 90 additions & 2 deletions ui/api/consumerGroups/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
import { getHeaders } from "@/api/api";
import {
ConsumerGroup,
ConsumerGroupDryrunResponseSchema,
ConsumerGroupResponseSchema,
ConsumerGroupsResponse,
ConsumerGroupsResponseSchema,
ConsumerGroupState,
DryrunResponse,
UpdateConsumerGroupErrorSchema,
} from "@/api/consumerGroups/schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { logger } from "@/utils/logger";

const log = logger.child({ module: "topics-api" });
const log = logger.child({ module: "consumergroup-api" });

export async function getConsumerGroup(
kafkaId: string,
Expand All @@ -32,6 +36,8 @@ export async function getConsumerGroups(
kafkaId: string,
params: {
fields?: string;
id?: string;
state?: ConsumerGroupState[];
pageSize?: number;
pageCursor?: string;
sort?: string;
Expand All @@ -43,8 +49,12 @@ export async function getConsumerGroups(
filterUndefinedFromObj({
"fields[consumerGroups]":
params.fields ?? "state,simpleConsumerGroup,members,offsets",
"filter[id]": params.id ? `eq,${params.id}` : undefined,
// TODO: pass filter from UI
"filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE",
"filter[state]":
params.state && params.state.length > 0
? `in,${params.state.join(",")}`
: undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
Expand Down Expand Up @@ -107,3 +117,81 @@ export async function getTopicConsumerGroups(
log.debug({ url, rawData }, "getTopicConsumerGroups response");
return ConsumerGroupsResponseSchema.parse(rawData);
}

export async function updateConsumerGroup(
kafkaId: string,
consumerGroupId: string,
offsets: Array<{
topicId: string;
partition?: number;
offset: string | number;
metadata?: string;
}>,
): Promise<boolean | UpdateConsumerGroupErrorSchema> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${consumerGroupId}`;
const body = {
data: {
type: "consumerGroups",
id: consumerGroupId,
attributes: {
offsets,
},
},
};

log.debug({ url, body }, "calling updateConsumerGroup");

try {
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});

log.debug({ status: res.status }, "updateConsumerGroup response");

if (res.status === 204) {
return true;
} else {
const rawData = await res.json();
return UpdateConsumerGroupErrorSchema.parse(rawData);
}
} catch (e) {
log.error(e, "updateConsumerGroup unknown error");
console.error("Unknown error occurred:", e);
return false;
}
}

export async function getDryrunResult(
kafkaId: string,
consumerGroupId: string,
offsets: Array<{
topicId: string;
partition?: number;
offset: string | number;
metadata?: string;
}>,
): Promise<DryrunResponse> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${consumerGroupId}`;
const body = {
meta: {
dryRun: true,
},
data: {
type: "consumerGroups",
id: consumerGroupId,
attributes: {
offsets,
},
},
};
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroup response");
return ConsumerGroupDryrunResponseSchema.parse(rawData).data;
}
55 changes: 54 additions & 1 deletion ui/api/consumerGroups/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { ApiError } from "@/api/api";
import { NodeSchema } from "@/api/kafka/schema";
import { z } from "zod";

const ConsumerGroupStateSchema = z.union([
z.literal("STABLE"),
z.literal("EMPTY"),
]);

const OffsetAndMetadataSchema = z.object({
topicId: z.string(),
topicName: z.string(),
Expand Down Expand Up @@ -29,7 +34,7 @@ export const ConsumerGroupSchema = z.object({
type: z.literal("consumerGroups"),
attributes: z.object({
simpleConsumerGroup: z.boolean().optional(),
state: z.string().optional(),
state: ConsumerGroupStateSchema,
members: z.array(MemberDescriptionSchema).optional(),
partitionAssignor: z.string().nullable().optional(),
coordinator: NodeSchema.nullable().optional(),
Expand All @@ -38,7 +43,17 @@ export const ConsumerGroupSchema = z.object({
errors: z.array(ApiError).optional(),
}),
});

const DryrunOffsetSchema = z.object({
topicId: z.string(),
topicName: z.string(),
partition: z.number(),
offset: z.number(),
metadata: z.string(),
});

export type ConsumerGroup = z.infer<typeof ConsumerGroupSchema>;
export type ConsumerGroupState = z.infer<typeof ConsumerGroupStateSchema>;

export const ConsumerGroupsResponseSchema = z.object({
meta: z.object({
Expand All @@ -55,6 +70,34 @@ export const ConsumerGroupsResponseSchema = z.object({
}),
data: z.array(ConsumerGroupSchema),
});

export const DryrunSchema = z.object({
id: z.string(),
type: z.literal("consumerGroups"),
attributes: z.object({
state: ConsumerGroupStateSchema,
members: z.array(MemberDescriptionSchema).optional(),
offsets: z.array(DryrunOffsetSchema).optional(),
}),
});

export const UpdateConsumerGroupErrorSchema = z.object({
errors: z.array(
z.object({
id: z.string(),
status: z.string(),
code: z.string(),
title: z.string(),
detail: z.string(),
source: z
.object({
pointer: z.string().optional(),
})
.optional(),
}),
),
});

export type ConsumerGroupsResponse = z.infer<
typeof ConsumerGroupsResponseSchema
>;
Expand All @@ -65,3 +108,13 @@ export const ConsumerGroupResponseSchema = z.object({
export type ConsumerGroupResponse = z.infer<
typeof ConsumerGroupsResponseSchema
>;

export type UpdateConsumerGroupErrorSchema = z.infer<
typeof UpdateConsumerGroupErrorSchema
>;

export const ConsumerGroupDryrunResponseSchema = z.object({
data: DryrunSchema,
});

export type DryrunResponse = z.infer<typeof DryrunSchema>;
Loading

0 comments on commit 519c637

Please sign in to comment.