diff --git a/pubsubjobs/driver.go b/pubsubjobs/driver.go index 2f1f7ac..18cae17 100644 --- a/pubsubjobs/driver.go +++ b/pubsubjobs/driver.go @@ -43,7 +43,6 @@ type Driver struct { pipeline atomic.Pointer[jobs.Pipeline] tracer *sdktrace.TracerProvider prop propagation.TextMapPropagator - consumeAll bool skipDeclare bool topic string msgInFlight *int64 @@ -64,7 +63,7 @@ type Driver struct { // FromConfig initializes google_pub_sub_driver_ pipeline func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) { - const op = errors.Op("new_google_pub_sub_consumer") + const op = errors.Op("google_pub_sub_consumer") if tracer == nil { tracer = sdktrace.NewTracerProvider() @@ -130,7 +129,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip // FromPipeline initializes consumer from pipeline func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) { - const op = errors.Op("new_google_pub_sub_consumer_from_pipeline") + const op = errors.Op("google_pub_sub_consumer_from_pipeline") if tracer == nil { tracer = sdktrace.NewTracerProvider() } @@ -185,9 +184,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. } func (d *Driver) Push(ctx context.Context, jb jobs.Message) error { - const op = errors.Op("google_pub_sub_driver_push") // check if the pipeline registered - ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_push") defer span.End() @@ -246,7 +243,6 @@ func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error { } func (d *Driver) State(ctx context.Context) (*jobs.State, error) { - const op = errors.Op("google_pub_sub_driver_state") _, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_state") defer span.End() @@ -362,8 +358,7 @@ func (d *Driver) manageTopic(ctx context.Context) error { _, err = d.client.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{ Topic: topic, - AckDeadline: 10 * time.Second, - ExpirationPolicy: time.Duration(0), + AckDeadline: 30 * time.Second, }) if err != nil { if !strings.Contains(err.Error(), "Subscription already exists") { diff --git a/pubsubjobs/item.go b/pubsubjobs/item.go index 82b9c83..c6784c0 100644 --- a/pubsubjobs/item.go +++ b/pubsubjobs/item.go @@ -161,13 +161,6 @@ func fromJob(job jobs.Message) *Item { }, } } -func bytesToStr(data []byte) string { - if len(data) == 0 { - return "" - } - - return unsafe.String(unsafe.SliceData(data), len(data)) -} func strToBytes(data string) []byte { if data == "" { diff --git a/pubsubjobs/listener.go b/pubsubjobs/listener.go index 2d66ac2..ef4f08d 100644 --- a/pubsubjobs/listener.go +++ b/pubsubjobs/listener.go @@ -17,7 +17,7 @@ func (d *Driver) listen(ctx context.Context) { d.log.Debug("listener was stopped") return default: - d.client.Subscription(d.sub).Receive(ctx, func(ctx context.Context, message *pubsub.Message) { + err := d.client.Subscription(d.sub).Receive(ctx, func(ctx context.Context, message *pubsub.Message) { d.cond.L.Lock() // lock when we hit the limit for atomic.LoadInt64(d.msgInFlight) >= int64(atomic.LoadInt32(d.msgInFlightLimit)) { @@ -53,6 +53,10 @@ func (d *Driver) listen(ctx context.Context) { d.cond.L.Unlock() span.End() }) + + if err != nil { + d.log.Error("subscribing error", zap.Error(err)) + } } } }()