Skip to content

Commit

Permalink
How to fix the indexing problem
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrytfleung committed May 24, 2024
1 parent 3357a97 commit 7b1cc37
Show file tree
Hide file tree
Showing 8 changed files with 509 additions and 28 deletions.
4 changes: 3 additions & 1 deletion collector/internal/tools/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/open-telemetry/opentelemetry-lambda/collector/internal/tools

go 1.19
go 1.21

toolchain go1.22.2

require (
github.com/client9/misspell v0.3.4
Expand Down
3 changes: 0 additions & 3 deletions collector/receiver/telemetryapireceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ go 1.21.0

toolchain go1.21.4

replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../

require (
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.101.0
go.opentelemetry.io/collector/consumer v0.101.0
Expand Down
139 changes: 139 additions & 0 deletions collector/receiver/telemetryapireceiver/go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetryapireceiver

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"

"go.uber.org/zap"
)

const (
SchemaVersion20220701 = "2022-07-01"
SchemaVersionLatest = SchemaVersion20220701
lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier"
)

type Client struct {
logger *zap.Logger
httpClient *http.Client
baseURL string
}

func NewClient(logger *zap.Logger) *Client {
return &Client{
logger: logger.Named("telemetryAPI.Client"),
httpClient: &http.Client{},
baseURL: fmt.Sprintf("http://%s/%s/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API"), SchemaVersionLatest),
}
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
}

bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
TimeoutMS: 25,
}

destination := Destination{
Protocol: HttpProto,
HttpMethod: HttpPost,
Encoding: JSON,
URI: URI(listenerURI),
}

data, err := json.Marshal(
&SubscribeRequest{
SchemaVersion: SchemaVersionLatest,
EventTypes: eventTypes,
BufferingCfg: bufferingConfig,
Destination: destination,
})

if err != nil {
return "", fmt.Errorf("Failed to marshal SubscribeRequest: %w", err)
}

headers := make(map[string]string)
headers[lambdaAgentIdentifierHeaderKey] = extensionID

c.logger.Info("Subscribing", zap.String("baseURL", c.baseURL))
resp, err := httpPutWithHeaders(ctx, c.httpClient, c.baseURL, data, headers)
if err != nil {
c.logger.Error("Subscription failed", zap.Error(err))
return "", err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusAccepted {
c.logger.Error("Subscription failed. Logs API is not supported! Is this extension running in a local sandbox?", zap.Int("status_code", resp.StatusCode))
} else if resp.StatusCode != http.StatusOK {
c.logger.Error("Subscription failed")
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("request to %s failed: %d[%s]: %w", c.baseURL, resp.StatusCode, resp.Status, err)
}

return "", fmt.Errorf("request to %s failed: %d[%s] %s", c.baseURL, resp.StatusCode, resp.Status, string(body))
}

body, _ := io.ReadAll(resp.Body)
c.logger.Info("Subscription success", zap.String("response", string(body)))

return string(body), nil
}

func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers map[string]string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}

contentType := "application/json"
req.Header.Set("Content-Type", contentType)
for k, v := range headers {
req.Header.Set(k, v)
}

return client.Do(req)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetryapireceiver

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/golang-collections/go-datastructures/queue"
"go.uber.org/zap"
)

const defaultListenerPort = "53612"
const initialQueueSize = 5

// Listener is used to listen to the Telemetry API
type Listener struct {
httpServer *http.Server
logger *zap.Logger
// queue is a synchronous queue and is used to put the received log events to be dispatched later
queue *queue.Queue
}

func NewListener(logger *zap.Logger) *Listener {
return &Listener{
httpServer: nil,
logger: logger.Named("telemetryAPI.Listener"),
queue: queue.New(initialQueueSize),
}
}

func listenOnAddress() string {
envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
var addr string
if ok && envAwsLocal == "true" {
addr = ":" + defaultListenerPort
} else {
addr = "sandbox.localdomain:" + defaultListenerPort
}

return addr
}

// Start the server in a goroutine where the log events will be sent
func (s *Listener) Start() (string, error) {
address := listenOnAddress()
s.logger.Info("Listening for requests", zap.String("address", address))
s.httpServer = &http.Server{Addr: address}
http.HandleFunc("/", s.httpHandler)
go func() {
err := s.httpServer.ListenAndServe()
if err != http.ErrServerClosed {
s.logger.Error("Unexpected stop on HTTP Server", zap.Error(err))
s.Shutdown()
} else {
s.logger.Info("HTTP Server closed:", zap.Error(err))
}
}()
return fmt.Sprintf("http://%s/", address), nil
}

// httpHandler handles the requests coming from the Telemetry API.
// Everytime Telemetry API sends log events, this function will read them from the response body
// and put into a synchronous queue to be dispatched later.
// Logging or printing besides the error cases below is not recommended if you have subscribed to
// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for
// the printed lines which may create an infinite loop.
func (s *Listener) httpHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("error reading body", zap.Error(err))
return
}

// Parse and put the log messages into the queue
var slice []Event
_ = json.Unmarshal(body, &slice)

for _, el := range slice {
if err := s.queue.Put(el); err != nil {
s.logger.Error("Failed to put event in queue", zap.Error(err))
}
}

s.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", s.queue.Len()))
slice = nil
}

// Shutdown the HTTP server listening for logs
func (s *Listener) Shutdown() {
if s.httpServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := s.httpServer.Shutdown(ctx)
if err != nil {
s.logger.Error("Failed to shutdown HTTP server gracefully", zap.Error(err))
} else {
s.httpServer = nil
}
}
}

func (s *Listener) Wait(ctx context.Context, reqID string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
s.logger.Debug("looking for platform.runtimeDone event")
items, err := s.queue.Get(10)
if err != nil {
return fmt.Errorf("unable to get telemetry events from queue: %w", err)
}

for _, item := range items {
i, ok := item.(Event)
if !ok {
s.logger.Warn("non-Event found in queue", zap.Any("item", item))
continue
}
s.logger.Debug("Event processed", zap.Any("event", i))
if i.Type != "platform.runtimeDone" {
continue
}

if i.Record["requestId"] == reqID {
return nil
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetryapireceiver

// EventType represents the type of log events in Lambda
type EventType string

const (
// Platform is used to receive log events emitted by the Lambda platform
Platform EventType = "platform"
// PlatformInitStart is used when function initialization started.
PlatformInitStart EventType = Platform + ".initStart"
// PlatformInitRuntimeDone is used when function initialization ended.
PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone"
// Function is used to receive log events emitted by the function
Function EventType = "function"
// Extension is used is to receive log events emitted by the extension
Extension EventType = "extension"
)

// BufferingCfg holds configuration for receiving telemetry from the Telemetry API.
// Telemetry will be sent to your listener when one of the conditions below is met.
type BufferingCfg struct {
// Maximum number of log events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
MaxItems uint32 `json:"maxItems"`
// Maximum size in bytes of the log events to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
MaxBytes uint32 `json:"maxBytes"`
// Maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
TimeoutMS uint32 `json:"timeoutMs"`
}

// URI is used to set the endpoint where the logs will be sent to
type URI string

// HTTPMethod represents the HTTP method used to receive events from the Telemetry API
type HTTPMethod string

const (
// Receive log events via POST requests to the listener
HttpPost HTTPMethod = "POST"
// Receive log events via PUT requests to the listener
HttpPut HTTPMethod = "PUT"
)

// Used to specify the protocol when subscribing to Telemetry API for HTTP
type HTTPProtocol string

const (
HttpProto HTTPProtocol = "HTTP"
)

// Denotes what the content is encoded in
type HTTPEncoding string

const (
JSON HTTPEncoding = "JSON"
)

// Configuration for listeners that would like to receive telemetry via HTTP
type Destination struct {
Protocol HTTPProtocol `json:"protocol"`
URI URI `json:"URI"`
HttpMethod HTTPMethod `json:"method"`
Encoding HTTPEncoding `json:"encoding"`
}

type SchemaVersion string

// Request body that is sent to the Telemetry API on subscribe
type SubscribeRequest struct {
SchemaVersion SchemaVersion `json:"schemaVersion"`
EventTypes []EventType `json:"types"`
BufferingCfg BufferingCfg `json:"buffering"`
Destination Destination `json:"destination"`
}

type Event struct {
Time string `json:"time"`
Type string `json:"type"`
Record map[string]any `json:"record"`
}
Loading

0 comments on commit 7b1cc37

Please sign in to comment.