Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oplogtoredis: Add Configurable Denylist to HTTP Server #64

Merged
merged 21 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildGoModule {
'';

# update: set value to an empty string and run `nix build`. This will download Go, fetch the dependencies, and calculate their hash.
vendorHash = "sha256-ceToA2DC1bhmg9WIeNSAfoNoU7sk9PrQqgqt5UbpivQ=";
vendorHash = "sha256-Vh7O0iMPG6nAvcyv92h5TVZS2awnR0vz75apyzJeu4c=";

nativeBuildInputs = [ installShellFiles ];
doCheck = false;
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/deckarep/golang-set v1.7.1
github.com/go-redis/redis/v7 v7.4.1
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.4.2
github.com/juju/mgo/v2 v2.0.0-20210302023703-70d5d206e208
github.com/juju/replicaset v0.0.0-20210302050932-0303c8575745
Expand All @@ -29,10 +30,10 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gomodule/redigo v1.8.5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c // indirect
github.com/juju/errors v0.0.0-20200330140219-3fe23663418f // indirect
github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
88 changes: 88 additions & 0 deletions integration-tests/acceptance/denylist_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"reflect"
"testing"
)

// doRequest sends an HTTP request with an empty body and a JSON content type
// to the server whose base URL is in the OTR_URL environment variable, and
// fails the test if the response status is not expectedCode.
//
// When expectedCode is 200 the response body is parsed as JSON and the decoded
// value is returned. For any other expected status the body is not parsed and
// nil is returned.
func doRequest(method string, path string, t *testing.T, expectedCode int) interface{} {
	// Report failures at the calling test's line rather than inside this helper.
	t.Helper()

	req, err := http.NewRequest(method, os.Getenv("OTR_URL")+path, &bytes.Buffer{})
	if err != nil {
		t.Fatalf("Error creating req: %s", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := (&http.Client{}).Do(req)
	if err != nil {
		t.Fatalf("Error sending request: %s", err)
	}
	defer resp.Body.Close()

	// Read the body before checking the status so it can be included in the
	// failure message.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		t.Fatalf("Error receiving response body: %s", err)
	}

	if resp.StatusCode != expectedCode {
		t.Fatalf("Expected status code %d, but got %d.\nBody was: %s", expectedCode, resp.StatusCode, respBody)
	}

	// Only successful GET-style responses carry a JSON payload worth decoding.
	if expectedCode != http.StatusOK {
		return nil
	}

	var data interface{}
	if err := json.Unmarshal(respBody, &data); err != nil {
		t.Fatalf("Error parsing JSON response: %s", err)
	}
	return data
}

// TestDenyList exercises the /denylist HTTP operations end to end: listing,
// creating, fetching and deleting entries. It expects the server to start with
// an empty denylist and removes every entry it created before returning, so the
// test can be rerun against the same server.
func TestDenyList(t *testing.T) {
	// GET empty list of rules
	data := doRequest("GET", "/denylist", t, 200)
	if !reflect.DeepEqual(data, []interface{}{}) {
		t.Fatalf("Expected empty list from blank GET, but got %#v", data)
	}
	// PUT new rule
	doRequest("PUT", "/denylist/abc", t, 201)
	// GET list with new rule in it
	data = doRequest("GET", "/denylist", t, 200)
	if !reflect.DeepEqual(data, []interface{}{"abc"}) {
		t.Fatalf("Expected singleton from GET, but got %#v", data)
	}
	// GET existing rule
	data = doRequest("GET", "/denylist/abc", t, 200)
	if !reflect.DeepEqual(data, "abc") {
		t.Fatalf("Expected matched body from GET, but got %#v", data)
	}
	// PUT second rule
	doRequest("PUT", "/denylist/def", t, 201)
	// GET second rule
	data = doRequest("GET", "/denylist/def", t, 200)
	if !reflect.DeepEqual(data, "def") {
		t.Fatalf("Expected matched body from GET, but got %#v", data)
	}
	// GET list with both rules
	data = doRequest("GET", "/denylist", t, 200)
	// check both permutations, in case the server reordered them
	if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
		t.Fatalf("Expected doubleton from GET, but got %#v", data)
	}
	// DELETE first rule
	doRequest("DELETE", "/denylist/abc", t, 204)
	// GET first rule
	doRequest("GET", "/denylist/abc", t, 404)
	// GET list with only second rule
	data = doRequest("GET", "/denylist", t, 200)
	if !reflect.DeepEqual(data, []interface{}{"def"}) {
		t.Fatalf("Expected singleton from GET, but got %#v", data)
	}
	// DELETE second rule so the denylist is empty again for the next run
	doRequest("DELETE", "/denylist/def", t, 204)
}
72 changes: 72 additions & 0 deletions integration-tests/acceptance/denylist_oplog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"testing"

"github.com/tulip/oplogtoredis/integration-tests/helpers"
"go.mongodb.org/mongo-driver/bson"
)

// TestDenyOplog checks that oplog entries for a denied database are not
// published: an insert is observed normally, then the "tests" database is added
// to the denylist and a second insert produces no messages, and finally the
// rule is removed and a third insert is observed again.
func TestDenyOplog(t *testing.T) {
	harness := startHarness()
	defer harness.stop()

	// insertFoo writes one document into the Foo collection and panics on
	// failure.
	insertFoo := func(doc bson.M) {
		if _, insertErr := harness.mongoClient.Collection("Foo").InsertOne(context.Background(), doc); insertErr != nil {
			panic(insertErr)
		}
	}

	// insertEvent builds the message expected for an insert of a document
	// with the given _id and one additional field.
	insertEvent := func(id string, field string) helpers.OTRMessage {
		return helpers.OTRMessage{
			Event: "i",
			Document: map[string]interface{}{
				"_id": id,
			},
			Fields: []string{"_id", field},
		}
	}

	// Baseline: with no deny rule the insert is published on both channels.
	insertFoo(bson.M{
		"_id": "id1",
		"f":   "1",
	})
	first := insertEvent("id1", "f")
	harness.verify(t, map[string][]helpers.OTRMessage{
		"tests.Foo":      {first},
		"tests.Foo::id1": {first},
	})

	// Deny the database and insert again.
	doRequest("PUT", "/denylist/tests", t, 201)
	insertFoo(bson.M{
		"_id": "id2",
		"g":   "2",
	})

	// second message should not have been received, since it got denied
	harness.verify(t, map[string][]helpers.OTRMessage{})

	// Lift the rule and insert a third document.
	doRequest("DELETE", "/denylist/tests", t, 204)
	insertFoo(bson.M{
		"_id": "id3",
		"h":   "3",
	})
	third := insertEvent("id3", "h")

	// back to normal now that the deny rule is gone
	harness.verify(t, map[string][]helpers.OTRMessage{
		"tests.Foo":      {third},
		"tests.Foo::id3": {third},
	})
}
127 changes: 127 additions & 0 deletions lib/denylist/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package denylist

import (
"encoding/json"
"net/http"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tulip/oplogtoredis/lib/log"
)

// metricFilterEnabled is a gauge vector, labelled by database name ("db"),
// exposed as otr_denylist_filter_enabled. The handlers in this file set it to 1
// when a denylist entry is created and to 0 when the entry is deleted.
var metricFilterEnabled = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "otr",
Subsystem: "denylist",
Name: "filter_enabled",
Help: "Gauge indicating whether the denylist filter is enabled for a particular DB name",
}, []string{"db"})

// CollectionEndpoint returns the handler for the whole denylist at /denylist.
// GET lists every key currently in the denylist; any other method is answered
// with 404 Not Found.
func CollectionEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
	return func(response http.ResponseWriter, request *http.Request) {
		if request.Method != http.MethodGet {
			http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
			return
		}
		listDenylistKeys(response, denylist)
	}
}

// SingleEndpoint returns the handler for individual denylist entries at
// /denylist/... It dispatches GET, PUT and DELETE to the matching entry
// handler; any other method is answered with 404 Not Found.
func SingleEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
	return func(response http.ResponseWriter, request *http.Request) {
		// Pick the per-method handler; nil means the method is unsupported.
		var handle func(http.ResponseWriter, *http.Request, *sync.Map)
		switch request.Method {
		case http.MethodGet:
			handle = getDenylistEntry
		case http.MethodPut:
			handle = createDenylistEntry
		case http.MethodDelete:
			handle = deleteDenylistEntry
		}

		if handle == nil {
			http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
			return
		}
		handle(response, request, denylist)
	}
}

// GET /denylist
//
// listDenylistKeys writes a JSON array containing every key currently in the
// denylist, with status 200. The array is "[]" (never null) when the denylist
// is empty. If the keys cannot be encoded, a 500 is returned instead.
func listDenylistKeys(response http.ResponseWriter, denylist *sync.Map) {
	// Start from an empty, non-nil slice so an empty denylist encodes as [].
	keys := []interface{}{}

	denylist.Range(func(key interface{}, value interface{}) bool {
		keys = append(keys, key)
		return true
	})

	// Encode before touching the response: once the status line has been
	// written it can no longer be changed to an error status.
	body, err := json.Marshal(keys)
	if err != nil {
		http.Error(response, "couldn't encode result", http.StatusInternalServerError)
		return
	}

	response.Header().Set("Content-Type", "application/json")
	response.WriteHeader(http.StatusOK)
	// The trailing newline keeps the payload identical to json.Encoder output.
	// A failed write means the client is gone, so there is nothing left to report to it.
	_, _ = response.Write(append(body, '\n'))
}

// GET /denylist/...
//
// getDenylistEntry looks up the entry whose id is the request URL path and, if
// it exists, writes the id as a JSON string with status 200. A path containing
// "/" or an id that is not in the denylist yields 404.
//
// NOTE(review): this presumably runs behind a prefix-stripping mux so that the
// path is just the id — confirm where the handler is registered.
func getDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
	id := request.URL.Path
	if strings.Contains(id, "/") {
		http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
		return
	}
	if _, exists := denylist.Load(id); !exists {
		http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
		return
	}

	// Encode before touching the response: once the status line has been
	// written it can no longer be changed to an error status.
	body, err := json.Marshal(id)
	if err != nil {
		http.Error(response, "couldn't encode result", http.StatusInternalServerError)
		return
	}

	response.Header().Set("Content-Type", "application/json")
	response.WriteHeader(http.StatusOK)
	// The trailing newline keeps the payload identical to json.Encoder output.
	// A failed write means the client is gone, so there is nothing left to report to it.
	_, _ = response.Write(append(body, '\n'))
}

// PUT /denylist/...
func createDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
alex-goodisman marked this conversation as resolved.
Show resolved Hide resolved
id := request.URL.Path
alex-goodisman marked this conversation as resolved.
Show resolved Hide resolved
if strings.Contains(id, "/") {
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
_, exists := denylist.Load(id)
if exists {
response.WriteHeader(http.StatusNoContent)
return
}

denylist.Store(id, true)
log.Log.Infow("Created denylist entry", "id", id)
metricFilterEnabled.WithLabelValues(id).Set(1)

response.WriteHeader(http.StatusCreated)
}

// DELETE /denylist/...
func deleteDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
alex-goodisman marked this conversation as resolved.
Show resolved Hide resolved
id := request.URL.Path
if strings.Contains(id, "/") {
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

denylist.Delete(id)
log.Log.Infow("Deleted denylist entry", "id", id)
metricFilterEnabled.WithLabelValues(id).Set(0)

response.WriteHeader(http.StatusNoContent)
}
20 changes: 18 additions & 2 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"strings"
"sync"
"time"

"github.com/tulip/oplogtoredis/lib/config"
Expand All @@ -29,6 +30,7 @@ type Tailer struct {
RedisClients []redis.UniversalClient
RedisPrefix string
MaxCatchUp time.Duration
Denylist *sync.Map
}

// Raw oplog entry from Mongo
Expand Down Expand Up @@ -99,6 +101,13 @@ var (
Name: "last_entry_staleness_seconds",
Help: "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
})

metricOplogEntriesFiltered = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "otr",
Subsystem: "oplog",
Name: "entries_filtered",
Help: "Oplog entries filtered by denylist",
}, []string{"database"})
)

func init() {
Expand Down Expand Up @@ -197,7 +206,7 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
continue
}

ts, pubs := tailer.unmarshalEntry(rawData)
ts, pubs := tailer.unmarshalEntry(rawData, tailer.Denylist)

if ts != nil {
lastTimestamp = *ts
Expand Down Expand Up @@ -331,7 +340,7 @@ func closeCursor(cursor *mongo.Cursor) {
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
var result rawOplogEntry

err := bson.Unmarshal(rawData, &result)
Expand Down Expand Up @@ -363,6 +372,13 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim
database = entries[0].Database
}

if _, denied := denylist.Load(database); denied {
torywheelwright marked this conversation as resolved.
Show resolved Hide resolved
log.Log.Debugw("Skipping oplog entry", "database", database)
metricOplogEntriesFiltered.WithLabelValues(database).Add(1)

return
}

type errEntry struct {
err error
op *oplogEntry
Expand Down
Loading
Loading