Skip to content

Commit

Permalink
chore: update messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jkoenig134 committed Nov 21, 2023
1 parent a79cf85 commit 6800133
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/modules/pubSubPublisher/PubSubPublisherModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export default class PubSubPublisherModule extends ConnectorRuntimeModule<PubSub
if (!this.configuration.topicName) throw new Error("Cannot start the module, the topic is not defined.");
if (!this.configuration.keyFile) throw new Error("Cannot start the module, the keyFile is not defined.");

this.logger.info(`Initializing PubSubPublisherModule with projectId '${this.configuration.projectId}' and topicName '${this.configuration.topicName}'.`);

this.pubSub = new PubSub({
projectId: this.configuration.projectId,
keyFile: this.configuration.keyFile
Expand All @@ -27,7 +29,7 @@ export default class PubSubPublisherModule extends ConnectorRuntimeModule<PubSub
this.logger.info("Checking if topic exists...");

const topicExists = (await this.topic.exists())[0];
if (!topicExists) throw new Error(`Topic ${this.configuration.topicName} does not exist.`);
if (!topicExists) throw new Error(`Topic ${this.configuration.topicName} does not exist in the project '${this.configuration.projectId}'.`);
}

public start(): void {
Expand All @@ -38,12 +40,9 @@ export default class PubSubPublisherModule extends ConnectorRuntimeModule<PubSub
const data = event instanceof DataEvent || event instanceof RuntimeDataEvent ? event.data : {};
const buffer = Buffer.from(JSON.stringify(data));

await this.topic
.publishMessage({
attributes: { namespace: event.namespace },
data: buffer
})
.catch((e) => this.logger.error("Could not publish message", e));
const namespace = event.namespace;

await this.topic.publishMessage({ attributes: { namespace }, data: buffer }).catch((e) => this.logger.error(`Could not publish message with namespace '${namespace}'`, e));
}

public async stop(): Promise<void> {
Expand Down

0 comments on commit 6800133

Please sign in to comment.