Skip to content

Commit

Permalink
add a start time for forward matching
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Dec 4, 2023
1 parent 9b962c7 commit 09c4cbd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
9 changes: 8 additions & 1 deletion history/forward_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const (
)

func (s *ForwardMatcher) MatchForwards(ctx context.Context) {
// By default start matching forwards from 2016-1-1
s.lastStartTime = time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
t, err := s.store.GetLastMatchedForwardTime(ctx)
if err == nil {
s.lastStartTime = t.Add(-perForwardMatchTimeRange)
}
s.matchForwardsOnce(ctx)

for {
Expand All @@ -48,6 +54,7 @@ func (s *ForwardMatcher) matchForwardsOnce(ctx context.Context) {
start := s.lastStartTime.Add(perForwardMatchTimeRange / 2)
end := start.Add(perForwardMatchTimeRange)
if end.Add(perForwardMatchTimeRange).After(time.Now()) {
log.Printf("forwardsSynchronizeNodeOnce() - Range start %v - end %v is too close to the present. Stopping matching.", start, end)
break
}

Expand All @@ -69,7 +76,7 @@ func (s *ForwardMatcher) matchForwardsOnce(ctx context.Context) {
log.Printf("Error matching internal forwards: %v", err)
break
}
log.Printf("forwardsSynchronizeNodeOnce() - Range start %v - end %v complete", start, end)
log.Printf("forwardsSynchronizeNodeOnce() - Range start %v - end %v complete", start, end)
s.lastStartTime = start
}
log.Printf("forwardsSynchronizeNodeOnce() - Done")
Expand Down
1 change: 1 addition & 0 deletions history/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ type Store interface {
MatchInternalForwards(ctx context.Context, start time.Time, end time.Time) error
MatchForwardsAndChannels(ctx context.Context) error
GetForwardsWithoutChannelCount(ctx context.Context) (int64, error)
GetLastMatchedForwardTime(ctx context.Context) (*time.Time, error)
}
24 changes: 24 additions & 0 deletions postgresql/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,30 @@ func (s *HistoryStore) GetForwardsWithoutChannelCount(ctx context.Context) (int6
return count, nil
}

func (s *HistoryStore) GetLastMatchedForwardTime(ctx context.Context) (*time.Time, error) {
result, err := s.pool.Query(ctx, `
SELECT MAX(resolved_time)
FROM internal_forward_correlations c
INNER JOIN forwarding_history h
ON c.local_forward_nodeid = h.nodeid AND c.local_forward_identifier = h.identifier`)
if err != nil {
return nil, err
}

if !result.Next() {
return nil, fmt.Errorf("could not get a resolved time")
}

var time_ns int64
err = result.Scan(&time_ns)
if err != nil {
return nil, err
}

t := time.Unix(0, time_ns)
return &t, nil
}

func (s *HistoryStore) MatchInternalForwards(ctx context.Context, start time.Time, end time.Time) error {
matches, err := s.getMatches(ctx, start.UnixNano(), end.UnixNano())
if err != nil {
Expand Down

0 comments on commit 09c4cbd

Please sign in to comment.