Skip to content

Commit

Permalink
refactor: update to use events from messaging
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru committed Jan 30, 2025
1 parent 457ef9d commit 0d127da
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 192 deletions.
7 changes: 0 additions & 7 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,6 @@ func main() {
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)

bsub, err = msgevents.NewPubSubMiddleware(ctx, bsub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
Expand Down
7 changes: 4 additions & 3 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic

authzc := newAuthzClient(clientID, chanID, subtopic, svc.channels, c)
subCfg := messaging.SubscriberConfig{
ID: c.Token(),
Topic: subject,
Handler: authzc,
ID: c.Token(),
ClientID: clientID,
Topic: subject,
Handler: authzc,
}
return svc.pubsub.Subscribe(ctx, subCfg)
}
Expand Down
20 changes: 14 additions & 6 deletions journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,21 @@ func (page JournalsPage) MarshalJSON() ([]byte, error) {
type ClientTelemetry struct {
ClientID string `json:"client_id"`
DomainID string `json:"domain_id"`
Subscriptions []string `json:"subscriptions"`
Subscriptions uint64 `json:"subscriptions"`
InboundMessages uint64 `json:"inbound_messages"`
OutboundMessages uint64 `json:"outbound_messages"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}

type ClientSubscription struct {
ID string `json:"id" db:"id"`
SubscriberID string `json:"subscriber_id" db:"subscriber_id"`
ChannelID string `json:"channel_id" db:"channel_id"`
Subtopic string `json:"subtopic" db:"subtopic"`
ClientID string `json:"client_id" db:"client_id"`
}

// Service provides access to the journal log service.
//
//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand Down Expand Up @@ -181,13 +189,13 @@ type Repository interface {
DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error

// AddSubscription adds a subscription to the client telemetry.
AddSubscription(ctx context.Context, clientID, sub string) error
AddSubscription(ctx context.Context, sub ClientSubscription) error

// RemoveSubscription removes a subscription from the client telemetry.
RemoveSubscription(ctx context.Context, clientID, sub string) error
// CountSubscriptions returns the number of subscriptions for a client.
CountSubscriptions(ctx context.Context, clientID string) (uint64, error)

// RemoveSubscriptionWithConnID removes a subscription from the client telemetry using the connection ID.
RemoveSubscriptionWithConnID(ctx context.Context, connID, clientID string) error
// RemoveSubscription removes a subscription from the client telemetry.
RemoveSubscription(ctx context.Context, subscriberID string) error

// IncrementInboundMessages increments the inbound messages count for a client.
IncrementInboundMessages(ctx context.Context, clientID string) error
Expand Down
66 changes: 38 additions & 28 deletions journal/mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions journal/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ func Migration() *migrate.MemoryMigrationSource {
`CREATE INDEX idx_journal_default_client_filter ON journal(operation, (attributes->>'id'), (attributes->>'client_id'), occurred_at DESC);`,
`CREATE INDEX idx_journal_default_channel_filter ON journal(operation, (attributes->>'id'), (attributes->>'channel_id'), occurred_at DESC);`,
`CREATE TABLE IF NOT EXISTS clients_telemetry (
client_id VARCHAR(36) NOT NULL,
client_id VARCHAR(36) PRIMARY KEY,
domain_id VARCHAR(36) NOT NULL,
subscriptions TEXT[] DEFAULT '{}',
inbound_messages BIGINT DEFAULT 0,
outbound_messages BIGINT DEFAULT 0,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
PRIMARY KEY (client_id, domain_id)
last_seen TIMESTAMP
)`,
`CREATE TABLE IF NOT EXISTS subscriptions (
id VARCHAR(36) PRIMARY KEY,
subscriber_id VARCHAR(1024) NOT NULL,
channel_id VARCHAR(36) NOT NULL,
subtopic VARCHAR(1024),
client_id VARCHAR(36),
FOREIGN KEY (client_id) REFERENCES clients_telemetry(client_id) ON DELETE CASCADE ON UPDATE CASCADE
)`,
`CREATE INDEX idx_subscriptions_gin ON clients_telemetry USING GIN (subscriptions);`,
},
Down: []string{
`DROP TABLE IF EXISTS clients_telemetry`,
`DROP TABLE IF EXISTS subscriptions`,
`DROP TABLE IF EXISTS journal`,
},
},
Expand Down
Loading

0 comments on commit 0d127da

Please sign in to comment.