Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove some strings #20

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 56 additions & 27 deletions kcl/kcl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,27 @@ func (i ioHandler) readLine() (string, error) {
return line, nil
}

// ActionName is the "action" string contained in the JSON sent from the KCL.
type ActionName string

const (
ActionNameInitialize ActionName = "initialize"
ActionNameProcessRecords = "processRecords"
ActionNameShutdownRequested = "shutdownRequested"
ActionNameShutdown = "shutdown"
ActionNameCheckpoint = "checkpoint"
)

type ActionInitialize struct {
Action string `json:"action"`
ShardID string `json:"shardId"`
SequenceNumber string `json:"sequenceNumber"`
SubSequenceNumber int `json:"subSequenceNumber"`
}

func (a ActionInitialize) Name() ActionName {
return ActionNameInitialize
}

type Record struct {
SequenceNumber string `json:"sequenceNumber"`
SubSequenceNumber int `json:"subSequenceNumber"`
Expand All @@ -67,53 +81,63 @@ type Record struct {
}

type ActionProcessRecords struct {
Action string `json:"action"`
Records []Record `json:"records"`
MillisBehindLatest int `json:"millisBehindLatest"`
}

func (a ActionProcessRecords) Name() ActionName {
return ActionNameProcessRecords
}

type ActionShutdown struct {
Action string `json:"action"`
Reason string `json:"reason"`
}

func (a ActionShutdown) Name() ActionName {
return ActionNameShutdown
}

type ActionCheckpoint struct {
Action string `json:"action"`
SequenceNumber *string `json:"sequenceNumber,omitempty"`
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
Error *string `json:"error,omitempty"`
Action ActionName `json:"action"`
SequenceNumber *string `json:"sequenceNumber,omitempty"`
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
Error *string `json:"error,omitempty"`
}

func (a ActionCheckpoint) Name() ActionName {
return ActionNameCheckpoint
}

func (i ioHandler) loadAction(line string) (interface{}, error) {
lineBytes := []byte(line)
var message struct {
Action string `json:"action"`
Action ActionName `json:"action"`
}
if err := json.Unmarshal(lineBytes, &message); err != nil {
return nil, err
}
switch message.Action {
case "initialize":
case ActionNameInitialize:
var actionInitialize ActionInitialize
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
return nil, err
}
return actionInitialize, nil
case "processRecords":
case ActionNameProcessRecords:
var actionProcessRecords ActionProcessRecords
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
return nil, err
}
return actionProcessRecords, nil
case "shutdownRequested":
case ActionNameShutdownRequested:
fallthrough
case "shutdown":
case ActionNameShutdown:
var actionShutdown ActionShutdown
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
return nil, err
}
return actionShutdown, nil
case "checkpoint":
case ActionNameCheckpoint:
var actionCheckpoint ActionCheckpoint
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
return nil, err
Expand Down Expand Up @@ -208,31 +232,36 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {

func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
return kclp.ioHandler.writeAction(ActionCheckpoint{
Action: "checkpoint",
Action: ActionNameCheckpoint,
SequenceNumber: seq,
SubSequenceNumber: subSeq,
})
}

func (kclp *KCLProcess) reportDone(responseFor string) error {
func (kclp *KCLProcess) reportDone(responseFor ActionName) error {
return kclp.ioHandler.writeAction(struct {
Action string `json:"action"`
ResponseFor string `json:"responseFor"`
Action string `json:"action"`
ResponseFor ActionName `json:"responseFor"`
}{
Action: "status",
ResponseFor: responseFor,
})
}

func (kclp *KCLProcess) handleLine(line string) (string, error) {
type Action interface {
Name() ActionName
}

// handleLine processes a line of text sent to the process by the KCL. It returns the action handled, if any.
func (kclp *KCLProcess) handleLine(line string) (Action, error) {
action, err := kclp.ioHandler.loadAction(line)
if err != nil {
return "", err
return nil, err
}

switch action := action.(type) {
case ActionCheckpoint:
return "checkpoint", kclp.handleCheckpointAction(action)
return action, kclp.handleCheckpointAction(action)
case ActionShutdown:
kclp.ioHandler.writeError("Received shutdown action...")

Expand All @@ -251,23 +280,23 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) {
}
}

return "shutdown", kclp.reportDone("shutdown")
return action, kclp.reportDone(action.Name())
case ActionInitialize:
err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
if err != nil {
return "", err
return nil, err
}

return "initialize", kclp.reportDone(action.Action)
return action, kclp.reportDone(action.Name())
case ActionProcessRecords:
err := kclp.recordProcessor.ProcessRecords(action.Records)
if err != nil {
return "", err
return nil, err
}

return "process-record", kclp.reportDone(action.Action)
return action, kclp.reportDone(action.Name())
default:
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
return nil, fmt.Errorf("unknown action to dispatch: %+#v", action)
}
}

Expand All @@ -288,7 +317,7 @@ func (kclp *KCLProcess) Run() {
action, err := kclp.handleLine(line)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
} else if action == "shutdown" {
} else if action != nil && action.Name() == ActionNameShutdown {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there cases when action will be nil? If I'm reading the code correctly, that should only happen on error.

Copy link
Contributor

@xavi- xavi- Sep 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just occurred to me, you may need to add a "ActionNameShutdownRequested" check here as well.

return
}

Expand Down