Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for publishSettings #1072

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions protocol/pubsub/v2/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"cloud.google.com/go/pubsub"

pscontext "github.com/cloudevents/sdk-go/protocol/pubsub/v2/context"
"github.com/cloudevents/sdk-go/v2/binding"
)
Expand Down Expand Up @@ -62,6 +63,9 @@ type Connection struct {
// ReceiveSettings is used to configure Pubsub pull subscription.
ReceiveSettings *pubsub.ReceiveSettings

// PublishSettings is used to configure Publishing to a topic
PublishSettings *pubsub.PublishSettings

// AckDeadline is Pub/Sub AckDeadline.
// Default is 30 seconds.
// This can only be set prior to first call of any function.
Expand Down Expand Up @@ -128,6 +132,12 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
}
ti.wasCreated = true
}

// if publishSettings have been provided use them otherwise pubsub will use default settings
if c.PublishSettings != nil {
topic.PublishSettings = *c.PublishSettings
}

// Success.
ti.topic = topic

Expand Down
52 changes: 52 additions & 0 deletions protocol/pubsub/v2/internal/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,58 @@ func TestPublishCreateTopic(t *testing.T) {
verifyTopicDeleteWorks(t, client, psconn, topicID)
}

// Test that publishing to a topic with non default publish settings
func TestPublishWithCustomPublishSettings(t *testing.T) {
t.Run("create topic and publish to it with custom settings", func(t *testing.T) {
ctx := context.Background()
pc := &testPubsubClient{}
defer pc.Close()

projectID, topicID, subID := "test-project", "test-topic", "test-sub"

client, err := pc.New(ctx, projectID, nil)
if err != nil {
t.Fatalf("failed to create pubsub client: %v", err)
}
defer client.Close()

psconn := &Connection{
AllowCreateSubscription: true,
AllowCreateTopic: true,
Client: client,
ProjectID: projectID,
TopicID: topicID,
SubscriptionID: subID,
PublishSettings: &pubsub.PublishSettings{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change itself looks good to me, but I'm curious how this tests it. What in the test makes it fail and what would change if the PublishSettings were not specified?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not specified then default publish settings would be used as documented here https://github.com/googleapis/google-cloud-go/blob/265963bd5b91c257b3c3d3c1f52cdf2b5f4c9d1a/pubsub/topic.go#L76

With regards to what would make this test fail, it exists to more than anything to statically verify the setting is exposed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What verifies that the settings used came from what's on line 186 instead of the defaults?

Copy link
Author

@JamesBLewis JamesBLewis Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the change is to the interface there is a limit to how far we can verify this. that limit (without testing google's own pubsub client code) is that we can verify the Public variable is exposed in by the struct.

Is there some alternative you have in mind that I'm missing?

Copy link
Author

@JamesBLewis JamesBLewis Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two other similar PRs to draw potential inspiration from:

  1. The original PR adding receiveSettings https://github.com/cloudevents/sdk-go/pull/396/files in which the default value is declared and then a simple check is performed in the test to see if the value is equal to the default.
  2. This PR exposing a custom host option https://github.com/cloudevents/sdk-go/pull/1070/files in which a setter function is added and unit tested.

There is an argument to be made that we should define our own default publish settings like receive although I don't really see how this would make the sdk more intuitive other then perhaps consistency

Copy link
Author

@JamesBLewis JamesBLewis Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my underlying point here remains that the unit test for ReceiveSettings goes arguably less far then this publishSettings test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to make it so that w/o the parameter the operation works and then with your settings it fails? I could live with that much.

Or if that can't be done (e.g. because we need an instance of google pubsub and we don't have that), then perhaps try to get an error from the call to CreateTopic() that complains about a parameter in your settings. That'll prove your settings made it to the pubsub code.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelayThreshold: 100 * time.Millisecond,
CountThreshold: 00,
ByteThreshold: 2e6,
Timeout: 120 * time.Second,
BufferedByteLimit: 20 * pubsub.MaxPublishRequestBytes,
FlowControlSettings: pubsub.FlowControlSettings{
MaxOutstandingMessages: 10,
MaxOutstandingBytes: 0,
LimitExceededBehavior: pubsub.FlowControlBlock,
},
},
}

topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to pre-create topic: %v", err)
}
topic.Stop()

msg := &pubsub.Message{
ID: "msg-id-1",
Data: []byte("msg-data-1"),
}
if _, err := psconn.Publish(ctx, msg); err != nil {
t.Errorf("failed to publish message: %v", err)
}
})
}

// Test that publishing to an already created topic works and doesn't allow topic deletion
func TestPublishExistingTopic(t *testing.T) {
for _, allowCreate := range []bool{true, false} {
Expand Down