Skip to content

Commit

Permalink
Add stream_match for wildcard prefix queries
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed Oct 14, 2024
1 parent d2adc7d commit d160c17
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 14 deletions.
47 changes: 37 additions & 10 deletions lib/lightning/collections.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ defmodule Lightning.Collections do
alias Lightning.Collections.Item
alias Lightning.Repo

@query_all_limit Application.compile_env!(:lightning, __MODULE__)[:query_all_limit]
@query_all_limit Application.compile_env!(:lightning, __MODULE__)[
:query_all_limit
]

@spec get_collection(String.t()) ::
{:ok, Collection.t()} | {:error, :collection_not_found}
Expand Down Expand Up @@ -45,15 +47,28 @@ defmodule Lightning.Collections do

@spec stream_all(Collection.t(), String.t() | nil, integer()) :: Enum.t()
def stream_all(%{id: collection_id}, cursor \\ nil, limit \\ @query_all_limit) do
Item
|> where([i], i.collection_id == ^collection_id)
|> limit(^limit)
|> then(fn query ->
case cursor do
nil -> query
cursor_key -> where(query, [i], i.key > ^cursor_key)
end
end)
collection_id
|> stream_query(cursor, limit)
|> Repo.stream()
end

@spec stream_match(Collection.t(), String.t(), String.t() | nil, integer()) ::
Enum.t()
def stream_match(
%{id: collection_id},
pattern,
cursor \\ nil,
limit \\ @query_all_limit
) do
pattern =
pattern
|> String.replace("\\", "\\\\")
|> String.replace("%", "\%")
|> String.replace("*", "%")

collection_id
|> stream_query(cursor, limit)
|> where([i], like(i.key, ^pattern))
|> Repo.stream()
end

Expand All @@ -75,4 +90,16 @@ defmodule Lightning.Collections do
item -> Repo.delete(item)
end
end

defp stream_query(collection_id, cursor, limit) do
Item
|> where([i], i.collection_id == ^collection_id)
|> limit(^limit)
|> then(fn query ->
case cursor do
nil -> query
cursor_key -> where(query, [i], i.key > ^cursor_key)
end
end)
end
end
165 changes: 161 additions & 4 deletions test/lightning/collections_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ defmodule Lightning.CollectionsTest do
describe "stream_all/3" do
test "returns all items for the given collection" do
collection = insert(:collection)
items = insert_list(11, :collection_item, collection: collection)
items = insert_list(12, :collection_item, collection: collection)

Repo.transaction(fn ->
assert stream = Collections.stream_all(collection)
assert stream_items = Stream.take(stream, 12)
assert stream_items = Stream.take(stream, 15)

assert MapSet.new(items) ==
stream_items
Expand All @@ -105,12 +105,21 @@ defmodule Lightning.CollectionsTest do
test "returns the items after a cursor up to a limited amount" do
collection = insert(:collection)

items = insert_list(30, :collection_item, collection: collection)
items =
1..30
|> Enum.map(fn _i ->
insert(:collection_item,
key: "rkey#{:rand.uniform()}",
collection: collection
)
end)
|> Enum.sort_by(& &1.key)

%{key: cursor} = Enum.at(items, 4)

Repo.transaction(fn ->
assert stream = Collections.stream_all(collection, cursor)
assert Enum.count(stream) == 30 - (4 + 1)
assert stream |> Enum.to_list() |> Enum.count() == 30 - (4 + 1)
end)

Repo.transaction(fn ->
Expand Down Expand Up @@ -151,6 +160,154 @@ defmodule Lightning.CollectionsTest do
end
end

describe "stream_match/3" do
test "returns item with exact match" do
collection = insert(:collection)
_itemA = insert(:collection_item, key: "keyA", collection: collection)
itemB = insert(:collection_item, key: "keyB", collection: collection)

Repo.transaction(fn ->
assert stream = Collections.stream_match(collection, "keyB*")

assert [itemB] ==
stream
|> Enum.to_list()
|> Repo.preload(collection: :project)
end)
end

test "returns matching items for the given collection" do
collection = insert(:collection)

items =
1..11
|> Enum.map(fn _i ->
insert(:collection_item,
key: "rkeyA#{:rand.uniform()}",
collection: collection
)
end)
|> Enum.sort_by(& &1.key)

for _i <- 1..5,
do:
insert(:collection_item,
key: "rkeyB#{:rand.uniform()}",
collection: collection
)

Repo.transaction(fn ->
assert stream = Collections.stream_match(collection, "rkeyA*")
assert stream_items = Stream.take(stream, 12)

assert MapSet.new(items) ==
stream_items
|> Enum.to_list()
|> Repo.preload(collection: :project)
|> MapSet.new()
end)
end

test "returns matching items after a cursor up to a limited amount" do
collection = insert(:collection)

items =
1..30
|> Enum.map(fn _i ->
insert(:collection_item,
key: "rkeyA#{:rand.uniform()}",
collection: collection
)
end)
|> Enum.sort_by(& &1.key)

%{key: cursor} = Enum.at(items, 9)

for _i <- 1..5,
do:
insert(:collection_item,
key: "rkeyB#{:rand.uniform()}",
collection: collection
)

Repo.transaction(fn ->
assert stream = Collections.stream_match(collection, "rkeyA*", cursor)
assert Enum.count(stream) == 30 - (9 + 1)
end)

Repo.transaction(fn ->
assert stream =
Collections.stream_match(collection, "rkeyA*", cursor, 16)

assert Enum.count(stream) == 16
end)
end

test "returns empty list when collection is empty" do
collection = insert(:collection)

Repo.transaction(fn ->
assert stream = Collections.stream_match(collection, "any-key")
assert Enum.count(stream) == 0
end)
end

test "returns empty list when the collection doesn't exist" do
insert(:collection_item, key: "existing_key")

Repo.transaction(fn ->
assert stream =
Collections.stream_match(
%{id: Ecto.UUID.generate()},
"existing_key"
)

assert Enum.count(stream) == 0
end)
end

test "returns item escaping the %" do
collection = insert(:collection)
item = insert(:collection_item, key: "keyA%", collection: collection)

Repo.transaction(fn ->
assert stream = Collections.stream_match(collection, "keyA%*")

assert [item] ==
stream
|> Enum.to_list()
|> Repo.preload(collection: :project)
end)
end

test "returns item escaping the \\" do
collection = insert(:collection)
item = insert(:collection_item, key: "keyA\\", collection: collection)

Repo.transaction(fn ->
assert stream = Collections.stream_match(collection, "keyA\\*")

assert [item] ==
stream
|> Enum.to_list()
|> Repo.preload(collection: :project)
end)
end

test "fails when outside of an explicit transaction" do
collection = insert(:collection)
_items = insert_list(5, :collection_item, collection: collection)

assert stream = Collections.stream_match(collection, "key*")

assert_raise RuntimeError,
~r/cannot reduce stream outside of transaction/,
fn ->
Enum.take(stream, 5) |> Enum.each(&IO.inspect/1)
end
end
end

describe "put/3" do
test "creates a new entry in the collection for the given collection" do
collection = insert(:collection)
Expand Down

0 comments on commit d160c17

Please sign in to comment.