diff --git a/go.sum b/go.sum index 1ad1608..0dcdc31 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,7 @@ github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= github.com/hazelcast/hazelcast-go-client v1.4.1 h1:BSpJqqjbACI4MugfWXGxk+JdZR3JRELx0n769pa85kA= github.com/hazelcast/hazelcast-go-client v1.4.1/go.mod h1:PJ38lqXJ18S0YpkrRznPDlUH8GnnMAQCx3jpQtBPZ6Q= +github.com/hazelcast/hazelcast-go-client v1.4.2/go.mod h1:PJ38lqXJ18S0YpkrRznPDlUH8GnnMAQCx3jpQtBPZ6Q= github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY= github.com/invopop/yaml v0.2.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= github.com/ioki-mobility/go-outline v0.5.0 h1:BsMcH/wbfcgGpH1+8VTsygzdh9rQmUL3aMpcYW0MKuw= diff --git a/openapi/api.yaml b/openapi/api.yaml index 97067a4..0a5b72e 100644 --- a/openapi/api.yaml +++ b/openapi/api.yaml @@ -293,6 +293,9 @@ components: createdAt: type: string format: date-time + completedAt: + type: string + format: date-time state: type: string enum: diff --git a/pkg/bpmn_engine/api/api.gen.go b/pkg/bpmn_engine/api/api.gen.go index 698c9b5..167b765 100644 --- a/pkg/bpmn_engine/api/api.gen.go +++ b/pkg/bpmn_engine/api/api.gen.go @@ -109,6 +109,7 @@ type ProcessDefinitionsPage struct { type ProcessInstance struct { Activities *string `json:"activities,omitempty"` CaughtEvents *string `json:"caughtEvents,omitempty"` + CompletedAt *time.Time `json:"completedAt,omitempty"` CreatedAt *time.Time `json:"createdAt,omitempty"` Key *string `json:"key,omitempty"` ProcessDefinitionKey *string `json:"processDefinitionKey,omitempty"` @@ -388,27 +389,27 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/9xYTXPbNhD9Kxy0R9mU206n1c2JPa3cptEkuWV0AMmVDAcEGGDpVtXwv3cAfhOgRNmy", - "p5OTZQJY7r739gPck1immRQgUJPFnuj4HlJqf17HyB4Z7szvTMkMFDKwK1GWilsOKQj8tMvAPEL7l2hU", - "TGxJMSOxAoqQXKNZ3UiVUiQLklCEC2QpkJl7BEqTy8Rr8AvsvM8zJWPQ+gY2TDBkUvxxeONSaKQihrFt", - "Gin6Qioaj2X0ADGavTVEK7q1Ryjn7zdk8XkIGENI+z++V7AhC/Jd2MIfVtiHDfDtK6lSdOf14bAp49g7", - "QJpQpKRYe0J4y3ONoJZiI12iM6rQYjrd+creqj45KQjinHJcYX5VcKAJKO9SCmkEqu+5s+m4a3cycr05", - "u7rr1SPi/PJ62r6T0XllbYB8DUX3drjMyVxgBwImELagzEG52WgYWdPsX/Ct+JBbDSvSDSBlvI/kwRiH", - "Bj6yNONgsXEr8U0V51FO11N8rV7lrfnV5hML9CMoXaX0k9DT55XhOLivIM1+mroo07L0V/+5TZXm23u8", - "fay79Tm67rP7alNUQOQpWXwugzDvMhhxQEjMe0GlTBjnyNrjxSNVjEYcfpfcX8+L43C+iE4arl5eH8Yi", - "q/pwAjpWLCt7Iflw+/FTcL1aBhupAs6iC5ONFyC2TFhOGZqUJW9W7/4Kbu3T4H0G4nq1JJ38I/PLq8u5", - "rXQZCJoxsiA/Xs4v52RGMor3FpIwLlux+b0tq6HBkBpPTOKT3wC7M8OMKNCZFLrE+If5vKyyAqGsszTL", - "OIvt+fBBl3WgRGTiKGFfY8Hpg1ItBwYyI3Q7bBg95mlK1a50NYg9u2YE6VYbqdbBrs3BOvRQQUR5k6BS", - "e1D40GzxA+B3tTGcDDxtzNX+jvr4ICM97tbbKuNMrzWOfc1B4xuZ7E4ipZ82DzLyp35RvoEpSIyf1T6/", - "rtuNqHIoHNCuXNDuZBS0FaSPVx1nQIMHG2oNloWnRKoqYBdJ20kOidrtOy+p7ZEu55H5n0xjIDdBFU7Q", - "DceVOz+wu0XJB83a1Hy/qGxTcTyeLLB/Ut4HpmlMZsVpBlPkMn+Gnic2tsKr4z43KwflIIGMy50j2Bv7", - "OKCBgL895BzlZkTQ4d4XTXGSzm31VzQFtLelz3vCTGymI5AZETQ1GHhBG/I083HMBP78U8tyO/utXzO9", - "qhF8EoeetHomY6waIg5V7m6SNUPHuWr45GGunsO642Wj/36999p8WvU/O/Ht1DZOeE1KUA3NwxZjnw4y", - "lrXEDNlvOR7hPpyQlcvGiD8pv+agdkez8pQsnPktV3fhrq0ENjTnSBbz6Wbstdlr5Gr+AiXBK/plN/me", - "MvqXnXk4/s8ISqR82rV2vK2rXAgmto7CDjX38TNPkeXe/XhUnCDVU9pH9/vU/755HKohH82d11BB3dLg", - "8qbHdwcaOMQISRDtggFKy+RsfIb9Dwtj1F63u745Vnvf6g8kZAep8QxsN9krOT1Pg/AyV1/4xji7M+vf", - "HFv11+cDRFlgxikyy08kx9gE9VhjmStOFuQeMVuEIZcx5fdS4+KX+a9X5Wec0tZ+AHRvHjQNcrDcqfTt", - "Yn3ZL9bFfwEAAP//fY+RLZsbAAA=", + "H4sIAAAAAAAC/9xYTW/jNhD9KwLboxM7bVG0vmU3Qeu02zU2ewt8oKSxwyxFaslRWtfQfy9IfYuULTtO", + "UOwpjkiOZt5780HtSCSTVAoQqMl8R3T0CAm1P68jZM8Mt+Z3qmQKChnYlTBNxC2HBAR+3qZgHqH9SzQq", + "JjYkn5BIAUWIr9GsrqVKKJI5iSnCBbIEyMQ9AoXJRew1+AW23uepkhFofQNrJhgyKf7Yv3EhNFIRwdA2", + "jRR9IeW1xzJ8ggjN3gqiJd3YI5Tzj2syf+gDxhCS7o/vFazJnHw3beCflthPa+CbV1Kl6Nbrw35TxrEP", + "gDSmSEm+8oTwnmcaQS3EWrpEp1ShxXS886W9ZXVyVBDEOeW4wvyq4EBjUN6lBJIQVNdzZ9Nh1+5k6Hpz", + "dnVXqwfE+eXttH0nw/PK2gD5Foru7HCZk5nAFgRMIGxAmYNyvdYwsKbZv+Bb8SG37FekG0DKeBfJvTH2", + "DdyzJOVgsXEr8U0Z50FOV2N8LV/lrfnl5iML9DMoXab0Sejp88pwGNw3kGY3TV2UaVH6y//cpkqzzSPe", + "Plfd2t0gTSxHVqYTitmLm3FdiUBkCZk/FJGbd9UhmPeCSpgwzpGVx4tnqhgNOfwuub8J5Ic5eBVx1QS/", + "vqiMRVY27xh0pFhaNFDy6fb+c3C9XARrqQLOwguTwhcgNkxYThmaPCfvlh/+Cm7t0+BjCuJ6uSCtpCWz", + "y6vLmS2PKQiaMjInP17OLmdkQlKKjxaSaVT0b/N7U5RQgyE1nphqQX4DbA8aE6JAp1LoAuMfZrOiNAuE", + "ojjTNOUssuenT7ooHgUiI+cP+xoLTheUcjkwkBmh2wnF6DFLEqq2hatB5Nk1IUg32ki1CnZlDlahTxWE", + "lNdZLbUHhU/1Fj8Afldrw3HP09pc5e+gj08y1MNuvS8zzjRo49jXDDS+k/H2KFK6afMkQ3/q58UbmILY", + "+Fnu8+u62Ygqg9wB7coF7U6GQVNBunhVcQY0eLKhVmBZeAqkygJ2ETftZ5+o3Wb1mtoeaI0emf/JNAZy", + "HZThBO1wXLnzPbsblHzQrEzN94vKNhXH49EC+yfhXWDqxmRWnGYwRi6zF+h5ZGPLvTrucrN0UA5iSLnc", + "OoK9sY8DGgj420POQW4GBD3d+aLJj9K5rf6KJoD2ivWwI8zEZjoCmRBBE4OBF7Q+TxMfx0zgzz81LDcD", + "4+ot06uc20dx6EmrFzLGyiFiX+VuJ1k9dJyrho8e5qo5rD2T1vrv1nuvzdOq/9mJb6a2YcIrUoJyaO63", + "GPu0l7GsIabPfsPxAPfTEVm5qI34k/JrBmp7MCuPycKJ33J5gW7bimFNM45kPhtvxt61vUauZq9QEryi", + "X7ST75TRv+jM/fF/QlAi5ePuwsNtXWVCMLFxFLavuQ+fOUWWO/eLU36EVI9pH+2PWv/75rGvhtybO6+h", + "grqlweVND+8ONHCIEOIg3AY9lBbx2ficdr9GDFF73ez65ljtfODfk5AtpIYzsNlkr+T0PA3Cy1x14Rvi", + "7M6sf3NsVZ+s9xBlgRmmyCyfSI6xCeq5wjJTnMzJI2I6n065jCh/lBrnv8x+vSo+4xS2dj2gO/OgaZC9", + "5Valbxary36+yv8LAAD//3f/sx3QGwAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/pkg/bpmn_engine/api/server/server.go b/pkg/bpmn_engine/api/server/server.go index c82945b..a393ab0 100644 --- a/pkg/bpmn_engine/api/server/server.go +++ b/pkg/bpmn_engine/api/server/server.go @@ -184,13 +184,15 @@ func (s *server) GetProcessInstance(ctx echo.Context, processInstanceKey int64) key := fmt.Sprintf("%d", pi.Key) processDefintionKey := fmt.Sprintf("%d", pi.ProcessDefinitionKey) - time := time.Unix(0, pi.CreatedAt*int64(time.Second)) + createdAt := time.Unix(0, pi.CreatedAt*int64(time.Second)) + completedAt := time.Unix(0, pi.CompletedAt*int64(time.Second)) state := api.ProcessInstanceState(fmt.Sprintf("%d", pi.State)) processInstanceSimple := api.ProcessInstance{ Key: &key, ProcessDefinitionKey: &processDefintionKey, State: &state, - CreatedAt: &time, + CreatedAt: &createdAt, + CompletedAt: &completedAt, CaughtEvents: &pi.CaughtEvents, VariableHolder: &pi.VariableHolder, Activities: &pi.Activities, @@ -216,13 +218,15 @@ func (s *server) GetProcessInstances(ctx echo.Context, params api.GetProcessInst for _, pi := range partitionProcessInstances { key := fmt.Sprintf("%d", pi.Key) processDefintionKey := fmt.Sprintf("%d", pi.ProcessDefinitionKey) - time := time.Unix(0, pi.CreatedAt*int64(time.Second)) + createdAt := time.Unix(0, pi.CreatedAt*int64(time.Second)) + completedAt := time.Unix(0, pi.CompletedAt*int64(time.Second)) state := api.ProcessInstanceState(fmt.Sprintf("%d", pi.State)) processInstanceSimple := api.ProcessInstance{ Key: &key, ProcessDefinitionKey: &processDefintionKey, State: &state, - CreatedAt: &time, + CreatedAt: &createdAt, + CompletedAt: &completedAt, CaughtEvents: &pi.CaughtEvents, VariableHolder: &pi.VariableHolder, Activities: &pi.Activities, diff --git a/pkg/bpmn_engine/engine.go b/pkg/bpmn_engine/engine.go index a802db7..6113d1e 100644 --- a/pkg/bpmn_engine/engine.go +++ b/pkg/bpmn_engine/engine.go @@ -240,6 +240,10 @@ func (state *BpmnEngineState) run(instance *processInstanceInfo) (err error) { // TODO need to send failed State state.exportEndProcessEvent(*process, *instance) } + if instance.State == Completed { + instance.CompletedAt = time.Now() + instance.SetVariable("duration_in_ms", fmt.Sprintf("%d", time.Since(instance.CreatedAt).Milliseconds())) + } // TODO: persistently update state state.persistence.PersistProcessInstance(instance) diff --git a/pkg/bpmn_engine/jobs.go b/pkg/bpmn_engine/jobs.go index 913e1bc..3099589 100644 --- a/pkg/bpmn_engine/jobs.go +++ b/pkg/bpmn_engine/jobs.go @@ -70,9 +70,17 @@ func (state *BpmnEngineState) handleServiceTask(process *ProcessInfo, instance * state.persistence.PersistJob(job) // } + // FIXME: remove this one + autoComplete(state, job) + return job.JobState == Completed, job } +func autoComplete(state *BpmnEngineState, j *job) { + j.JobState = Completed + state.persistence.PersistJob(j) +} + func (state *BpmnEngineState) JobCompleteById(jobId int64) { jobs := state.persistence.FindJobs("", nil, jobId) diff --git a/pkg/bpmn_engine/persistence/rqlite/persistence_rqlite.go b/pkg/bpmn_engine/persistence/rqlite/persistence_rqlite.go index 8c2996e..749ef74 100644 --- a/pkg/bpmn_engine/persistence/rqlite/persistence_rqlite.go +++ b/pkg/bpmn_engine/persistence/rqlite/persistence_rqlite.go @@ -23,7 +23,7 @@ type BpmnEnginePersistenceRqlite struct { func NewBpmnEnginePersistenceRqlite(cfg *Config) *BpmnEnginePersistenceRqlite { context := Start(cfg) - time.Sleep(2 * time.Second) + time.Sleep(2 * time.Second) // FIXME: don't depend on time, rather using a loop, too check if port 4001 is already open Init(context.Str) @@ -136,14 +136,15 @@ func (persistence *BpmnEnginePersistenceRqlite) FindProcessInstances(processInst def := new(sql.ProcessInstanceEntity) def.Key = (*r.Parameters[0]).GetI() - def.ProcessDefinitionKey = int64((*r.Parameters[1]).GetI()) + def.ProcessDefinitionKey = (*r.Parameters[1]).GetI() def.CreatedAt = (*r.Parameters[2]).GetI() + def.CompletedAt = (*r.Parameters[3]).GetI() - def.State = int((*r.Parameters[3]).GetI()) - def.VariableHolder = (*r.Parameters[4]).GetS() - def.CaughtEvents = (*r.Parameters[5]).GetS() + def.State = int((*r.Parameters[4]).GetI()) + def.VariableHolder = (*r.Parameters[5]).GetS() + def.CaughtEvents = (*r.Parameters[6]).GetS() - def.Activities = (*r.Parameters[6]).GetS() + def.Activities = (*r.Parameters[7]).GetS() processInstances = append(processInstances, def) diff --git a/pkg/bpmn_engine/persistence/rqlite/sql/process_instance.go b/pkg/bpmn_engine/persistence/rqlite/sql/process_instance.go index be5fc35..81f933f 100644 --- a/pkg/bpmn_engine/persistence/rqlite/sql/process_instance.go +++ b/pkg/bpmn_engine/persistence/rqlite/sql/process_instance.go @@ -6,6 +6,7 @@ type ProcessInstanceEntity struct { Key int64 ProcessDefinitionKey int64 CreatedAt int64 + CompletedAt int64 State int VariableHolder string CaughtEvents string @@ -17,6 +18,7 @@ CREATE TABLE IF NOT EXISTS process_instance ( key INTEGER PRIMARY KEY, process_definition_key INTEGER NOT NULL, created_at INTEGER NOT NULL, + completed_at INTEGER, state INTEGER NOT NULL, variable_holder TEXT NOT NULL, caught_events TEXT NOT NULL, @@ -26,13 +28,25 @@ CREATE TABLE IF NOT EXISTS process_instance ( const PROCESS_INSTANCE_INSERT = ` INSERT INTO process_instance - (key, process_definition_key, created_at, state, variable_holder, caught_events, activities) + (key, process_definition_key, created_at, completed_at, state, variable_holder, caught_events, activities) VALUES - (%d, %d, %d, %d, '%s', '%s', '%s') ON CONFLICT DO UPDATE SET state = %d, variable_holder = '%s', caught_events = '%s', activities = '%s';` + (%d, %d, %d, %d, %d, '%s', '%s', '%s') ON CONFLICT DO UPDATE SET state = %d, variable_holder = '%s', caught_events = '%s', activities = '%s';` -const PROCESS_INSTANCE_SELECT = `SELECT key, process_definition_key, created_at, state, variable_holder, caught_events, activities FROM process_instance WHERE %s ORDER BY created_at DESC;` +const PROCESS_INSTANCE_SELECT = `SELECT key, process_definition_key, created_at, completed_at, state, variable_holder, caught_events, activities FROM process_instance WHERE %s ORDER BY created_at DESC;` func BuildProcessInstanceUpsertQuery(pie *ProcessInstanceEntity) string { // FIXME: for speed this needs to be splited - return fmt.Sprintf(PROCESS_INSTANCE_INSERT, pie.Key, pie.ProcessDefinitionKey, pie.CreatedAt, pie.State, pie.VariableHolder, pie.CaughtEvents, pie.Activities, pie.State, pie.VariableHolder, pie.CaughtEvents, pie.Activities) + return fmt.Sprintf(PROCESS_INSTANCE_INSERT, + pie.Key, + pie.ProcessDefinitionKey, + pie.CreatedAt, + pie.CompletedAt, + pie.State, + pie.VariableHolder, + pie.CaughtEvents, + pie.Activities, + pie.State, + pie.VariableHolder, + pie.CaughtEvents, + pie.Activities) } diff --git a/pkg/bpmn_engine/process_instance.go b/pkg/bpmn_engine/process_instance.go index ec4b3b2..78dd346 100644 --- a/pkg/bpmn_engine/process_instance.go +++ b/pkg/bpmn_engine/process_instance.go @@ -12,6 +12,7 @@ type processInstanceInfo struct { InstanceKey int64 `json:"ik"` VariableHolder var_holder.VariableHolder `json:"vh,omitempty"` CreatedAt time.Time `json:"c"` + CompletedAt time.Time `json:"cl"` State ActivityState `json:"s"` CaughtEvents []catchEvent `json:"ce,omitempty"` activities []activity diff --git a/pkg/bpmn_engine/rqlite_persistence_facade.go b/pkg/bpmn_engine/rqlite_persistence_facade.go index 14bbd22..3d50602 100644 --- a/pkg/bpmn_engine/rqlite_persistence_facade.go +++ b/pkg/bpmn_engine/rqlite_persistence_facade.go @@ -321,6 +321,7 @@ func (persistence *BpmnEnginePersistenceRqlite) PersistProcessInstance(processIn Key: processInstance.InstanceKey, ProcessDefinitionKey: processInstance.ProcessInfo.ProcessKey, CreatedAt: processInstance.CreatedAt.Unix(), + CompletedAt: processInstance.CompletedAt.Unix(), State: activityStateMap[processInstance.State], VariableHolder: string(varaiblesJson), CaughtEvents: string(caughtEvents),