Skip to content

Commit

Permalink
Use logger for logging (#36)
Browse files Browse the repository at this point in the history
* cleanup, remove console.log

* console.log--

* console.error--
  • Loading branch information
zhirafovod authored May 29, 2024
1 parent 7ac245d commit 44c1d26
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 170 deletions.
11 changes: 6 additions & 5 deletions example/joinResistance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ subscribeParams: #parameters for subscribing to an event
type: test
acks: []
rebel: "luke"
joinResistance: |
/${(
$details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name;
$joined( "/rebelForces/-", $details);
)}
joinResistance: |
/${(
start;
$details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name;
$joined( "/rebelForces/-", $details);
)}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
Expand Down
182 changes: 21 additions & 161 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ export class StatedWorkflow {
this.snapshotOpts = context.snapshot || {storage: 'fs', basePath: './.state'}
this.storage = storage || createStorage(this.snapshotOpts);
this.snapshotManager = new SnapshotManager(this.snapshotOpts, this.storage);

// create metrics provider
this.workflowMetrics = new WorkflowMetrics();

this.consumers = new Map(); //key is type, value is pulsar consumer
this.dispatchers = new Map(); //key is type, value Set of WorkflowDispatcher

Expand All @@ -82,8 +78,6 @@ export class StatedWorkflow {
"onHttp": this.onHttp.bind(this),
"publish": this.publish.bind(this),
"logFunctionInvocation": this.logFunctionInvocation.bind(this),
"workflow": this.workflow.bind(this),
// "recover": this.recover.bind(this),
"sleep": Delay.start,
"ack": this.ack.bind(this),
}
Expand Down Expand Up @@ -138,7 +132,7 @@ export class StatedWorkflow {

//
async ack(data) {
console.log(`acknowledging data: ${StatedREPL.stringify(data)}`);
this.templateProcessor.logger.debug(`acknowledging data: ${StatedREPL.stringify(data)}`);
const dispatcherType = this.workflowDispatcher.dispatchers.get(data.type);
for (let t of dispatcherType) {
const dispatcher = this.workflowDispatcher.dispatcherObjects.get(t);
Expand All @@ -156,19 +150,6 @@ export class StatedWorkflow {

}

// setWorkflowPersistence() {
// const storage = new Storage({workflowName: this.templateProcessor.input.name});
// const cbFn = async (data, jsonPtr, removed) => {
// try {
// await storage.persist(this.templateProcessor);
// } catch (error) {
// console.error(`Error persisting workflow state: ${error}`);
// }
// }
// this.templateProcessor.removeDataChangeCallback('/');
// this.templateProcessor.setDataChangeCallback('/',cbFn);
// }

async logFunctionInvocation(stage, args, result, error = null, log) {
const logMessage = {
context: stage.name,
Expand All @@ -186,7 +167,7 @@ export class StatedWorkflow {
logMessage.finish = new Date().toISOString();
logMessage.out = result;
}
console.log(StatedREPL.stringify(logMessage));
this.logger.debug(StatedREPL.stringify(logMessage));

// Assuming 'logs' array is inside 'log' object
if (log.logs) {
Expand Down Expand Up @@ -280,7 +261,7 @@ export class StatedWorkflow {
],
});
} catch (err) {
console.error(`Error publishing to Kafka: ${err}`);
this.logger.error(`Error publishing to Kafka: ${err}`);
} finally {
// Close the producer when done
await producer.disconnect();
Expand Down Expand Up @@ -379,14 +360,11 @@ export class StatedWorkflow {
const messageDataStr = message.getData().toString();
messageData = JSON.parse(messageDataStr);
} catch (error) {
console.error("unable to parse data to json:", error);
this.templateProcessor.logger.error("unable to parse data to json:", error);
// TODO - should we acknowledge the message here?
continue;
}
let resolve;
this.latch = new Promise((_resolve) => {
resolve = _resolve; //we assign our resolve variable that is declared outside this promise so that our onDataChange callbacks can use it
});

// create a callback to acknowledge the message
const dataAckCallback = async () => {
Expand All @@ -399,7 +377,7 @@ export class StatedWorkflow {
break;
}
} catch (error) {
console.error("Error receiving or dispatching message:", error);
this.templateProcessor.logger.error("Error receiving or dispatching message:", error);
} finally {
if (this.pulsarClient === undefined) {
break;
Expand All @@ -410,7 +388,7 @@ export class StatedWorkflow {
try {
await consumer.close();
} catch (error) {
console.error("Error closing consumer:", error);
this.templateProcessor.logger.error("Error closing consumer:", error);
}
})();
}
Expand Down Expand Up @@ -458,10 +436,10 @@ export class StatedWorkflow {
try {
data = await registry.decode(message.value);
} catch (error) {
console.error("Unable to parse data to JSON:", error);
this.logger.error("Unable to parse data to JSON:", error);
}
const ackFunction = async (data2ack) => {
console.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`);
this.logger.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`);
// TODO: add ack logic
}
await this.workflowDispatcher.dispatchToAllSubscribers(type, data, ackFunction);
Expand Down Expand Up @@ -510,16 +488,9 @@ export class StatedWorkflow {
const str = message.value.toString();
data = JSON.parse(str);
} catch (error) {
console.error("Unable to parse data to JSON:", error);
}
const ackFunction = async (data) => {
// TODO: make the below code working
// const currentOffset = this.templateProcessor.output(subscribeParamsJsonPointer + 'offset',);
// if (currentOffset < message.offset + 1) {
// await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]);
// this.templateProcessor.setData(subscribeParamsJsonPointer + 'offset', message.offset + 1;
// }
this.logger.error("Unable to parse data to JSON:", error);
}

this.workflowDispatcher.dispatchToAllSubscribers(type, data, dataAckCallback);


Expand All @@ -545,7 +516,7 @@ export class StatedWorkflow {
testDataAckFunctionGenerator = (data) => {
return (async () => {
if (Array.isArray(clientParams.acks)) {
console.debug(`acknowledging data: ${StatedREPL.stringify(data)}`);
this.logger.debug(`acknowledging data: ${StatedREPL.stringify(data)}`);
await this.templateProcessor.setData(subscribeParamsJsonPointer + '/client/acks/-',data);
}
}).bind(this);
Expand Down Expand Up @@ -582,23 +553,23 @@ export class StatedWorkflow {
if(clientParams.type === "test"){
await this.subscribeTest(subscriptionParams, subscribeParamsJsonPointer);
} else if (clientType === 'dispatcher') {
this.logger.debug(`No 'real' subscription created because client.type='dispatcher' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`);
this.templateProcessor.logger.debug(`No 'real' subscription created because client.type='dispatcher' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`);
this.workflowDispatcher.getDispatcher(subscriptionParams);
} else if (clientType === 'http') {
this.onHttp(subscriptionParams);
} else if (clientType === 'cop') {
this.logger.debug(`subscribing to cop cloud event sources ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to cop cloud event sources ${clientParams}`)
this.subscribeCOPKafka(subscriptionParams);
}else if (clientType === 'kafka') {
this.logger.debug(`subscribing to kafka using ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to kafka using ${clientParams}`)
this.createKafkaClient(clientParams);
this.subscribeKafka(subscriptionParams);
}else if(clientType === 'pulsar') {
this.logger.debug(`subscribing to pulsar (default) using ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to pulsar (default) using ${clientParams}`)
this.createPulsarClient(clientParams);
this.subscribePulsar(subscriptionParams);
}else if(clientType === 'pulsarMock'){
this.logger.debug(`subscribing to pulsarMock using ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to pulsarMock using ${clientParams}`)
this.createPulsarClientMock(clientParams);
this.subscribePulsar(subscriptionParams);
}else{
Expand All @@ -613,15 +584,15 @@ export class StatedWorkflow {
this.app = express();
this.app.use(express.json());
this.app.listen(this.port, () => {
console.log(`Server started on http://localhost:${StatedWorkflow.port}`);
this.templateProcessor.logger.log(`Server started on http://localhost:${StatedWorkflow.port}`);
});
// Path = /workflow/:workflowId
// workflowIdToWorkflowDispatcher
if (subscriptionParams.type === undefined) subscriptionParams.type = 'default-type';
if (subscriptionParams.subscriberId === undefined) subscriptionParams.subscriberId = 'default-subscriberId';
const dispatcher = this.workflowDispatcher.getDispatcher(subscriptionParams);
this.app.all('*', async (req, res) => {
console.debug("Received HTTP request: ", req.body, req.method, req.url);
this.templateProcessor.logger.debug("Received HTTP request: ", req.body, req.method, req.url);
// Push the request and response objects to the dispatch queue to be handled by callback
await dispatcher.addToQueue(req.body, ()=>{ res.send("sucess")});
});
Expand All @@ -630,82 +601,6 @@ export class StatedWorkflow {

}

static async deleteStepsLogs(workflowInvocation, steps){
await Promise.all(steps.map(s=>s.deleteLogs(workflowInvocation)));
}

// ensures that the log object has the right structure for the workflow invocation
static initializeLog(log, workflowName, id) {
if (!log[workflowName]) log[workflowName] = {};
if (!log[workflowName][id]) log[workflowName][id] = {
info: {
start: new Date().getTime(),
status: 'in-progress'
},
execution: {}
};
}

static async persistLogRecord(stepRecord) {
this.publish(
{'type': stepRecord.workflowName, 'data': stepRecord},
{type:'pulsar', params: {serviceUrl: 'pulsar://localhost:6650'}}
);
}


async executeStep(step, input, currentLog, stepRecord) {
/*
const stepLog = {
step: step.name,
start: new Date().getTime(),
args: [input]
};
*/

if (currentLog.execution[stepRecord.stepName]?.out) {
console.log(`step ${step.name} has been already executed. Skipping`);
return currentLog.execution[stepRecord.stepName].out;
}
stepRecord["start"] = new Date().getTime();
stepRecord["args"] = input;

// we need to pass invocation id to the step expression
step.workflowInvocation = stepRecord.workflowInvocation;

try {
const result = await step.function.apply(this, [input]);
stepRecord.end = new Date().getTime();
stepRecord.out = result;
currentLog.execution[stepRecord.stepName] = stepRecord;
StatedWorkflow.persistLogRecord(stepRecord);
return result;
} catch (error) {
stepRecord.end = new Date().getTime();
stepRecord.error = {message: error.message};
currentLog.info.status = 'failed';
currentLog.execution[stepRecord.stepName] = stepRecord;
StatedWorkflow.persistLogRecord(stepRecord);
throw error;
}
}
finalizeLog(currentLog) {
currentLog.info.end = new Date().getTime();
if (currentLog.info.status !== 'failed') {
currentLog.info.status = 'succeeded';
}
}

ensureRetention(workflowLogs) {
const maxLogs = 100;
const sortedKeys = Object.keys(workflowLogs).sort((a, b) => workflowLogs[b].info.start - workflowLogs[a].info.start);
while (sortedKeys.length > maxLogs) {
const oldestKey = sortedKeys.pop();
delete workflowLogs[oldestKey];
}
}

static generateUniqueId() {
return `${new Date().getTime()}-${Math.random().toString(36).slice(2, 7)}`;
}
Expand All @@ -719,63 +614,28 @@ export class StatedWorkflow {
return `${dateStr}-${timeInMs}-${randomPart}`;
}

async workflow(input, steps, options={}) {
const {name: workflowName, log} = options;
let {id} = options;

if (log === undefined) {
throw new Error('log is missing from options');
}

if (id === undefined) {
id = StatedWorkflow.generateUniqueId();
options.id = id;
}

StatedWorkflow.initializeLog(log, workflowName, id);

let currentInput = input;
let serialOrdinal = 0;
for (let step of steps) {
const stepRecord = {invocationId: id, workflowName, stepName: step.name, serialOrdinal, branchType:"SERIAL"};
currentInput = await this.executeStep(step, currentInput, log[workflowName][id], stepRecord);
serialOrdinal++;
if (step.next) this.workflow(currentInput, step.next, options);
}

//this.finalizeLog(log[workflowName][id]);
//this.ensureRetention(log[workflowName]);

return currentInput;
}


async close() {

if (this.pulsarClient !== undefined) {
try {
await this.pulsarClient.close();
} catch (error) {
console.error("Error closing pulsar client:", error);
this.logger.error("Error closing pulsar client:", error);
}
this.pulsarClient = undefined;
}

this.templateProcessor.removeDataChangeCallback(this.changeListener);

// TODO: check if consumers can be closed without client
// for (let consumer of StatedWorkflow.consumers.values()) {
// console.log(consumer);
// await consumer.disconnect();
// }

try {
if (this.workflowDispatcher) await this.workflowDispatcher.clear();
if (this.templateProcessor) await this.templateProcessor.close();

} catch (error) {
console.error("Error closing workflow dispatcher:", error);
this.logger.error("Error closing workflow dispatcher:", error);
}
clearInterval(this.snapshotInterval);
await this.workflowMetrics.shutdown();
}
}
Loading

0 comments on commit 44c1d26

Please sign in to comment.