Skip to content

Commit

Permalink
chore: small refactoring, add more logs
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Jul 11, 2024
1 parent a6b2de5 commit 2ebbb65
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 85 deletions.
67 changes: 36 additions & 31 deletions pubsubjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,25 @@ type Configurer interface {
type Driver struct {
mu sync.Mutex

log *zap.Logger
pq jobs.Queue
pipeline atomic.Pointer[jobs.Pipeline]
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
topic string
dltopic string
sub string
maxDeliveryAttempts int
log *zap.Logger
pq jobs.Queue
pipeline atomic.Pointer[jobs.Pipeline]
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator

// events
eventsCh chan events.Event
eventBus *events.Bus
id string

// pubsub specific
gsub *pubsub.Subscription
dlsub *pubsub.Subscription
gtopic *pubsub.Topic
gclient *pubsub.Client
gsub *pubsub.Subscription
gtopic *pubsub.Topic
gclient *pubsub.Client
topicStr string
dltopicStr string
subStr string
maxDeliveryAttempts int

// context cancel func used to cancel the pubsub subscription
receiveCtxCancel context.CancelFunc
Expand Down Expand Up @@ -130,11 +129,11 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
tracer: tracer,
prop: prop,
log: log,
topic: conf.Topic,
dltopic: conf.DeadLetterTopic,
topicStr: conf.Topic,
dltopicStr: conf.DeadLetterTopic,
maxDeliveryAttempts: conf.MaxDeliveryAttempts,
pq: pq,
sub: pipe.Name(),
subStr: pipe.Name(),
gclient: gclient,

// events
Expand Down Expand Up @@ -205,13 +204,13 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
eventBus, id := events.NewEventBus()

jb := &Driver{
prop: prop,
tracer: tracer,
log: log,
pq: pq,
topic: conf.Topic,
sub: pipe.Name(),
gclient: gclient,
prop: prop,
tracer: tracer,
log: log,
pq: pq,
topicStr: conf.Topic,
subStr: pipe.Name(),
gclient: gclient,

// events
eventsCh: eventsCh,
Expand Down Expand Up @@ -262,6 +261,8 @@ func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error {

atomic.AddUint32(&d.listeners, 1)

d.log.Debug("start listening for messages", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start))

d.listen()

d.log.Debug("pipeline was started", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
Expand Down Expand Up @@ -295,6 +296,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
return errors.Str("no active listeners, nothing to pause")
}

d.log.Debug("stop listening for messages", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start))
// stop the listener
d.gtopic.Stop()

Expand Down Expand Up @@ -326,6 +328,7 @@ func (d *Driver) Resume(ctx context.Context, p string) error {
return errors.Str("listener is already in the active state")
}

d.log.Debug("resume listening for messages", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start))
d.listen()

// increase num of listeners
Expand Down Expand Up @@ -364,48 +367,50 @@ func (d *Driver) manageSubscriptions() error {

var err error
// Create regular topic
d.gtopic, err = d.gclient.CreateTopic(ctx, d.topic)
d.gtopic, err = d.gclient.CreateTopic(ctx, d.topicStr)
if err != nil {
if !strings.Contains(err.Error(), "Topic already exists") {
return err
}

// topic would be nil if it already exists
d.gtopic = d.gclient.Topic(d.topic)
d.gtopic = d.gclient.Topic(d.topicStr)
}

d.log.Debug("created/used topic", zap.String("topic", d.gtopic.String()))

// check or create a Dead Letter Topic
var dltopic *pubsub.Topic

if d.dltopic != "" {
dltopic, err = d.gclient.CreateTopic(ctx, d.dltopic)
if d.dltopicStr != "" {
dltopic, err = d.gclient.CreateTopic(ctx, d.dltopicStr)
if err != nil {
if !strings.Contains(err.Error(), "Topic already exists") {
return err
}

// topic would be nil if it already exists
dltopic = d.gclient.Topic(d.dltopic)
dltopic = d.gclient.Topic(d.dltopicStr)
}

d.log.Debug("created/used dead letter topic", zap.String("topic", dltopic.String()))
}

d.gsub, err = d.gclient.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{
// Create subscription but not listen it
d.gsub, err = d.gclient.CreateSubscription(ctx, d.subStr, pubsub.SubscriptionConfig{
Topic: d.gtopic,
// Ack dedline should be between 10 seconds and 10 minutes
AckDeadline: time.Minute * 5,
AckDeadline: time.Minute * 8,
DeadLetterPolicy: initOrNil(dltopic, d.maxDeliveryAttempts),
})

if err != nil {
if !strings.Contains(err.Error(), "Subscription already exists") {
return err
}
}

d.log.Debug("created subscription", zap.String("topic", d.topic), zap.String("subscription", d.sub))
d.log.Debug("created subscription, not listening", zap.String("topic", d.topicStr), zap.String("subscription", d.subStr))

return nil
}
Expand Down
64 changes: 10 additions & 54 deletions pubsubjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ func (d *Driver) listen() {
d.atomicCtx()
go func() {
err := d.gsub.Receive(d.rctx, func(ctx context.Context, message *pubsub.Message) {
if message == nil {
d.log.Warn("received nil message, skipping processing the message")
return
}

d.log.Debug("receive message", zap.Stringp("ID", &message.ID))

if message.DeliveryAttempt != nil {
d.log.Info("message delivery attempts", zap.Int("attempts", *message.DeliveryAttempt))
}

item := d.unpack(message)

ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(ctx, propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener")
Expand Down Expand Up @@ -69,60 +79,6 @@ func (d *Driver) listen() {
d.log.Error("subscribing error, restarting the pipeline", zap.Error(err), zap.String("pipeline", pipe))
}
}()

if d.dlsub != nil {
go func() {
err := d.dlsub.Receive(d.rctx, func(ctx context.Context, message *pubsub.Message) {
d.log.Debug("dead-letter receive message", zap.Stringp("ID", &message.ID))
item := d.unpack(message)

ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(ctx, propagation.HeaderCarrier(item.headers)), "google_pub_sub_dl_listener")
if item.Options.AutoAck {
message.Ack()
// it is not possible to requeue a message from the dead-letter queue when auto ack is turned on
d.log.Debug("dead-letter auto ack is turned on, message acknowledged")
}

if item.headers == nil {
item.headers = make(map[string][]string, 2)
}

d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.headers))

d.pq.Insert(item)
d.log.Debug("dead-letter message pushed to the priority queue", zap.Uint64("queue size", d.pq.Len()))

span.End()
})
if err != nil {
if errors.Is(err, context.Canceled) {
atomic.StoreUint32(&d.listeners, 0)
return
}
st := status.Convert(err)
if st != nil && st.Message() == "grpc: the client connection is closing" {
// reduce the number of listeners
if atomic.LoadUint32(&d.listeners) > 0 {
atomic.AddUint32(&d.listeners, ^uint32(0))
}

d.log.Debug("dead-letter listener was stopped")
return
}

atomic.StoreUint32(&d.listeners, 0)

// the pipeline was stopped
if atomic.LoadUint64(&d.stopped) == 1 {
return
}
// recreate pipeline on fail
pipe := (*d.pipeline.Load()).Name()
d.eventsCh <- events.NewEvent(events.EventJOBSDriverCommand, pipe, restartStr)
d.log.Error("dead-letter subscribing error", zap.Error(err), zap.String("pipeline", pipe))
}
}()
}
}

func (d *Driver) atomicCtx() {
Expand Down

0 comments on commit 2ebbb65

Please sign in to comment.