Merge pull request #1 from agentkube/stage
execute promql endpoint
siddhantprateek authored Feb 7, 2025
2 parents 96afa21 + 1940dc4 commit 1c4bfd6
Showing 11 changed files with 323 additions and 14 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
arch.md
bin
prom_data
prom_data
storage
6 changes: 4 additions & 2 deletions cmd/api/main.go
@@ -45,8 +45,10 @@ func main() {
e.Use(middleware.CORS())

// Register handlers
server.RegisterHandlers(e, promClient)

if err := server.RegisterHandlers(e, promClient); err != nil {
fmt.Printf("Error registering handlers: %v\n", err)
os.Exit(1)
}
// Start server
e.Logger.Fatal(e.Start(":" + viper.GetString("server.port")))
}
9 changes: 7 additions & 2 deletions configs/config.yaml
@@ -1,5 +1,5 @@
server:
port: 8080
port: 8083
max_body_size: 2MB

prometheus:
@@ -8,7 +8,7 @@ prometheus:

ai:
model: "gpt-4o-mini"
api_key: "sk-proj-xxxxxxxxxxxx"
api_key: "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"
temperature: 0.7
top_p: 1.0
base_url: "" # Optional: for Azure OpenAI or other endpoints
@@ -19,6 +19,11 @@ ai:
knowledge_graph:
schema_path: "./configs/prometheus.yaml"
auto_discover: true
enabled: true
graph_database: "neo4j"
graph_uri: "bolt://localhost:7687"
graph_user: "neo4j"
graph_password: "txt2promql"

semantic_memory:
enabled: true
27 changes: 26 additions & 1 deletion docker-compose.yaml
@@ -19,4 +19,29 @@ services:
- ./configs/prometheus.yml:/etc/prometheus/prometheus.yml
- ./prom_data:/prometheus
ports:
- "9090:9090"
- "9090:9090"

neo4j:
image: neo4j:5.12.0 # pinned to a stable release rather than 2025.01.0
restart: unless-stopped
ports:
- "7474:7474" # HTTP
- "7687:7687" # Bolt
environment:
- NEO4J_AUTH=neo4j/txt2promql
- NEO4J_server_memory_pagecache_size=1G
- NEO4J_server_memory_heap_initial__size=1G
- NEO4J_server_memory_heap_max__size=1G
- NEO4J_ACCEPT_LICENSE_AGREEMENT=yes
- NEO4J_dbms_connector_bolt_advertised__address=:7687
- NEO4J_dbms_default__database=neo4j
- NEO4J_dbms_connector_bolt_listen__address=0.0.0.0:7687
volumes:
- ./storage/data:/data
- ./storage/logs:/logs
- ./storage/import:/var/lib/neo4j/import
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider localhost:7474 || exit 1"]
interval: 10s
timeout: 5s
retries: 5
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.22.5
require (
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/echo/v4 v4.13.3
github.com/neo4j/neo4j-go-driver/v5 v5.27.0
github.com/prometheus/client_golang v1.20.5
github.com/sashabaranov/go-openai v1.36.1
github.com/spf13/cobra v1.8.1
2 changes: 2 additions & 0 deletions go.sum
@@ -42,6 +42,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/neo4j/neo4j-go-driver/v5 v5.27.0 h1:YdsIxDjAQbjlP/4Ha9B/gF8Y39UdgdTwCyihSxy8qTw=
github.com/neo4j/neo4j-go-driver/v5 v5.27.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
23 changes: 21 additions & 2 deletions internal/config/config.go
@@ -49,8 +49,13 @@ type Header struct {

// knowledge graph settings
type KGConfig struct {
SchemaPath string `mapstructure:"schema_path"`
AutoDiscover bool `mapstructure:"auto_discover"`
SchemaPath string `mapstructure:"schema_path"`
AutoDiscover bool `mapstructure:"auto_discover"`
GraphEnabled bool `mapstructure:"enabled"`
GraphURI string `mapstructure:"graph_uri"`
GraphUser string `mapstructure:"graph_user"`
GraphPassword string `mapstructure:"graph_password"`
GraphDatabase string `mapstructure:"graph_database"`
}

// SemanticConfig holds semantic memory settings
@@ -122,8 +127,10 @@ func setDefaults() {
viper.SetDefault("ai.top_p", 1.0)

// Knowledge Graph defaults
viper.SetDefault("knowledge_graph.enabled", false)
viper.SetDefault("knowledge_graph.auto_discover", true)
viper.SetDefault("knowledge_graph.schema_path", "./schemas/prometheus.yaml")
viper.SetDefault("knowledge_graph.graph_database", "neo4j")

// Semantic Memory defaults
viper.SetDefault("semantic_memory.enabled", true)
@@ -148,6 +155,18 @@ func loadEnvVariables() {
if model := os.Getenv("OPENAI_MODEL"); model != "" {
viper.Set("ai.model", model)
}

if uri := os.Getenv("NEO4J_URI"); uri != "" {
viper.Set("knowledge_graph.graph_uri", uri)
}

if user := os.Getenv("NEO4J_USER"); user != "" {
viper.Set("knowledge_graph.graph_user", user)
}

if pass := os.Getenv("NEO4J_PASSWORD"); pass != "" {
viper.Set("knowledge_graph.graph_password", pass)
}
}

// validateConfig performs validation on the configuration
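The new KGConfig fields and the NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD overrides imply that a Neo4j connection is built from this configuration using the neo4j-go-driver/v5 dependency added in go.mod. The following is a minimal sketch of how that wiring might look; the package name, the newDriver helper, and the duplicated config struct are illustrative assumptions, not code from this commit.

```go
package knowledgegraph // hypothetical package name, not part of this commit

import (
	"context"
	"fmt"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// KGConfig mirrors the fields added to internal/config/config.go.
type KGConfig struct {
	GraphEnabled  bool
	GraphURI      string
	GraphUser     string
	GraphPassword string
	GraphDatabase string
}

// newDriver is an assumed helper: it builds a Neo4j driver from the
// knowledge_graph settings and verifies connectivity before returning it.
func newDriver(ctx context.Context, cfg KGConfig) (neo4j.DriverWithContext, error) {
	if !cfg.GraphEnabled {
		return nil, fmt.Errorf("knowledge graph support is disabled")
	}
	driver, err := neo4j.NewDriverWithContext(
		cfg.GraphURI,
		neo4j.BasicAuth(cfg.GraphUser, cfg.GraphPassword, ""),
	)
	if err != nil {
		return nil, fmt.Errorf("creating neo4j driver: %w", err)
	}
	if err := driver.VerifyConnectivity(ctx); err != nil {
		return nil, fmt.Errorf("verifying neo4j connectivity: %w", err)
	}
	return driver, nil
}
```

With the docker-compose service above, the defaults resolve to bolt://localhost:7687 and the neo4j/txt2promql credentials, and the NEO4J_* environment variables take precedence when set.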
File renamed without changes.
69 changes: 68 additions & 1 deletion internal/prometheus/client.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/spf13/viper"
)
@@ -21,7 +22,8 @@ type QueryResult struct {
ResultType string `json:"resultType"`
Result []struct {
Metric map[string]string `json:"metric"`
Value []interface{} `json:"value"`
Values [][]interface{} `json:"values,omitempty"` // for range queries
Value []interface{} `json:"value,omitempty"` // for instant queries
} `json:"result"`
} `json:"data"`
}
@@ -63,3 +65,68 @@ func (c *Client) Query(ctx context.Context, query string) (*QueryResult, error)

return &result, nil
}

func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (*QueryResult, error) {
url := fmt.Sprintf("%s/api/v1/query_range", c.baseURL)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
}

q := req.URL.Query()
q.Add("query", query)
q.Add("start", start.Format(time.RFC3339))
q.Add("end", end.Format(time.RFC3339))
q.Add("step", step.String())
req.URL.RawQuery = q.Encode()

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("executing request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

var result QueryResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decoding response: %w", err)
}

return &result, nil
}

// QueryInstant executes an instant query at a specific time
func (c *Client) QueryInstant(ctx context.Context, query string, timestamp *time.Time) (*QueryResult, error) {
url := fmt.Sprintf("%s/api/v1/query", c.baseURL)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
}

q := req.URL.Query()
q.Add("query", query)
if timestamp != nil {
q.Add("time", timestamp.Format(time.RFC3339))
}
req.URL.RawQuery = q.Encode()

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("executing request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

var result QueryResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decoding response: %w", err)
}

return &result, nil
}
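A short usage sketch for the two new methods, written as if it sat in the same internal/prometheus package: it assumes an already-initialized *Client (the constructor is not part of this diff), and the PromQL expressions are illustrative only.

```go
// exampleQueries is not part of this commit; it only illustrates how the
// new methods are intended to be called.
func exampleQueries(ctx context.Context, c *Client) error {
	// Instant query: a nil timestamp omits the "time" parameter, so
	// Prometheus evaluates the expression at the current time.
	instant, err := c.QueryInstant(ctx, `up`, nil)
	if err != nil {
		return fmt.Errorf("instant query: %w", err)
	}
	fmt.Println("instant result type:", instant.Data.ResultType)

	// Range query: last hour at one-minute resolution.
	end := time.Now()
	start := end.Add(-1 * time.Hour)
	rng, err := c.QueryRange(ctx, `rate(http_requests_total[5m])`, start, end, time.Minute)
	if err != nil {
		return fmt.Errorf("range query: %w", err)
	}
	fmt.Println("series returned:", len(rng.Data.Result))
	return nil
}
```

Range results arrive in the Values field and instant results in Value, matching the omitempty split added to QueryResult above.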
161 changes: 161 additions & 0 deletions internal/server/handlers/handlers.go
@@ -157,3 +157,164 @@ func (h *Handlers) HandleHealth(c echo.Context) error {

return c.JSON(http.StatusOK, resp)
}

type ExecuteRequest struct {
Query string `json:"query"`
Start *time.Time `json:"start,omitempty"`
End *time.Time `json:"end,omitempty"`
Step string `json:"step,omitempty"`
Timestamp *time.Time `json:"timestamp,omitempty"`
}

// func (h *Handlers) HandleExecute(c echo.Context) error {
// var req ExecuteRequest
// if err := c.Bind(&req); err != nil {
// return echo.NewHTTPError(http.StatusBadRequest, "Invalid request format")
// }

// if strings.TrimSpace(req.Query) == "" {
// return echo.NewHTTPError(http.StatusBadRequest, "Query cannot be empty")
// }

// ctx := c.Request().Context()

// // If start and end times are provided, perform a range query
// if req.Start != nil && req.End != nil {
// step := time.Minute // default step
// if req.Step != "" {
// var err error
// step, err = time.ParseDuration(req.Step)
// if err != nil {
// return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid step: %v", err))
// }
// }

// result, err := h.promClient.QueryRange(ctx, req.Query, *req.Start, *req.End, step)
// if err != nil {
// return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Range query error: %v", err))
// }
// return c.JSON(http.StatusOK, result)
// }

// // Otherwise, perform an instant query
// result, err := h.promClient.QueryInstant(ctx, req.Query, req.Timestamp)
// if err != nil {
// return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Instant query error: %v", err))
// }

// return c.JSON(http.StatusOK, result)
// }

type ChartSuggestion struct {
Type string `json:"type"`
Reason string `json:"reason"`
}

type ExecuteResponse struct {
Status string `json:"status"`
Data *prometheus.QueryResult `json:"data"`
SuggestedChart ChartSuggestion `json:"suggestedChart"`
}

func predictChartType(query string, result *prometheus.QueryResult) ChartSuggestion {
// Check for time-based functions
if strings.Contains(query, "rate(") ||
strings.Contains(query, "increase(") ||
strings.Contains(query, "irate(") {
return ChartSuggestion{
Type: "time-series",
Reason: "Query contains rate or increase functions which are best visualized over time",
}
}

// Check for aggregations
if strings.Contains(query, " by (") {
if strings.Contains(query, "topk") || strings.Contains(query, "bottomk") {
return ChartSuggestion{
Type: "bar",
Reason: "Query uses top/bottom k aggregation which is best shown as a bar chart",
}
}
return ChartSuggestion{
Type: "pie",
Reason: "Query uses grouping which can be effectively shown as a pie chart",
}
}

// Check for single value or gauge-like metrics
if result.Data.ResultType == "vector" && len(result.Data.Result) == 1 {
if strings.Contains(strings.ToLower(query), "total") ||
strings.Contains(strings.ToLower(query), "sum") {
return ChartSuggestion{
Type: "gauge",
Reason: "Query returns a single total/sum value suitable for a gauge",
}
}
}

// Check for complex multi-metric results
if result.Data.ResultType == "matrix" ||
(result.Data.ResultType == "vector" && len(result.Data.Result) > 10) {
return ChartSuggestion{
Type: "table",
Reason: "Query returns multiple metrics which are best viewed in a table",
}
}

// Check for hierarchical data
if strings.Contains(query, "count") && strings.Contains(query, " by (") {
return ChartSuggestion{
Type: "tree",
Reason: "Query counts by labels which can be shown hierarchically",
}
}

// Default to time-series for most other cases
return ChartSuggestion{
Type: "time-series",
Reason: "Default visualization for metric data over time",
}
}

func (h *Handlers) HandleExecute(c echo.Context) error {
var req ExecuteRequest
if err := c.Bind(&req); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid request format")
}

if strings.TrimSpace(req.Query) == "" {
return echo.NewHTTPError(http.StatusBadRequest, "Query cannot be empty")
}

ctx := c.Request().Context()
var result *prometheus.QueryResult
var err error

if req.Start != nil && req.End != nil {
step := time.Minute
if req.Step != "" {
step, err = time.ParseDuration(req.Step)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid step: %v", err))
}
}
result, err = h.promClient.QueryRange(ctx, req.Query, *req.Start, *req.End, step)
} else {
result, err = h.promClient.QueryInstant(ctx, req.Query, req.Timestamp)
}

if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
}

// Predict chart type based on query and result
chartSuggestion := predictChartType(req.Query, result)

response := ExecuteResponse{
Status: "success",
Data: result,
SuggestedChart: chartSuggestion,
}

return c.JSON(http.StatusOK, response)
}
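The exact route for HandleExecute is registered in server.RegisterHandlers and is not shown in this diff, so the path and port below are assumptions (config.yaml now defaults the server to 8083). The snippet sketches how a client might exercise the range-query branch of the new endpoint.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	end := time.Now()
	start := end.Add(-30 * time.Minute)

	// Field names follow the ExecuteRequest struct; the PromQL expression
	// is illustrative only.
	body, _ := json.Marshal(map[string]interface{}{
		"query": "rate(http_requests_total[5m])",
		"start": start,
		"end":   end,
		"step":  "1m",
	})

	// Assumed route and port: adjust to whatever RegisterHandlers maps
	// HandleExecute to in this repository.
	resp, err := http.Post("http://localhost:8083/api/v1/execute", "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	var out struct {
		Status         string `json:"status"`
		SuggestedChart struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"suggestedChart"`
		Data json.RawMessage `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		fmt.Println("decoding response:", err)
		return
	}
	fmt.Println(out.Status, out.SuggestedChart.Type, "-", out.SuggestedChart.Reason)
}
```

Omitting start and end (and optionally supplying timestamp) exercises the instant-query branch instead, and the response always carries the suggestedChart hint produced by predictChartType.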