Skip to content

Commit

Permalink
refactor: Refactor Go architecture (#43)
Browse files Browse the repository at this point in the history
* refactor: Refactor Go architecture

Refactoring Go SDK to be closer to the API and internal representation
that we have with the Rust SDK. Just doing Datadog at the moment and
seeking some feedback then i'll port the others and add docs and tests.

* fix code formatting

* fix formatting

* combine new and init

* remove vistigial parts of adapter struct

* Add back and refactor other adapters

* move some more out of trace_ctx

* Add 128bit support to TelemetryId, add comments

* fix datadog adapter

* apply tid changes from comments

* Fix otel adapter memory grow events

* fix: include memory grow events as allocation attributes

---------

Co-authored-by: Steve Manuel <[email protected]>
  • Loading branch information
bhelx and nilslice authored Jul 19, 2023
1 parent 2ef94e6 commit c21807b
Show file tree
Hide file tree
Showing 16 changed files with 523 additions and 543 deletions.
140 changes: 39 additions & 101 deletions go/adapter.go
Original file line number Diff line number Diff line change
@@ -1,125 +1,63 @@
package observe

import (
"bytes"
"errors"
"fmt"
"math/rand"
"time"
"context"

"github.com/tetratelabs/wabin/leb128"
"github.com/tetratelabs/wabin/wasm"
"github.com/tetratelabs/wazero"
)

// The primary interface that every Adapter needs to follow
// Start() and Stop() can just call the implementations on AdapterBase
// or provide some custom logic. HandleTraceEvent is called after
// an invocation of a wasm module is done and all events are collected.
type Adapter interface {
Start(collector *Collector, wasm []byte) error
Stop(collector *Collector)
Event(Event)
Start()
Stop()
HandleTraceEvent(TraceEvent)
}

type AdapterBase struct {
Collectors map[*Collector]chan bool
// The payload that contains all the Events
// from a single wasm module invocation
type TraceEvent struct {
Events []Event
TelemetryId TelemetryId
}

func checkVersion(m *wasm.Module) error {
var minorGlobal *wasm.Export = nil
var majorGlobal *wasm.Export = nil
for _, export := range m.ExportSection {
if export.Type != wasm.ExternTypeGlobal {
continue
}

if export.Name == "wasm_instr_version_minor" {
minorGlobal = export
} else if export.Name == "wasm_instr_version_major" {
majorGlobal = export
}
}

if minorGlobal == nil || majorGlobal == nil {
return errors.New("wasm_instr_version functions not found")
}

minor, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[minorGlobal.Index].Init.Data))
if err != nil {
return err
}
major, _, err := leb128.DecodeUint32(bytes.NewReader(m.GlobalSection[majorGlobal.Index].Init.Data))
if err != nil {
return err
}

if major != wasmInstrVersionMajor || minor < wasmInstrVersionMinor {
return errors.New(fmt.Sprintf("Expected instrumentation version >= %d.%d but got %d.%d", wasmInstrVersionMajor, wasmInstrVersionMinor, major, minor))
}

return nil
// Shared implementation for all Adapters
type AdapterBase struct {
TraceEvents chan TraceEvent
stop chan bool
}

func (a *AdapterBase) Wait(collector *Collector, timeout time.Duration, callback func()) {
for {
select {
case <-time.After(timeout):
if len(collector.Events) > 0 {
if callback != nil {
callback()
}
continue
}
a.RemoveCollector(collector)
return
}
func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, config *Config) (*TraceCtx, error) {
if config == nil {
config = NewDefaultConfig()
}
return newTraceCtx(ctx, a, r, wasm, config)
}

func NewAdapterBase() AdapterBase {
a := AdapterBase{
Collectors: map[*Collector]chan bool{},
}
return a
}

func (a *AdapterBase) Start(collector *Collector, wasm []byte) error {
a.Collectors[collector] = make(chan bool, 1)
return collector.GetNames(wasm)
}

func (a *AdapterBase) RemoveCollector(collector *Collector) {
delete(a.Collectors, collector)
}

func (a *AdapterBase) Stop(collector *Collector) {
stop, ok := a.Collectors[collector]
if ok {
stop <- true
a.RemoveCollector(collector)
return AdapterBase{
// TODO set to some kind of max, add dump logic
TraceEvents: make(chan TraceEvent, 100),
}
}

func (a AdapterBase) StopChan(collector *Collector) chan bool {
return a.Collectors[collector]
}

type TelemetryId uint64

var rng rand.Source
func (b *AdapterBase) Start(a Adapter) {
b.stop = make(chan bool)

func init() {
rng = rand.NewSource(time.Now().UnixNano())
}

func NewTraceId() TelemetryId {
return TelemetryId(rng.Int63())
}

func NewSpanId() TelemetryId {
return TelemetryId(rng.Int63())
}

func (t TelemetryId) ToHex8() string {
return fmt.Sprintf("%016x", t)
go func() {
for {
select {
case event := <-b.TraceEvents:
a.HandleTraceEvent(event)
case <-b.stop:
return
}
}
}()
}

func (t TelemetryId) ToHex16() string {
return fmt.Sprintf("%032x", t)
func (b *AdapterBase) Stop() {
b.stop <- true
}
100 changes: 32 additions & 68 deletions go/adapter/datadog/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"log"
"net/http"
"net/url"
"strconv"
"time"

"github.com/dylibso/observe-sdk/go"
"github.com/dylibso/observe-sdk/go/adapter/datadog_formatter"
Expand All @@ -31,9 +29,7 @@ func DefaultDatadogConfig() *DatadogConfig {

type DatadogAdapter struct {
observe.AdapterBase
TraceId uint64
Spans []datadog_formatter.Span
Config *DatadogConfig
Config *DatadogConfig
}

func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) {
Expand All @@ -43,76 +39,48 @@ func NewDatadogAdapter(config *DatadogConfig) (DatadogAdapter, error) {

return DatadogAdapter{
AdapterBase: observe.NewAdapterBase(),
TraceId: uint64(observe.NewTraceId()),
Config: config,
}, nil
}

func (d *DatadogAdapter) Event(e observe.Event) {
switch event := e.(type) {
case observe.CallEvent:
spans := d.makeCallSpans(event, nil)
if len(spans) > 0 {
d.Spans = append(d.Spans, spans...)
}

case observe.MemoryGrowEvent:
if len(d.Spans) > 0 {
d.Spans[len(d.Spans)-1].AddAllocation(event.MemoryGrowAmount())
}
case observe.CustomEvent:
if value, ok := event.Metadata["trace_id"]; ok {
traceId, err := strconv.ParseUint(value.(string), 10, 64)
if err != nil {
log.Println("failed to parse traceId from event metadata:", err)
return
}

d.TraceId = traceId
}
}
func (d *DatadogAdapter) Start() {
d.AdapterBase.Start(d)
}

func (d *DatadogAdapter) Wait(collector *observe.Collector, timeout time.Duration) {
d.AdapterBase.Wait(collector, timeout, nil)
func (d *DatadogAdapter) Stop() {
d.AdapterBase.Stop()
}

func (d *DatadogAdapter) Start(collector *observe.Collector, wasm []byte) error {
if err := d.AdapterBase.Start(collector, wasm); err != nil {
return err
}

stop := d.StopChan(collector)

go func() {
for {
select {
case event := <-collector.Events:
d.Event(event)
case <-stop:
return
func (d *DatadogAdapter) HandleTraceEvent(te observe.TraceEvent) {
var allSpans []*datadog_formatter.Span
for _, e := range te.Events {
switch event := e.(type) {
case observe.CallEvent:
traceId := te.TelemetryId.ToUint64()
spans := d.makeCallSpans(event, nil, traceId)
if len(spans) > 0 {
allSpans = append(allSpans, spans...)
}
case observe.MemoryGrowEvent:
log.Println("MemoryGrowEvent should be attached to a span")
case observe.CustomEvent:
log.Println("Datadog adapter does not respect custom events")
}
}()

return nil
}

func (d *DatadogAdapter) Stop(collector *observe.Collector) {
d.AdapterBase.Stop(collector)
}

if len(d.Spans) == 0 {
if len(allSpans) <= 1 {
log.Println("No spans built for datadog trace")
return
}

go func() {
output := datadog_formatter.New()
// TODO: for the moment, these are hard-coded, but will transition to a programmer-
// controlled API to customer these values.
d.Spans[0].Resource = "request"
allSpans[0].Resource = "request"
tt := d.Config.TraceType.String()
d.Spans[0].Type = &tt
output.AddTrace(d.Spans)
allSpans[0].Type = &tt
output.AddTrace(allSpans)

b, err := json.Marshal(output)
if err != nil {
Expand Down Expand Up @@ -140,28 +108,24 @@ func (d *DatadogAdapter) Stop(collector *observe.Collector) {
}()
}

func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64) []datadog_formatter.Span {
func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64, traceId uint64) []*datadog_formatter.Span {
name := event.FunctionName()
span := datadog_formatter.NewSpan(d.Config.ServiceName, d.TraceId, parentId, name, event.Time, event.Time.Add(event.Duration))
span := datadog_formatter.NewSpan(d.Config.ServiceName, traceId, parentId, name, event.Time, event.Time.Add(event.Duration))

spans := []datadog_formatter.Span{*span}
spans := []*datadog_formatter.Span{span}
for _, ev := range event.Within() {
if call, ok := ev.(observe.CallEvent); ok {
spans = append(spans, d.makeCallSpans(call, &span.SpanId)...)
spans = append(spans, d.makeCallSpans(call, &span.SpanId, traceId)...)
}
if alloc, ok := ev.(observe.MemoryGrowEvent); ok {
span := spans[len(spans)-1]
span.AddAllocation(alloc.MemoryGrowAmount())
}
}

return spans
}

func NewTraceId() uint64 {
return uint64(observe.NewTraceId())
}

func (d *DatadogAdapter) SetTraceId(traceId uint64) {
d.TraceId = traceId
}

type DatadogSpanKind int

const (
Expand Down
4 changes: 2 additions & 2 deletions go/adapter/datadog_formatter/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type DatadogFormatter []Trace

type Trace []Span
type Trace []*Span

type Span struct {
TraceId uint64 `json:"trace_id"`
Expand All @@ -29,7 +29,7 @@ type Span struct {
func NewSpan(service string, traceId uint64, parentId *uint64, name string, start, end time.Time) *Span {
id := observe.NewSpanId()
span := Span{
SpanId: uint64(id),
SpanId: id.ToUint64(),
ParentId: parentId,
TraceId: traceId,
Name: name,
Expand Down
6 changes: 3 additions & 3 deletions go/adapter/otel_formatter/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *ResourceSpan) AddAttribute(key string, value any) *ResourceSpan {
return r
}

func (r *ResourceSpan) AddSpans(spans []Span) {
func (r *ResourceSpan) AddSpans(spans []*Span) {
r.ScopeSpans = append(r.ScopeSpans, ScopeSpan{
Scope: Scope{
Name: "event",
Expand All @@ -46,8 +46,8 @@ type Resource struct {
}

type ScopeSpan struct {
Scope Scope `json:"scope"`
Spans []Span `json:"spans"`
Scope Scope `json:"scope"`
Spans []*Span `json:"spans"`
}

type Attribute struct {
Expand Down
Loading

0 comments on commit c21807b

Please sign in to comment.