Skip to content

Commit

Permalink
Support UTC DateTime Structures
Browse files Browse the repository at this point in the history
Contrary to 5.0, invalid timezones received from the server will
result in the transaction being retried and cause the connection
to eventually put back to pool, after all attempts are
exhausted.

While this is not ideal, this has been done so to minimize
breaking changes in 4.x.

This also fixes race condition in integration tests where RESET
was racing against NEXT and would sometimes see a FAILURE,
marking the connection as dead.
  • Loading branch information
fbiville authored Aug 3, 2022
1 parent 576dcb4 commit c7dd381
Show file tree
Hide file tree
Showing 16 changed files with 955 additions and 1,042 deletions.
1 change: 1 addition & 0 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewBolt3(serverName string, conn net.Conn, logger log.Logger, boltLog log.B
b.state = bolt3_dead
},
boltLogger: boltLog,
useUtc: false,
}
return b
}
Expand Down
3 changes: 3 additions & 0 deletions neo4j/internal/bolt/bolt3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func TestBolt3(ot *testing.T) {

bolt := c.(*bolt3)
assertBoltState(t, bolt3_ready, bolt)
if bolt.out.useUtc {
t.Fatalf("Bolt 3 connections must never send and receive UTC datetimes")
}
return bolt, cleanup
}

Expand Down
15 changes: 15 additions & 0 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (b *bolt4) connect(minor int, auth map[string]interface{}, userAgent string
hello["routing"] = routingContext
}
}
checkUtcPatch := minor >= 3
if checkUtcPatch {
hello["patch_bolt"] = []string{"utc"}
}
// Merge authentication keys into hello, avoid overwriting existing keys
for k, v := range auth {
_, exists := hello[k]
Expand All @@ -244,6 +248,17 @@ func (b *bolt4) connect(minor int, auth map[string]interface{}, userAgent string

b.connId = succ.connectionId
b.serverVersion = succ.server
if checkUtcPatch {
useUtc := false
for _, patch := range succ.patches {
if patch == "utc" {
useUtc = true
break
}
}
b.in.hyd.useUtc = useUtc
b.out.useUtc = useUtc
}

// Construct log identity
connectionLogId := fmt.Sprintf("%s@%s", b.connId, b.serverName)
Expand Down
31 changes: 31 additions & 0 deletions neo4j/internal/bolt/bolt4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestBolt4(ot *testing.T) {
AssertStringEqual(t, bolt.ServerName(), "serverName")
AssertTrue(t, bolt.IsAlive())
AssertTrue(t, reflect.DeepEqual(bolt.in.connReadTimeout, time.Duration(-1)))
AssertFalse(t, bolt.out.useUtc)
})

ot.Run("Connect success with timeout hint", func(t *testing.T) {
Expand All @@ -153,6 +154,36 @@ func TestBolt4(ot *testing.T) {
AssertTrue(t, reflect.DeepEqual(bolt.in.connReadTimeout, 42*time.Second))
})

for _, version := range [][]byte{{4, 3}, {4, 4}} {
major := version[0]
minor := version[1]
ot.Run(fmt.Sprintf("[%d.%d] Connect success with UTC patch", major, minor), func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
srv.waitForHandshake()
srv.acceptVersion(major, minor)
srv.waitForHelloWithPatches([]interface{}{"utc"})
srv.acceptHelloWithPatches([]interface{}{"utc"})
})
defer cleanup()
defer bolt.Close()

AssertTrue(t, bolt.out.useUtc)
})

ot.Run(fmt.Sprintf("[%d.%d] Connect success with unknown patch", major, minor), func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
srv.waitForHandshake()
srv.acceptVersion(major, minor)
srv.waitForHelloWithPatches([]interface{}{"utc"})
srv.acceptHelloWithPatches([]interface{}{"some-unknown-patch"})
})
defer cleanup()
defer bolt.Close()

AssertFalse(t, bolt.out.useUtc)
})
}

invalidValues := []interface{}{4.2, "42", -42}
for _, value := range invalidValues {
ot.Run(fmt.Sprintf("Connect success with ignored invalid timeout hint %v", value), func(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions neo4j/internal/bolt/bolt4server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net"
"reflect"
"testing"

"github.com/neo4j/neo4j-go-driver/v4/neo4j/internal/packstream"
Expand Down Expand Up @@ -76,6 +77,15 @@ func (s *bolt4server) sendIgnoredMsg() {
s.send(msgIgnored)
}

func (s *bolt4server) waitForHelloWithPatches(patches []interface{}) map[string]interface{} {
m := s.waitForHello()
actualPatches := m["patch_bolt"]
if !reflect.DeepEqual(actualPatches, patches) {
s.sendFailureMsg("?", fmt.Sprintf("Expected %v patches, got %v", patches, actualPatches))
}
return m
}

// Returns the first hello field
func (s *bolt4server) waitForHello() map[string]interface{} {
msg := s.receiveMsg()
Expand Down Expand Up @@ -240,6 +250,14 @@ func (s *bolt4server) acceptHelloWithHints(hints map[string]interface{}) {
})
}

func (s *bolt4server) acceptHelloWithPatches(patches []interface{}) {
s.send(msgSuccess, map[string]interface{}{
"connection_id": "cid",
"server": "fake/4.5",
"patch_bolt": patches,
})
}

func (s *bolt4server) rejectHelloUnauthorized() {
s.send(msgFailure, map[string]interface{}{
"code": "Neo.ClientError.Security.Unauthorized",
Expand Down
99 changes: 88 additions & 11 deletions neo4j/internal/bolt/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type success struct {
routingTable *db.RoutingTable
num uint32
configurationHints map[string]interface{}
patches []string
}

func (s *success) String() string {
Expand Down Expand Up @@ -92,6 +93,7 @@ type hydrator struct {
cachedSuccess success
boltLogger log.BoltLogger
logId string
useUtc bool
}

func (h *hydrator) setErr(err error) {
Expand Down Expand Up @@ -245,6 +247,9 @@ func (h *hydrator) success(n uint32) *success {
case "hints":
hints := h.amap()
succ.configurationHints = hints
case "patch_bolt":
patches := h.strings()
succ.patches = patches
default:
// Unknown key, waste it
h.trash()
Expand Down Expand Up @@ -404,9 +409,25 @@ func (h *hydrator) value() interface{} {
case 'Y':
return h.point3d(n)
case 'F':
if h.useUtc {
return h.unknownStructError(t)
}
return h.dateTimeOffset(n)
case 'I':
if !h.useUtc {
return h.unknownStructError(t)
}
return h.utcDateTimeOffset(n)
case 'f':
if h.useUtc {
return h.unknownStructError(t)
}
return h.dateTimeNamedZone(n)
case 'i':
if !h.useUtc {
return h.unknownStructError(t)
}
return h.utcDateTimeNamedZone(n)
case 'd':
return h.localDateTime(n)
case 'D':
Expand All @@ -418,8 +439,7 @@ func (h *hydrator) value() interface{} {
case 'E':
return h.duration(n)
default:
h.err = errors.New(fmt.Sprintf("Unknown tag: %02x", t))
return nil
return h.unknownStructError(t)
}
case packstream.PackedByteArray:
return h.unp.ByteArray()
Expand Down Expand Up @@ -568,30 +588,82 @@ func (h *hydrator) point3d(n uint32) interface{} {

func (h *hydrator) dateTimeOffset(n uint32) interface{} {
h.unp.Next()
secs := h.unp.Int()
seconds := h.unp.Int()
h.unp.Next()
nans := h.unp.Int()
nanos := h.unp.Int()
h.unp.Next()
offs := h.unp.Int()
t := time.Unix(secs, nans).UTC()
l := time.FixedZone("Offset", int(offs))
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), l)
offset := h.unp.Int()
// time.Time in local timezone, e.g. 15th of June 2020, 15:30 in Paris (UTC+2h)
unixTime := time.Unix(seconds, nanos)
// time.Time computed in UTC timezone, e.g. 15th of June 2020, 13:30 in UTC
utcTime := unixTime.UTC()
// time.Time **copied** as-is in the target timezone, e.g. 15th of June 2020, 13:30 in target tz
timeZone := time.FixedZone("Offset", int(offset))
return time.Date(
utcTime.Year(),
utcTime.Month(),
utcTime.Day(),
utcTime.Hour(),
utcTime.Minute(),
utcTime.Second(),
utcTime.Nanosecond(),
timeZone,
)
}

func (h *hydrator) utcDateTimeOffset(n uint32) interface{} {
h.unp.Next()
seconds := h.unp.Int()
h.unp.Next()
nanos := h.unp.Int()
h.unp.Next()
offset := h.unp.Int()
timeZone := time.FixedZone("Offset", int(offset))
return time.Unix(seconds, nanos).In(timeZone)
}

func (h *hydrator) dateTimeNamedZone(n uint32) interface{} {
h.unp.Next()
seconds := h.unp.Int()
h.unp.Next()
nanos := h.unp.Int()
h.unp.Next()
zone := h.unp.String()
// time.Time in local timezone, e.g. 15th of June 2020, 15:30 in Paris (UTC+2h)
unixTime := time.Unix(seconds, nanos)
// time.Time computed in UTC timezone, e.g. 15th of June 2020, 13:30 in UTC
utcTime := unixTime.UTC()
// time.Time **copied** as-is in the target timezone, e.g. 15th of June 2020, 13:30 in target tz
l, err := time.LoadLocation(zone)
if err != nil {
h.setErr(err)
return nil
}
return time.Date(
utcTime.Year(),
utcTime.Month(),
utcTime.Day(),
utcTime.Hour(),
utcTime.Minute(),
utcTime.Second(),
utcTime.Nanosecond(),
l,
)
}

func (h *hydrator) utcDateTimeNamedZone(n uint32) interface{} {
h.unp.Next()
secs := h.unp.Int()
h.unp.Next()
nans := h.unp.Int()
h.unp.Next()
zone := h.unp.String()
t := time.Unix(secs, nans).UTC()
l, err := time.LoadLocation(zone)
timeZone, err := time.LoadLocation(zone)
if err != nil {
h.setErr(err)
return nil
}
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), l)
return time.Unix(secs, nans).In(timeZone)
}

func (h *hydrator) localDateTime(n uint32) interface{} {
Expand Down Expand Up @@ -740,3 +812,8 @@ func parseNotification(m map[string]interface{}) db.Notification {

return n
}

func (h *hydrator) unknownStructError(t byte) interface{} {
h.setErr(fmt.Errorf("Unknown tag: %02x", t))
return nil
}
Loading

0 comments on commit c7dd381

Please sign in to comment.