From e61ee495a60a8aca6a88a2ccb0d2a7d552cb38a6 Mon Sep 17 00:00:00 2001 From: siddhantprateek Date: Fri, 7 Feb 2025 21:06:55 +0530 Subject: [PATCH 1/2] neo4j integration --- .gitignore | 3 +- cmd/api/main.go | 38 +++- configs/config.yaml | 9 +- docker-compose.yaml | 27 ++- go.mod | 1 + go.sum | 2 + internal/agent/context.go | 1 + internal/config/config.go | 23 ++- internal/core/knowledgegraph/client.go | 140 ++++++++++++++ .../{relationships.go => provider.go} | 0 internal/prometheus/client.go | 69 ++++++- internal/server/handlers/handlers.go | 175 +++++++++++++++++- internal/server/server.go | 41 +++- 13 files changed, 509 insertions(+), 20 deletions(-) create mode 100644 internal/core/knowledgegraph/client.go rename internal/core/knowledgegraph/{relationships.go => provider.go} (100%) diff --git a/.gitignore b/.gitignore index c3d945c..bdd81ab 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ arch.md bin -prom_data \ No newline at end of file +prom_data +storage \ No newline at end of file diff --git a/cmd/api/main.go b/cmd/api/main.go index db6f9e1..8887bd1 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,9 +1,12 @@ package main import ( + "context" "fmt" "os" + "github.com/agentkube/txt2promql/internal/config" + "github.com/agentkube/txt2promql/internal/core/knowledgegraph" "github.com/agentkube/txt2promql/internal/prometheus" "github.com/agentkube/txt2promql/internal/server" "github.com/labstack/echo/v4" @@ -36,6 +39,35 @@ func main() { // Initialize Prometheus client promClient := prometheus.NewClient() + cfg, err := config.LoadConfig() + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + os.Exit(1) + } + + var kgClient *knowledgegraph.Client + if cfg.KG.GraphEnabled { + kgClient, err = knowledgegraph.NewClient(&knowledgegraph.Config{ + Enabled: true, + URI: cfg.KG.GraphURI, + User: cfg.KG.GraphUser, + Password: cfg.KG.GraphPassword, + Database: cfg.KG.GraphDatabase, + }) + if err != nil { + fmt.Printf("Warning: Failed to initialize knowledge graph: %v\n", err) + // Continue without knowledge graph + } else { + // Test connection + if err := kgClient.Connect(context.Background()); err != nil { + fmt.Printf("Warning: Failed to connect to knowledge graph: %v\n", err) + kgClient = nil // Disable knowledge graph functionality + } else { + fmt.Println("Successfully connected to knowledge graph") + } + } + } + // Initialize Echo instance e := echo.New() @@ -45,8 +77,10 @@ func main() { e.Use(middleware.CORS()) // Register handlers - server.RegisterHandlers(e, promClient) - + if err := server.RegisterHandlers(e, promClient, kgClient); err != nil { + fmt.Printf("Error registering handlers: %v\n", err) + os.Exit(1) + } // Start server e.Logger.Fatal(e.Start(":" + viper.GetString("server.port"))) } diff --git a/configs/config.yaml b/configs/config.yaml index 74d590e..cfda0b5 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -1,5 +1,5 @@ server: - port: 8080 + port: 8083 max_body_size: 2MB prometheus: @@ -8,7 +8,7 @@ prometheus: ai: model: "gpt-4o-mini" - api_key: "sk-proj-xxxxxxxxxxxx" + api_key: "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" temperature: 0.7 top_p: 1.0 base_url: "" # Optional: for Azure OpenAI or other endpoints @@ -19,6 +19,11 @@ ai: knowledge_graph: schema_path: "./configs/prometheus.yaml" auto_discover: true + enabled: true + graph_database: "neo4j" + graph_uri: "bolt://localhost:7687" + graph_user: "neo4j" + graph_password: "txt2promql" semantic_memory: enabled: true diff --git a/docker-compose.yaml b/docker-compose.yaml index 1347a54..df06f98 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -19,4 +19,29 @@ services: - ./configs/prometheus.yml:/etc/prometheus/prometheus.yml - ./prom_data:/prometheus ports: - - "9090:9090" \ No newline at end of file + - "9090:9090" + + neo4j: + image: neo4j:5.12.0 # Let's use a stable version instead of 2025.01.0 + restart: unless-stopped + ports: + - "7474:7474" # HTTP + - "7687:7687" # Bolt + environment: + - NEO4J_AUTH=neo4j/txt2promql + - NEO4J_server_memory_pagecache_size=1G + - NEO4J_server_memory_heap_initial__size=1G + - NEO4J_server_memory_heap_max__size=1G + - NEO4J_ACCEPT_LICENSE_AGREEMENT=yes + - NEO4J_dbms_connector_bolt_advertised__address=:7687 + - NEO4J_dbms_default__database=neo4j + - NEO4J_dbms_connector_bolt_listen__address=0.0.0.0:7687 + volumes: + - ./storage/data:/data + - ./storage/logs:/logs + - ./storage/import:/var/lib/neo4j/import + healthcheck: + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider localhost:7474 || exit 1"] + interval: 10s + timeout: 5s + retries: 5 \ No newline at end of file diff --git a/go.mod b/go.mod index 94cae0f..a70b8d3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.5 require ( github.com/labstack/echo v3.3.10+incompatible github.com/labstack/echo/v4 v4.13.3 + github.com/neo4j/neo4j-go-driver/v5 v5.27.0 github.com/prometheus/client_golang v1.20.5 github.com/sashabaranov/go-openai v1.36.1 github.com/spf13/cobra v1.8.1 diff --git a/go.sum b/go.sum index 3696366..7eac2e8 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/neo4j/neo4j-go-driver/v5 v5.27.0 h1:YdsIxDjAQbjlP/4Ha9B/gF8Y39UdgdTwCyihSxy8qTw= +github.com/neo4j/neo4j-go-driver/v5 v5.27.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/internal/agent/context.go b/internal/agent/context.go index d3238ba..f0c73cf 100644 --- a/internal/agent/context.go +++ b/internal/agent/context.go @@ -1,3 +1,4 @@ +// internal/agent/context.go package agent import ( diff --git a/internal/config/config.go b/internal/config/config.go index 33c65a3..cb3b71c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,8 +49,13 @@ type Header struct { // knowledge graph settings type KGConfig struct { - SchemaPath string `mapstructure:"schema_path"` - AutoDiscover bool `mapstructure:"auto_discover"` + SchemaPath string `mapstructure:"schema_path"` + AutoDiscover bool `mapstructure:"auto_discover"` + GraphEnabled bool `mapstructure:"enabled"` + GraphURI string `mapstructure:"graph_uri"` + GraphUser string `mapstructure:"graph_user"` + GraphPassword string `mapstructure:"graph_password"` + GraphDatabase string `mapstructure:"graph_database"` } // SemanticConfig holds semantic memory settings @@ -122,8 +127,10 @@ func setDefaults() { viper.SetDefault("ai.top_p", 1.0) // Knowledge Graph defaults + viper.SetDefault("knowledge_graph.enabled", false) viper.SetDefault("knowledge_graph.auto_discover", true) viper.SetDefault("knowledge_graph.schema_path", "./schemas/prometheus.yaml") + viper.SetDefault("knowledge_graph.graph_database", "neo4j") // Semantic Memory defaults viper.SetDefault("semantic_memory.enabled", true) @@ -148,6 +155,18 @@ func loadEnvVariables() { if model := os.Getenv("OPENAI_MODEL"); model != "" { viper.Set("ai.model", model) } + + if uri := os.Getenv("NEO4J_URI"); uri != "" { + viper.Set("knowledge_graph.graph_uri", uri) + } + + if user := os.Getenv("NEO4J_USER"); user != "" { + viper.Set("knowledge_graph.graph_user", user) + } + + if pass := os.Getenv("NEO4J_PASSWORD"); pass != "" { + viper.Set("knowledge_graph.graph_password", pass) + } } // validateConfig performs validation on the configuration diff --git a/internal/core/knowledgegraph/client.go b/internal/core/knowledgegraph/client.go new file mode 100644 index 0000000..3e375e5 --- /dev/null +++ b/internal/core/knowledgegraph/client.go @@ -0,0 +1,140 @@ +// internal/core/knowledgegraph/client.go +package knowledgegraph + +import ( + "context" + "fmt" + "time" + + "github.com/neo4j/neo4j-go-driver/v5/neo4j" +) + +type Client struct { + enabled bool + driver neo4j.DriverWithContext + config *Config +} + +type Config struct { + Enabled bool + URI string + User string + Password string + Database string +} + +type MetricInfo struct { + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + Labels map[string]string `json:"labels"` + SimilarTo []string `json:"similar_to,omitempty"` +} + +// NewClient creates a new knowledge graph client +func NewClient(config *Config) (*Client, error) { + client := &Client{ + enabled: config.Enabled, + config: config, + } + + if !config.Enabled { + return client, nil + } + + driver, err := neo4j.NewDriverWithContext( + config.URI, + neo4j.BasicAuth(config.User, config.Password, ""), + func(c *neo4j.Config) { + c.MaxConnectionLifetime = 30 * time.Minute + c.MaxConnectionPoolSize = 50 + }) + if err != nil { + return nil, fmt.Errorf("creating neo4j driver: %w", err) + } + + client.driver = driver + return client, nil +} + +func (c *Client) Close(ctx context.Context) error { + if !c.enabled || c.driver == nil { + return nil + } + return c.driver.Close(ctx) +} + +func (c *Client) Connect(ctx context.Context) error { + if !c.enabled || c.driver == nil { + return nil + } + + maxRetries := 5 + for i := 0; i < maxRetries; i++ { + err := c.driver.VerifyConnectivity(ctx) + if err == nil { + return nil + } + fmt.Printf("Attempt %d: Failed to connect to Neo4j: %v\n", i+1, err) + time.Sleep(time.Second * 2) // Wait before retrying + } + + return c.driver.VerifyConnectivity(ctx) +} + +func (c *Client) FindSimilarMetrics(ctx context.Context, metricName string) ([]MetricInfo, error) { + if !c.enabled || c.driver == nil { + return nil, nil + } + + session := c.driver.NewSession(ctx, neo4j.SessionConfig{ + DatabaseName: c.config.Database, + AccessMode: neo4j.AccessModeRead, + }) + defer session.Close(ctx) + + result, err := neo4j.ExecuteRead[[]MetricInfo](ctx, session, + func(tx neo4j.ManagedTransaction) ([]MetricInfo, error) { + query := ` + MATCH (m:Metric {name: $name})-[r:RELATED]-(similar:Metric) + RETURN similar, r.weight as weight + ORDER BY r.weight DESC + LIMIT 5 + ` + result, err := tx.Run(ctx, query, map[string]any{"name": metricName}) + if err != nil { + return nil, err + } + + var metrics []MetricInfo + records, err := result.Collect(ctx) + if err != nil { + return nil, err + } + + for _, record := range records { + node, ok := record.Values[0].(neo4j.Node) + if !ok { + continue + } + + metric := MetricInfo{ + Name: node.Props["name"].(string), + Type: node.Props["type"].(string), + Description: node.Props["description"].(string), + } + if labels, ok := node.Props["labels"].(map[string]string); ok { + metric.Labels = labels + } + metrics = append(metrics, metric) + } + + return metrics, nil + }) + + if err != nil { + return nil, fmt.Errorf("finding similar metrics: %w", err) + } + + return result, nil +} diff --git a/internal/core/knowledgegraph/relationships.go b/internal/core/knowledgegraph/provider.go similarity index 100% rename from internal/core/knowledgegraph/relationships.go rename to internal/core/knowledgegraph/provider.go diff --git a/internal/prometheus/client.go b/internal/prometheus/client.go index 12d6c41..e3d04eb 100644 --- a/internal/prometheus/client.go +++ b/internal/prometheus/client.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/spf13/viper" ) @@ -21,7 +22,8 @@ type QueryResult struct { ResultType string `json:"resultType"` Result []struct { Metric map[string]string `json:"metric"` - Value []interface{} `json:"value"` + Values [][]interface{} `json:"values,omitempty"` // for range queries + Value []interface{} `json:"value,omitempty"` // for instant queries } `json:"result"` } `json:"data"` } @@ -63,3 +65,68 @@ func (c *Client) Query(ctx context.Context, query string) (*QueryResult, error) return &result, nil } + +func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (*QueryResult, error) { + url := fmt.Sprintf("%s/api/v1/query_range", c.baseURL) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + + q := req.URL.Query() + q.Add("query", query) + q.Add("start", start.Format(time.RFC3339)) + q.Add("end", end.Format(time.RFC3339)) + q.Add("step", step.String()) + req.URL.RawQuery = q.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("executing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var result QueryResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("decoding response: %w", err) + } + + return &result, nil +} + +// QueryInstant executes an instant query at a specific time +func (c *Client) QueryInstant(ctx context.Context, query string, timestamp *time.Time) (*QueryResult, error) { + url := fmt.Sprintf("%s/api/v1/query", c.baseURL) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + + q := req.URL.Query() + q.Add("query", query) + if timestamp != nil { + q.Add("time", timestamp.Format(time.RFC3339)) + } + req.URL.RawQuery = q.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("executing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var result QueryResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("decoding response: %w", err) + } + + return &result, nil +} diff --git a/internal/server/handlers/handlers.go b/internal/server/handlers/handlers.go index 8552473..100d188 100644 --- a/internal/server/handlers/handlers.go +++ b/internal/server/handlers/handlers.go @@ -8,6 +8,7 @@ import ( "time" "github.com/agentkube/txt2promql/internal/agent" + "github.com/agentkube/txt2promql/internal/core/knowledgegraph" "github.com/agentkube/txt2promql/internal/prometheus" "github.com/agentkube/txt2promql/internal/provider/openai" "github.com/labstack/echo/v4" @@ -15,6 +16,7 @@ import ( type Handlers struct { promClient *prometheus.Client + kgClient *knowledgegraph.Client contextExtractor *agent.ContextExtractor explainer *agent.Explainer queryBuilder *agent.QueryBuilder @@ -22,9 +24,10 @@ type Handlers struct { lastCacheTime time.Time } -func New(promClient *prometheus.Client, openaiClient *openai.OpenAIClient) *Handlers { +func New(promClient *prometheus.Client, openaiClient *openai.OpenAIClient, kgClient *knowledgegraph.Client) *Handlers { return &Handlers{ promClient: promClient, + kgClient: kgClient, contextExtractor: agent.NewContextExtractor(openaiClient), explainer: agent.NewExplainer(openaiClient), queryBuilder: agent.NewQueryBuilder(), @@ -37,9 +40,12 @@ type ConvertRequest struct { } type ConvertResponse struct { - PromQL string `json:"promql"` - Explanation string `json:"explanation,omitempty"` - Warnings []string `json:"warnings,omitempty"` + PromQL string `json:"promql"` + Explanation string `json:"explanation,omitempty"` + Warnings []string `json:"warnings,omitempty"` + SimilarMetrics []knowledgegraph.MetricInfo `json:"similar_metrics,omitempty"` + PatternExplanation string `json:"pattern_explanation,omitempty"` + NavigationPath []string `json:"navigation_path,omitempty"` } func (h *Handlers) HandleConvert(c echo.Context) error { @@ -157,3 +163,164 @@ func (h *Handlers) HandleHealth(c echo.Context) error { return c.JSON(http.StatusOK, resp) } + +type ExecuteRequest struct { + Query string `json:"query"` + Start *time.Time `json:"start,omitempty"` + End *time.Time `json:"end,omitempty"` + Step string `json:"step,omitempty"` + Timestamp *time.Time `json:"timestamp,omitempty"` +} + +// func (h *Handlers) HandleExecute(c echo.Context) error { +// var req ExecuteRequest +// if err := c.Bind(&req); err != nil { +// return echo.NewHTTPError(http.StatusBadRequest, "Invalid request format") +// } + +// if strings.TrimSpace(req.Query) == "" { +// return echo.NewHTTPError(http.StatusBadRequest, "Query cannot be empty") +// } + +// ctx := c.Request().Context() + +// // If start and end times are provided, perform a range query +// if req.Start != nil && req.End != nil { +// step := time.Minute // default step +// if req.Step != "" { +// var err error +// step, err = time.ParseDuration(req.Step) +// if err != nil { +// return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid step: %v", err)) +// } +// } + +// result, err := h.promClient.QueryRange(ctx, req.Query, *req.Start, *req.End, step) +// if err != nil { +// return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Range query error: %v", err)) +// } +// return c.JSON(http.StatusOK, result) +// } + +// // Otherwise, perform an instant query +// result, err := h.promClient.QueryInstant(ctx, req.Query, req.Timestamp) +// if err != nil { +// return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Instant query error: %v", err)) +// } + +// return c.JSON(http.StatusOK, result) +// } + +type ChartSuggestion struct { + Type string `json:"type"` + Reason string `json:"reason"` +} + +type ExecuteResponse struct { + Status string `json:"status"` + Data *prometheus.QueryResult `json:"data"` + SuggestedChart ChartSuggestion `json:"suggestedChart"` +} + +func predictChartType(query string, result *prometheus.QueryResult) ChartSuggestion { + // Check for time-based functions + if strings.Contains(query, "rate(") || + strings.Contains(query, "increase(") || + strings.Contains(query, "irate(") { + return ChartSuggestion{ + Type: "time-series", + Reason: "Query contains rate or increase functions which are best visualized over time", + } + } + + // Check for aggregations + if strings.Contains(query, " by (") { + if strings.Contains(query, "topk") || strings.Contains(query, "bottomk") { + return ChartSuggestion{ + Type: "bar", + Reason: "Query uses top/bottom k aggregation which is best shown as a bar chart", + } + } + return ChartSuggestion{ + Type: "pie", + Reason: "Query uses grouping which can be effectively shown as a pie chart", + } + } + + // Check for single value or gauge-like metrics + if result.Data.ResultType == "vector" && len(result.Data.Result) == 1 { + if strings.Contains(strings.ToLower(query), "total") || + strings.Contains(strings.ToLower(query), "sum") { + return ChartSuggestion{ + Type: "gauge", + Reason: "Query returns a single total/sum value suitable for a gauge", + } + } + } + + // Check for complex multi-metric results + if result.Data.ResultType == "matrix" || + (result.Data.ResultType == "vector" && len(result.Data.Result) > 10) { + return ChartSuggestion{ + Type: "table", + Reason: "Query returns multiple metrics which are best viewed in a table", + } + } + + // Check for hierarchical data + if strings.Contains(query, "count") && strings.Contains(query, " by (") { + return ChartSuggestion{ + Type: "tree", + Reason: "Query counts by labels which can be shown hierarchically", + } + } + + // Default to time-series for most other cases + return ChartSuggestion{ + Type: "time-series", + Reason: "Default visualization for metric data over time", + } +} + +func (h *Handlers) HandleExecute(c echo.Context) error { + var req ExecuteRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, "Invalid request format") + } + + if strings.TrimSpace(req.Query) == "" { + return echo.NewHTTPError(http.StatusBadRequest, "Query cannot be empty") + } + + ctx := c.Request().Context() + var result *prometheus.QueryResult + var err error + + if req.Start != nil && req.End != nil { + step := time.Minute + if req.Step != "" { + step, err = time.ParseDuration(req.Step) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid step: %v", err)) + } + } + result, err = h.promClient.QueryRange(ctx, req.Query, *req.Start, *req.End, step) + } else { + result, err = h.promClient.QueryInstant(ctx, req.Query, req.Timestamp) + } + + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + // Predict chart type based on query and result + chartSuggestion := predictChartType(req.Query, result) + + response := ExecuteResponse{ + Status: "success", + Data: result, + SuggestedChart: chartSuggestion, + } + + return c.JSON(http.StatusOK, response) +} diff --git a/internal/server/server.go b/internal/server/server.go index 2d3a711..a4ada49 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,3 +1,4 @@ +// internal/server/server.go package server import ( @@ -5,6 +6,7 @@ import ( "net/http" "time" + "github.com/agentkube/txt2promql/internal/core/knowledgegraph" prometheus "github.com/agentkube/txt2promql/internal/prometheus" "github.com/agentkube/txt2promql/internal/provider/openai" handlers "github.com/agentkube/txt2promql/internal/server/handlers" @@ -67,7 +69,7 @@ func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc { } } -func RegisterHandlers(e *echo.Echo, promClient *prometheus.Client) error { +func RegisterHandlers(e *echo.Echo, promClient *prometheus.Client, kgClient *knowledgegraph.Client) error { // Load AI configuration var aiConfig openai.Config if err := viper.UnmarshalKey("ai", &aiConfig); err != nil { @@ -80,23 +82,48 @@ func RegisterHandlers(e *echo.Echo, promClient *prometheus.Client) error { return fmt.Errorf("initializing OpenAI client: %w", err) } - // Initialize handlers with both clients - h := handlers.New(promClient, openaiClient) - - // Apply middleware + h := handlers.New(promClient, openaiClient, kgClient) + // middleware e.Use(MetricsMiddleware) - // Core routes + // routes e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) e.GET("/health", h.HandleHealth) // API routes api := e.Group("/api/v1") { - api.POST("/convert", h.HandleConvert) + api.POST("/convert", h.HandleConvert) //TODO high chances of request failure when same statement flows in. api.POST("/validate", h.HandleValidate) + api.POST("/execute", h.HandleExecute) api.GET("/metrics", h.HandleListMetrics) } return nil } + +//TODO high chances of request failure when same statement flows in. +// { +// "message": "Failed to process query" +// } +// { +// "message": "Invalid PromQL query: invalid query: unexpected status code: 400" +// } + +// Agent Response: +// ```json +// [ +// { +// "metric": "prometheus_http_request_duration_seconds_sum", +// "labels": {"handler": "/api/v1/query_range"}, +// "timeRange": "5m", +// "aggregation": "avg" +// }, +// { +// "metric": "prometheus_http_request_duration_seconds_count", +// "labels": {"handler": "/api/v1/query_range"}, +// "timeRange": "5m", +// "aggregation": "avg" +// } +// ] +// ``` From 1940dc4730618af15ab1c3de6dd8275d6704833a Mon Sep 17 00:00:00 2001 From: siddhantprateek Date: Fri, 7 Feb 2025 21:12:51 +0530 Subject: [PATCH 2/2] stage --- cmd/api/main.go | 34 +----- internal/agent/context.go | 1 - internal/core/knowledgegraph/client.go | 140 ------------------------- internal/server/handlers/handlers.go | 14 +-- internal/server/server.go | 5 +- 5 files changed, 7 insertions(+), 187 deletions(-) delete mode 100644 internal/core/knowledgegraph/client.go diff --git a/cmd/api/main.go b/cmd/api/main.go index 8887bd1..65d3756 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,12 +1,9 @@ package main import ( - "context" "fmt" "os" - "github.com/agentkube/txt2promql/internal/config" - "github.com/agentkube/txt2promql/internal/core/knowledgegraph" "github.com/agentkube/txt2promql/internal/prometheus" "github.com/agentkube/txt2promql/internal/server" "github.com/labstack/echo/v4" @@ -39,35 +36,6 @@ func main() { // Initialize Prometheus client promClient := prometheus.NewClient() - cfg, err := config.LoadConfig() - if err != nil { - fmt.Printf("Error loading config: %v\n", err) - os.Exit(1) - } - - var kgClient *knowledgegraph.Client - if cfg.KG.GraphEnabled { - kgClient, err = knowledgegraph.NewClient(&knowledgegraph.Config{ - Enabled: true, - URI: cfg.KG.GraphURI, - User: cfg.KG.GraphUser, - Password: cfg.KG.GraphPassword, - Database: cfg.KG.GraphDatabase, - }) - if err != nil { - fmt.Printf("Warning: Failed to initialize knowledge graph: %v\n", err) - // Continue without knowledge graph - } else { - // Test connection - if err := kgClient.Connect(context.Background()); err != nil { - fmt.Printf("Warning: Failed to connect to knowledge graph: %v\n", err) - kgClient = nil // Disable knowledge graph functionality - } else { - fmt.Println("Successfully connected to knowledge graph") - } - } - } - // Initialize Echo instance e := echo.New() @@ -77,7 +45,7 @@ func main() { e.Use(middleware.CORS()) // Register handlers - if err := server.RegisterHandlers(e, promClient, kgClient); err != nil { + if err := server.RegisterHandlers(e, promClient); err != nil { fmt.Printf("Error registering handlers: %v\n", err) os.Exit(1) } diff --git a/internal/agent/context.go b/internal/agent/context.go index f0c73cf..d3238ba 100644 --- a/internal/agent/context.go +++ b/internal/agent/context.go @@ -1,4 +1,3 @@ -// internal/agent/context.go package agent import ( diff --git a/internal/core/knowledgegraph/client.go b/internal/core/knowledgegraph/client.go deleted file mode 100644 index 3e375e5..0000000 --- a/internal/core/knowledgegraph/client.go +++ /dev/null @@ -1,140 +0,0 @@ -// internal/core/knowledgegraph/client.go -package knowledgegraph - -import ( - "context" - "fmt" - "time" - - "github.com/neo4j/neo4j-go-driver/v5/neo4j" -) - -type Client struct { - enabled bool - driver neo4j.DriverWithContext - config *Config -} - -type Config struct { - Enabled bool - URI string - User string - Password string - Database string -} - -type MetricInfo struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - Labels map[string]string `json:"labels"` - SimilarTo []string `json:"similar_to,omitempty"` -} - -// NewClient creates a new knowledge graph client -func NewClient(config *Config) (*Client, error) { - client := &Client{ - enabled: config.Enabled, - config: config, - } - - if !config.Enabled { - return client, nil - } - - driver, err := neo4j.NewDriverWithContext( - config.URI, - neo4j.BasicAuth(config.User, config.Password, ""), - func(c *neo4j.Config) { - c.MaxConnectionLifetime = 30 * time.Minute - c.MaxConnectionPoolSize = 50 - }) - if err != nil { - return nil, fmt.Errorf("creating neo4j driver: %w", err) - } - - client.driver = driver - return client, nil -} - -func (c *Client) Close(ctx context.Context) error { - if !c.enabled || c.driver == nil { - return nil - } - return c.driver.Close(ctx) -} - -func (c *Client) Connect(ctx context.Context) error { - if !c.enabled || c.driver == nil { - return nil - } - - maxRetries := 5 - for i := 0; i < maxRetries; i++ { - err := c.driver.VerifyConnectivity(ctx) - if err == nil { - return nil - } - fmt.Printf("Attempt %d: Failed to connect to Neo4j: %v\n", i+1, err) - time.Sleep(time.Second * 2) // Wait before retrying - } - - return c.driver.VerifyConnectivity(ctx) -} - -func (c *Client) FindSimilarMetrics(ctx context.Context, metricName string) ([]MetricInfo, error) { - if !c.enabled || c.driver == nil { - return nil, nil - } - - session := c.driver.NewSession(ctx, neo4j.SessionConfig{ - DatabaseName: c.config.Database, - AccessMode: neo4j.AccessModeRead, - }) - defer session.Close(ctx) - - result, err := neo4j.ExecuteRead[[]MetricInfo](ctx, session, - func(tx neo4j.ManagedTransaction) ([]MetricInfo, error) { - query := ` - MATCH (m:Metric {name: $name})-[r:RELATED]-(similar:Metric) - RETURN similar, r.weight as weight - ORDER BY r.weight DESC - LIMIT 5 - ` - result, err := tx.Run(ctx, query, map[string]any{"name": metricName}) - if err != nil { - return nil, err - } - - var metrics []MetricInfo - records, err := result.Collect(ctx) - if err != nil { - return nil, err - } - - for _, record := range records { - node, ok := record.Values[0].(neo4j.Node) - if !ok { - continue - } - - metric := MetricInfo{ - Name: node.Props["name"].(string), - Type: node.Props["type"].(string), - Description: node.Props["description"].(string), - } - if labels, ok := node.Props["labels"].(map[string]string); ok { - metric.Labels = labels - } - metrics = append(metrics, metric) - } - - return metrics, nil - }) - - if err != nil { - return nil, fmt.Errorf("finding similar metrics: %w", err) - } - - return result, nil -} diff --git a/internal/server/handlers/handlers.go b/internal/server/handlers/handlers.go index 100d188..bfc772e 100644 --- a/internal/server/handlers/handlers.go +++ b/internal/server/handlers/handlers.go @@ -8,7 +8,6 @@ import ( "time" "github.com/agentkube/txt2promql/internal/agent" - "github.com/agentkube/txt2promql/internal/core/knowledgegraph" "github.com/agentkube/txt2promql/internal/prometheus" "github.com/agentkube/txt2promql/internal/provider/openai" "github.com/labstack/echo/v4" @@ -16,7 +15,6 @@ import ( type Handlers struct { promClient *prometheus.Client - kgClient *knowledgegraph.Client contextExtractor *agent.ContextExtractor explainer *agent.Explainer queryBuilder *agent.QueryBuilder @@ -24,10 +22,9 @@ type Handlers struct { lastCacheTime time.Time } -func New(promClient *prometheus.Client, openaiClient *openai.OpenAIClient, kgClient *knowledgegraph.Client) *Handlers { +func New(promClient *prometheus.Client, openaiClient *openai.OpenAIClient) *Handlers { return &Handlers{ promClient: promClient, - kgClient: kgClient, contextExtractor: agent.NewContextExtractor(openaiClient), explainer: agent.NewExplainer(openaiClient), queryBuilder: agent.NewQueryBuilder(), @@ -40,12 +37,9 @@ type ConvertRequest struct { } type ConvertResponse struct { - PromQL string `json:"promql"` - Explanation string `json:"explanation,omitempty"` - Warnings []string `json:"warnings,omitempty"` - SimilarMetrics []knowledgegraph.MetricInfo `json:"similar_metrics,omitempty"` - PatternExplanation string `json:"pattern_explanation,omitempty"` - NavigationPath []string `json:"navigation_path,omitempty"` + PromQL string `json:"promql"` + Explanation string `json:"explanation,omitempty"` + Warnings []string `json:"warnings,omitempty"` } func (h *Handlers) HandleConvert(c echo.Context) error { diff --git a/internal/server/server.go b/internal/server/server.go index a4ada49..f8dc69f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -6,7 +6,6 @@ import ( "net/http" "time" - "github.com/agentkube/txt2promql/internal/core/knowledgegraph" prometheus "github.com/agentkube/txt2promql/internal/prometheus" "github.com/agentkube/txt2promql/internal/provider/openai" handlers "github.com/agentkube/txt2promql/internal/server/handlers" @@ -69,7 +68,7 @@ func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc { } } -func RegisterHandlers(e *echo.Echo, promClient *prometheus.Client, kgClient *knowledgegraph.Client) error { +func RegisterHandlers(e *echo.Echo, promClient *prometheus.Client) error { // Load AI configuration var aiConfig openai.Config if err := viper.UnmarshalKey("ai", &aiConfig); err != nil { @@ -82,7 +81,7 @@ func RegisterHandlers(e *echo.Echo, promClient *prometheus.Client, kgClient *kno return fmt.Errorf("initializing OpenAI client: %w", err) } - h := handlers.New(promClient, openaiClient, kgClient) + h := handlers.New(promClient, openaiClient) // middleware e.Use(MetricsMiddleware)