Skip to content

Commit

Permalink
minor UI changes
Browse files Browse the repository at this point in the history
Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg committed Sep 10, 2024
1 parent 06ba7c3 commit 5dff846
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 240 deletions.
197 changes: 98 additions & 99 deletions ui/api/consumerGroups/actions.ts
Original file line number Diff line number Diff line change
@@ -1,102 +1,60 @@
"use server";
import { getHeaders } from "@/api/api";
import {
ConsumerGroup,
ConsumerGroupDryrunResponseSchema,
ConsumerGroupResponseSchema,
ConsumerGroupsResponse,
ConsumerGroupsResponseSchema,
ConsumerGroupState,
DryrunResponse,
UpdateConsumerGroupErrorSchema,
} from "@/api/consumerGroups/schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { logger } from "@/utils/logger";
"use server";
import { getHeaders } from "@/api/api";
import {
ConsumerGroup,
ConsumerGroupDryrunResponseSchema,
ConsumerGroupResponseSchema,
ConsumerGroupsResponse,
ConsumerGroupsResponseSchema,
ConsumerGroupState,
DryrunResponse,
UpdateConsumerGroupErrorSchema,
} from "@/api/consumerGroups/schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { logger } from "@/utils/logger";

const log = logger.child({ module: "topics-api" });
const log = logger.child({ module: "consumergroup-api" });

export async function getConsumerGroup(
kafkaId: string,
groupId: string,
): Promise<ConsumerGroup> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${groupId}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: [`consumer-group-${kafkaId}-${groupId}`],
},
});
log.debug({ url }, "getConsumerGroup");
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroup response");
return ConsumerGroupResponseSchema.parse(rawData).data;
}

export async function getConsumerGroups(
kafkaId: string,
params: {
fields?: string;
id?: string;
state?: ConsumerGroupState[];
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
export async function getConsumerGroup(
kafkaId: string,
groupId: string,
): Promise<ConsumerGroup> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${groupId}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: [`consumer-group-${kafkaId}-${groupId}`],
},
): Promise<ConsumerGroupsResponse | null> {
try {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[consumerGroups]":
params.fields ?? "state,simpleConsumerGroup,members,offsets",
// TODO: pass filter from UI
"filter[id]": params.id ? `like,*${params.id}*` : undefined,
"filter[state]":
params.state && params.state.length > 0
? `in,${params.state.join(",")}`
: undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
? (params.sortDir !== "asc" ? "-" : "") + params.sort
: undefined,
}),
);
const cgQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups?${cgQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: [`consumer-groups`],
},
});
log.debug({ url }, "getConsumerGroups");
if (res.status === 200) {
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroups response");
return ConsumerGroupsResponseSchema.parse(rawData);
}
} catch (err) {
log.error(err, "getConsumerGroups");
throw new Error("getConsumerGroups: couldn't connect with backend");
}
return null;
}
});
log.debug({ url }, "getConsumerGroup");
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroup response");
return ConsumerGroupResponseSchema.parse(rawData).data;
}

export async function getTopicConsumerGroups(
kafkaId: string,
topicId: string,
params: {
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<ConsumerGroupsResponse> {
export async function getConsumerGroups(
kafkaId: string,
params: {
fields?: string;
id?: string;
state?: ConsumerGroupState[];
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<ConsumerGroupsResponse | null> {
try {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[consumerGroups]":
"state,simpleConsumerGroup,members,offsets,authorizedOperations,coordinator,partitionAssignor",
params.fields ?? "state,simpleConsumerGroup,members,offsets",
"filter[id]": params.id ? `eq,${params.id}` : undefined,
// TODO: pass filter from UI
"filter[state]":
params.state && params.state.length > 0
? `in,${params.state.join(",")}`
: undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
Expand All @@ -105,18 +63,60 @@
}),
);
const cgQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/topics/${topicId}/consumerGroups?${cgQuery}`;
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups?${cgQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: [`consumer-group-${topicId}`],
tags: [`consumer-groups`],
},
});
log.debug({ url }, "getTopicConsumerGroups");
const rawData = await res.json();
log.debug({ url, rawData }, "getTopicConsumerGroups response");
return ConsumerGroupsResponseSchema.parse(rawData);
log.debug({ url }, "getConsumerGroups");
if (res.status === 200) {
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroups response");
return ConsumerGroupsResponseSchema.parse(rawData);
}
} catch (err) {
log.error(err, "getConsumerGroups");
throw new Error("getConsumerGroups: couldn't connect with backend");
}
return null;
}

export async function getTopicConsumerGroups(
kafkaId: string,
topicId: string,
params: {
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<ConsumerGroupsResponse> {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[consumerGroups]":
"state,simpleConsumerGroup,members,offsets,authorizedOperations,coordinator,partitionAssignor",
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
? (params.sortDir !== "asc" ? "-" : "") + params.sort
: undefined,
}),
);
const cgQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/topics/${topicId}/consumerGroups?${cgQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: [`consumer-group-${topicId}`],
},
});
log.debug({ url }, "getTopicConsumerGroups");
const rawData = await res.json();
log.debug({ url, rawData }, "getTopicConsumerGroups response");
return ConsumerGroupsResponseSchema.parse(rawData);
}

export async function updateConsumerGroup(
kafkaId: string,
Expand Down Expand Up @@ -194,5 +194,4 @@
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroup response");
return ConsumerGroupDryrunResponseSchema.parse(rawData).data;

}
42 changes: 20 additions & 22 deletions ui/api/consumerGroups/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { ApiError } from "@/api/api";
import { NodeSchema } from "@/api/kafka/schema";
import { z } from "zod";

const ConsumerGroupStateSchema = z.union([
z.literal("STABLE"),
z.literal("EMPTY"),
]);

const OffsetAndMetadataSchema = z.object({
topicId: z.string(),
topicName: z.string(),
Expand All @@ -24,22 +29,6 @@ const MemberDescriptionSchema = z.object({
host: z.string(),
assignments: z.array(PartitionKeySchema).optional(),
});

const DryrunOffsetSchema = z.object({
topicId: z.string(),
topicName: z.string(),
partition: z.number(),
offset: z.number(),
metadata: z.string(),
});

const ConsumerGroupStateSchema = z.union([
z.literal("STABLE"),
z.literal("EMPTY"),
]);

export type ConsumerGroupState = z.infer<typeof ConsumerGroupStateSchema>;

export const ConsumerGroupSchema = z.object({
id: z.string(),
type: z.literal("consumerGroups"),
Expand All @@ -54,7 +43,17 @@ export const ConsumerGroupSchema = z.object({
errors: z.array(ApiError).optional(),
}),
});

const DryrunOffsetSchema = z.object({
topicId: z.string(),
topicName: z.string(),
partition: z.number(),
offset: z.number(),
metadata: z.string(),
});

export type ConsumerGroup = z.infer<typeof ConsumerGroupSchema>;
export type ConsumerGroupState = z.infer<typeof ConsumerGroupStateSchema>;

export const ConsumerGroupsResponseSchema = z.object({
meta: z.object({
Expand Down Expand Up @@ -98,18 +97,19 @@ export const UpdateConsumerGroupErrorSchema = z.object({
}),
),
});

export type ConsumerGroupsResponse = z.infer<
typeof ConsumerGroupsResponseSchema
>;

export type UpdateConsumerGroupErrorSchema= z.infer<typeof UpdateConsumerGroupErrorSchema>;
>;

export const ConsumerGroupResponseSchema = z.object({
data: ConsumerGroupSchema,
});
export type ConsumerGroupResponse = z.infer<
typeof ConsumerGroupsResponseSchema
>;
>;

export type UpdateConsumerGroupErrorSchema= z.infer<typeof UpdateConsumerGroupErrorSchema>;

export const ConsumerGroupDryrunResponseSchema = z.object({
data: DryrunSchema
Expand All @@ -118,5 +118,3 @@ export const ConsumerGroupDryrunResponseSchema = z.object({
export type DryrunResponse = z.infer<
typeof DryrunSchema
>;


Loading

0 comments on commit 5dff846

Please sign in to comment.