diff --git a/kcl/kcl.go b/kcl/kcl.go index b1f37cd..e630653 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -51,13 +51,27 @@ func (i ioHandler) readLine() (string, error) { return line, nil } +// ActionName is the "action" string contained in the JSON sent from the KCL. +type ActionName string + +const ( + ActionNameInitialize ActionName = "initialize" + ActionNameProcessRecords = "processRecords" + ActionNameShutdownRequested = "shutdownRequested" + ActionNameShutdown = "shutdown" + ActionNameCheckpoint = "checkpoint" +) + type ActionInitialize struct { - Action string `json:"action"` ShardID string `json:"shardId"` SequenceNumber string `json:"sequenceNumber"` SubSequenceNumber int `json:"subSequenceNumber"` } +func (a ActionInitialize) Name() ActionName { + return ActionNameInitialize +} + type Record struct { SequenceNumber string `json:"sequenceNumber"` SubSequenceNumber int `json:"subSequenceNumber"` @@ -67,53 +81,63 @@ type Record struct { } type ActionProcessRecords struct { - Action string `json:"action"` Records []Record `json:"records"` MillisBehindLatest int `json:"millisBehindLatest"` } +func (a ActionProcessRecords) Name() ActionName { + return ActionNameProcessRecords +} + type ActionShutdown struct { - Action string `json:"action"` Reason string `json:"reason"` } +func (a ActionShutdown) Name() ActionName { + return ActionNameShutdown +} + type ActionCheckpoint struct { - Action string `json:"action"` - SequenceNumber *string `json:"sequenceNumber,omitempty"` - SubSequenceNumber *int `json:"subSequenceNumber,omitempty"` - Error *string `json:"error,omitempty"` + Action ActionName `json:"action"` + SequenceNumber *string `json:"sequenceNumber,omitempty"` + SubSequenceNumber *int `json:"subSequenceNumber,omitempty"` + Error *string `json:"error,omitempty"` +} + +func (a ActionCheckpoint) Name() ActionName { + return ActionNameCheckpoint } func (i ioHandler) loadAction(line string) (interface{}, error) { lineBytes := []byte(line) var message struct { - Action string `json:"action"` + Action ActionName `json:"action"` } if err := json.Unmarshal(lineBytes, &message); err != nil { return nil, err } switch message.Action { - case "initialize": + case ActionNameInitialize: var actionInitialize ActionInitialize if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil { return nil, err } return actionInitialize, nil - case "processRecords": + case ActionNameProcessRecords: var actionProcessRecords ActionProcessRecords if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil { return nil, err } return actionProcessRecords, nil - case "shutdownRequested": + case ActionNameShutdownRequested: fallthrough - case "shutdown": + case ActionNameShutdown: var actionShutdown ActionShutdown if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil { return nil, err } return actionShutdown, nil - case "checkpoint": + case ActionNameCheckpoint: var actionCheckpoint ActionCheckpoint if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil { return nil, err @@ -208,31 +232,36 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error { return kclp.ioHandler.writeAction(ActionCheckpoint{ - Action: "checkpoint", + Action: ActionNameCheckpoint, SequenceNumber: seq, SubSequenceNumber: subSeq, }) } -func (kclp *KCLProcess) reportDone(responseFor string) error { +func (kclp *KCLProcess) reportDone(responseFor ActionName) error { return kclp.ioHandler.writeAction(struct { - Action string `json:"action"` - ResponseFor string `json:"responseFor"` + Action string `json:"action"` + ResponseFor ActionName `json:"responseFor"` }{ Action: "status", ResponseFor: responseFor, }) } -func (kclp *KCLProcess) handleLine(line string) (string, error) { +type Action interface { + Name() ActionName +} + +// handleLine processes a line of text sent to the process by the KCL. It returns the action handled, if any. +func (kclp *KCLProcess) handleLine(line string) (Action, error) { action, err := kclp.ioHandler.loadAction(line) if err != nil { - return "", err + return nil, err } switch action := action.(type) { case ActionCheckpoint: - return "checkpoint", kclp.handleCheckpointAction(action) + return action, kclp.handleCheckpointAction(action) case ActionShutdown: kclp.ioHandler.writeError("Received shutdown action...") @@ -251,23 +280,23 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) { } } - return "shutdown", kclp.reportDone("shutdown") + return action, kclp.reportDone(action.Name()) case ActionInitialize: err := kclp.recordProcessor.Initialize(action.ShardID, kclp) if err != nil { - return "", err + return nil, err } - return "initialize", kclp.reportDone(action.Action) + return action, kclp.reportDone(action.Name()) case ActionProcessRecords: err := kclp.recordProcessor.ProcessRecords(action.Records) if err != nil { - return "", err + return nil, err } - return "process-record", kclp.reportDone(action.Action) + return action, kclp.reportDone(action.Name()) default: - return "", fmt.Errorf("unknown action to dispatch: %+#v", action) + return nil, fmt.Errorf("unknown action to dispatch: %+#v", action) } } @@ -288,7 +317,7 @@ func (kclp *KCLProcess) Run() { action, err := kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) - } else if action == "shutdown" { + } else if action != nil && action.Name() == ActionNameShutdown { return }