Skip to content

Commit

Permalink
Add consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
cv65kr committed Feb 9, 2024
1 parent 129ea34 commit 148f9b0
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 16 deletions.
11 changes: 3 additions & 8 deletions pubsubjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Driver struct {
pipeline atomic.Pointer[jobs.Pipeline]
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
consumeAll bool
skipDeclare bool
topic string
msgInFlight *int64
Expand All @@ -64,7 +63,7 @@ type Driver struct {

// FromConfig initializes google_pub_sub_driver_ pipeline
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_google_pub_sub_consumer")
const op = errors.Op("google_pub_sub_consumer")

if tracer == nil {
tracer = sdktrace.NewTracerProvider()
Expand Down Expand Up @@ -130,7 +129,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip

// FromPipeline initializes consumer from pipeline
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_google_pub_sub_consumer_from_pipeline")
const op = errors.Op("google_pub_sub_consumer_from_pipeline")
if tracer == nil {
tracer = sdktrace.NewTracerProvider()
}
Expand Down Expand Up @@ -185,9 +184,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
}

func (d *Driver) Push(ctx context.Context, jb jobs.Message) error {
const op = errors.Op("google_pub_sub_driver_push")
// check if the pipeline registered

ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_push")
defer span.End()

Expand Down Expand Up @@ -246,7 +243,6 @@ func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
}

func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
const op = errors.Op("google_pub_sub_driver_state")
_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_state")
defer span.End()

Expand Down Expand Up @@ -362,8 +358,7 @@ func (d *Driver) manageTopic(ctx context.Context) error {

_, err = d.client.CreateSubscription(ctx, d.sub, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
ExpirationPolicy: time.Duration(0),
AckDeadline: 30 * time.Second,
})
if err != nil {
if !strings.Contains(err.Error(), "Subscription already exists") {
Expand Down
7 changes: 0 additions & 7 deletions pubsubjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,6 @@ func fromJob(job jobs.Message) *Item {
},
}
}
func bytesToStr(data []byte) string {
if len(data) == 0 {
return ""
}

return unsafe.String(unsafe.SliceData(data), len(data))
}

func strToBytes(data string) []byte {
if data == "" {
Expand Down
6 changes: 5 additions & 1 deletion pubsubjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (d *Driver) listen(ctx context.Context) {
d.log.Debug("listener was stopped")
return
default:
d.client.Subscription(d.sub).Receive(ctx, func(ctx context.Context, message *pubsub.Message) {
err := d.client.Subscription(d.sub).Receive(ctx, func(ctx context.Context, message *pubsub.Message) {
d.cond.L.Lock()
// lock when we hit the limit
for atomic.LoadInt64(d.msgInFlight) >= int64(atomic.LoadInt32(d.msgInFlightLimit)) {
Expand Down Expand Up @@ -53,6 +53,10 @@ func (d *Driver) listen(ctx context.Context) {
d.cond.L.Unlock()
span.End()
})

if err != nil {
d.log.Error("subscribing error", zap.Error(err))
}
}
}
}()
Expand Down

0 comments on commit 148f9b0

Please sign in to comment.