-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_subscription.py
38 lines (30 loc) · 1.05 KB
/
create_subscription.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import click
from common import (
PUBSUB_EMULATOR_ENDPOINT_DEFAULT,
get_publisher_client,
get_subscriber_client,
)
@click.command()
@click.option(
    "--project", default="my-test-project", help="Project where PubSub is running"
)
@click.option(
    "--endpoint",
    default=PUBSUB_EMULATOR_ENDPOINT_DEFAULT,
    help="Host and port of the running Google PubSub Emulator",
)
@click.argument("topic")
@click.argument("subscription")
def main(project: str, endpoint: str, topic: str, subscription: str) -> None:
    """Create SUBSCRIPTION attached to TOPIC and print the created name.

    TOPIC and SUBSCRIPTION are combined with --project to build the topic
    and subscription paths sent in the request. This command does not
    create the topic itself.
    """
    # NOTE: click shows the docstring above as the command's --help text, so
    # it is written for CLI users and is the only place the positional
    # arguments get described (click.argument takes no help= text).

    # Both clients come from the shared helpers and target the same endpoint.
    # The publisher client is used here only to build the topic path.
    publisher_client = get_publisher_client(endpoint)
    subscriber_client = get_subscriber_client(endpoint)

    topic_path = publisher_client.topic_path(project, topic)
    subscription_path = subscriber_client.subscription_path(project, subscription)

    # Create the single subscription; any error raised by the client call is
    # deliberately left to propagate so the failure is visible to the caller.
    subscription_obj = subscriber_client.create_subscription(
        request={"name": subscription_path, "topic": topic_path}
    )

    # click.echo is click's output helper; the message text is unchanged.
    click.echo(f"Created subscription {subscription_obj.name}")
# Script entry point: invoke the click command only when this file is run
# directly, so that merely importing the module does not execute it.
if __name__ == "__main__":
    main()