WIP: Store preferable infohashes for queries #7786

Status: Closed (wants to merge 1 commit)
@@ -215,8 +215,12 @@ def notify_gui(request, processing_results):
if r.obj_state == ObjState.NEW_OBJECT
]
if self.composition.notifier:
self.composition.notifier[notifications.remote_query_results](
{"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)})
self.composition.notifier[notifications.remote_query_results]({
"query": kwargs.get("txt_filter"),
"results": results,
"uuid": str(request_uuid),
"peer": hexlify(request.peer.mid)
})

peers_to_query = self.get_random_peers(self.composition.max_query_peers)

@@ -0,0 +1,201 @@
from typing import Generator

from pony.orm import db_session
from pytest import fixture

from tribler.core.components.database.db.layers.user_activity_layer import UserActivityLayer
from tribler.core.components.user_activity.types import InfoHash
from tribler.core.utilities.pony_utils import TrackedDatabase


@fixture(name="layer")
def fixture_activity_layer() -> Generator[UserActivityLayer, None, None]:
database = TrackedDatabase()
database.bind(provider="sqlite", filename=":memory:")
ual = UserActivityLayer(database)
database.generate_mapping(create_tables=True)
yield ual
database.disconnect()


def float_equals(a: float, b: float) -> bool:
return round(a, 5) == round(b, 5)

Review comment (Contributor):

In this file, you are using the same construction over and over again:

def test_example(layer: UserActivityLayer) -> None:
    layer.store("test query", InfoHash(b"\x00" * 20), set())

    with db_session():
        queries = layer.Query.select()[:]

        assert queries[0].query == "test query"
        assert len(queries[0].infohashes) == 1
        ...

This is basically a single test case that is repeated in test_store_no_losers, test_store_with_loser, test_store_weighted_decay, test_store_delete_old, and test_store_delete_old_over_e.

I would suggest extracting it into a separate test. Then there is no need to repeat it in the other tests, and each test will more accurately describe the specific case it is testing, without any excess.

Like:

def test_store_query(layer: UserActivityLayer) -> None:
    layer.store("test query", InfoHash(b''), set())

    with db_session():
        test_query = layer.Query.get()

    assert test_query.query == "test query"


def test_store_no_losers(layer: UserActivityLayer) -> None:
"""
Test that queries can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), set())

with db_session():
queries = layer.Query.select()[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert len(queries[0].infohashes) == 1
assert list(queries[0].infohashes)[0].infohash == b"\x00" * 20
assert float_equals(list(queries[0].infohashes)[0].preference, 1.0)
Review comment on lines +31 to +38 (Contributor):

NIT: Same logic, but a bit more correct and easier to read.

The suggested version uses Query.get() instead of Query.select() because there should only be a single entity in the database, which you assert later. It also confines the with block to only the lines that actually need the db_session, and avoids multiple identical conversions to a list followed by retrieval of the first element.

Suggested change
with db_session():
queries = layer.Query.select()[:]
assert len(queries) == 1
assert queries[0].query == "test query"
assert len(queries[0].infohashes) == 1
assert list(queries[0].infohashes)[0].infohash == b"\x00" * 20
assert float_equals(list(queries[0].infohashes)[0].preference, 1.0)
with db_session():
test_query = layer.Query.get()
infohashes = list(test_query.infohashes)
assert test_query.query == "test query"
assert len(infohashes) == 1
infohash = infohashes.pop()
assert infohash.infohash == b"\x00" * 20
assert float_equals(infohash.preference, 1.0)



def test_store_with_loser(layer: UserActivityLayer) -> None:
"""
Test that queries with a loser can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})
Review comment (Contributor):

From the text of the test, it is not clear why one infohash is called "winner" and another is called "loser". I'm not questioning the naming here (which I will do in the class UserActivityLayer).

Here, I suggest helping the reader by showing that the "loser" is just an infohash that is passed as the third function parameter and is then retrieved in the with statement.

Suggested change
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})
layer.store("test query", infohash=InfoHash(b"\x00" * 20), losing_infohashes={InfoHash(b"\x01" * 20)})


with db_session():
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser.preference, 0.0)
Review comment on lines +48 to +55 (@drew2a, Dec 21, 2023):

NIT: this version avoids unnecessary select queries and list copying:

Suggested change
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser.preference, 0.0)
test_query = layer.Query.get()
winner = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
loser = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
assert float_equals(winner.preference, 1.0)
assert float_equals(loser.preference, 0.0)
assert test_query.query == "test query"



def test_store_with_losers(layer: UserActivityLayer) -> None:
"""
Test that queries with multiple losers can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
Review comment on lines +62 to +64 (@drew2a, Dec 21, 2023):

NIT: It is easier to read the test function if you specify the argument names:

Suggested change
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", infohash=InfoHash(b"\x00" * 20), losing_infohashes={InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})


with db_session():
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
loser_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
loser_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser_1.preference, 0.0)
assert float_equals(loser_2.preference, 0.0)
assert float_equals(loser_3.preference, 0.0)
Review comment on lines +67 to +78 (@drew2a, Dec 21, 2023):

NIT: A more compact yet still correct version of the same logic. It uses select where a list is intended and get where a single entity is expected.

Suggested change
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
loser_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
loser_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser_1.preference, 0.0)
assert float_equals(loser_2.preference, 0.0)
assert float_equals(loser_3.preference, 0.0)
test_query = layer.Query.get()
winner = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
losers = list(layer.InfohashPreference.select(lambda x: x.infohash != b"\x00" * 20))
assert test_query.query == "test query"
assert float_equals(winner.preference, 1.0)
assert len(losers) == 3
assert all(float_equals(ls.preference, 0.0) for ls in losers)



def test_store_weighted_decay(layer: UserActivityLayer) -> None:
"""
Test result decay after updating.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})

with db_session():
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.8)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
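The expected values above are consistent with an exponential moving average: an existing entry is blended with the new 0.0/1.0 observation using the update weight (0.8 in these tests), while a brand-new entry is stored at its raw weight. A small sketch of the presumed update rule (not the actual implementation):

```python
UPDATE_WEIGHT = 0.8  # assumption: matches the default used by the tests above

def updated_preference(old: float, new: float) -> float:
    """Blend an existing preference with a new 0.0 (loss) or 1.0 (win) observation."""
    return old * (1 - UPDATE_WEIGHT) + new * UPDATE_WEIGHT

# After the first store, b"\x00" holds 1.0 and the losers hold 0.0.
# The second store makes b"\x01" the winner and b"\x00" a loser:
assert abs(updated_preference(1.0, 0.0) - 0.2) < 1e-9  # entry_1
assert abs(updated_preference(0.0, 1.0) - 0.8) < 1e-9  # entry_2
```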
Review comment on lines +93 to +104 (Contributor):

NIT: this version avoids unnecessary select queries and list copying:

Suggested change
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.8)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
test_query = layer.Query.get()
entry_1 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
entry_2 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
entry_3 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x02" * 20)
entry_4 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x03" * 20)
assert test_query.query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.8)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)



def test_store_delete_old(layer: UserActivityLayer) -> None:
"""
Test result decay after updating.
Review comment (Contributor):
The description of the test is likely missing a specification of how this test differs from the previous one.

"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20),
InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20)})

with db_session():
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
should_be_dropped = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert should_be_dropped == []
assert float_equals(entry_4.preference, 0.8)
Review comment on lines +119 to +132 (Contributor):

NIT: this version avoids unnecessary select queries and list copying:

Suggested change
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
should_be_dropped = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]
assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert should_be_dropped == []
assert float_equals(entry_4.preference, 0.8)
test_query = layer.Query.get()
entry_1 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
entry_2 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
entry_3 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x02" * 20)
entry_4 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x04" * 20)
should_be_dropped = layer.InfohashPreference.get(lambda x: x.infohash == b"\x03" * 20)
assert test_query.query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.8)
assert not should_be_dropped



def test_store_delete_old_over_e(layer: UserActivityLayer) -> None:
"""
Test if entries are not deleted if their preference is still over the threshold e.
"""
layer.e = 0.0
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20),
InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20)})

with db_session():
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_5, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
assert float_equals(entry_5.preference, 0.8)
Review comment on lines +148 to +161 (Contributor):

NIT: this version avoids unnecessary select queries and list copying:

Suggested change
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_5, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]
assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
assert float_equals(entry_5.preference, 0.8)
test_query = layer.Query.get()
entry_1 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
entry_2 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
entry_3 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x02" * 20)
entry_4 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x03" * 20)
entry_5 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x04" * 20)
assert test_query.query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
assert float_equals(entry_5.preference, 0.8)



def test_get_preferable(layer: UserActivityLayer) -> None:
"""
Test if a preferable infohash is correctly retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x00" * 20) == b"\x00" * 20
Review comment (Contributor):
There is a type mismatch: "Expected type 'InfoHash', got 'bytes' instead"
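InfoHash is imported from tribler.core.components.user_activity.types and, judging by its use in these tests, is presumably a typing.NewType over bytes. If so, the checker warning goes away by wrapping the literal, with no runtime cost; a self-contained sketch under that assumption:

```python
from typing import NewType

# Assumption: InfoHash is a NewType over bytes; the real definition lives in
# tribler.core.components.user_activity.types and may differ.
InfoHash = NewType("InfoHash", bytes)

def get_preferable(infohash: InfoHash) -> InfoHash:
    """Simplified stand-in carrying the annotated signature under discussion."""
    return infohash

# get_preferable(b"\x00" * 20) would trip the type checker;
# wrapping the literal in InfoHash(...) satisfies it:
result = get_preferable(InfoHash(b"\x00" * 20))
assert result == b"\x00" * 20
```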



def test_get_preferable_already_best(layer: UserActivityLayer) -> None:
"""
Test if an infohash returns itself when it is preferable.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x01" * 20) == b"\x00" * 20
Review comment (Contributor):
There is a type mismatch: "Expected type 'InfoHash', got 'bytes' instead"



def test_get_preferable_unknown(layer: UserActivityLayer) -> None:
"""
Test if an infohash returns itself when it has no known preferable infohashes.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x02" * 20) == b"\x02" * 20
Review comment (Contributor):
There is a type mismatch: "Expected type 'InfoHash', got 'bytes' instead"



def test_get_random(layer: UserActivityLayer) -> None:
"""
Test if the preferred infohash always gets returned from a random checked selection.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), InfoHash(b"\x02" * 20)})
layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20), InfoHash(b"\x02" * 20)})

random_selection = layer.get_preferable_to_random(limit=1)

assert len(random_selection) == 1
assert list(random_selection)[0] == b"\x01" * 20
Review comment (Contributor):
This is an unnecessary conversion to a list and retrieval of the first item:

Suggested change
assert list(random_selection)[0] == b"\x01" * 20
assert random_selection.pop() == b"\x01" * 20

128 changes: 128 additions & 0 deletions src/tribler/core/components/database/db/layers/user_activity_layer.py
@@ -0,0 +1,128 @@
from __future__ import annotations

import random
import typing
from dataclasses import dataclass

from pony import orm
from pony.orm import db_session

from tribler.core.components.user_activity.types import InfoHash
from tribler.core.utilities.pony_utils import TrackedDatabase

if typing.TYPE_CHECKING:
@dataclass
class InfohashPreference:
infohash: bytes
preference: float
parent_query: Query

Codecov warning (codecov/patch): added lines #L14 - L18 were not covered by tests.
@dataclass
class Query:
query: str
infohashes: typing.Set[InfohashPreference]

Codecov warning (codecov/patch): added lines #L20 - L23 were not covered by tests.
Review comment on lines +13 to +23 (Contributor):


These structures are used solely in the _select_superior method, and there is no direct transformation into this datatype in the calling code, as they merely replicate the existing structures described in UserActivityLayer. Adopting this approach of duplicating definitions necessitates updating the structures twice (once for the original and again for the duplicate), which increases the risk of errors during future updates. The developer responsible for this task should:

  1. Be aware that there are two definitions that require changes.
  2. Make changes twice, which is more error-prone than making a change once.

Additionally, for a developer who is not the owner of the code, this duplication might not be apparent (as we generally aim for code to be less, rather than more, duplicative), and it may not be obvious that it is necessary to locate another data structure.

My suggestion is to avoid duplication by refactoring the existing code. There are several methods to achieve the same class behavior but without duplication.



class UserActivityLayer:

def __init__(self, database: TrackedDatabase, update_weight: float = 0.8, e: float = 0.01) -> None:
"""
Create a new User Activity scheme for a particular database.

:param database: The database to bind to.
:param update_weight: The weight of new updates.
:param e: A small value to decide near-zero preference.
"""
self.database = database

self.e = e
self.update_weight_new = update_weight
self.update_weight_old = 1 - self.update_weight_new

class Query(database.Entity):
query = orm.PrimaryKey(str)
infohashes = orm.Set("InfohashPreference")

class InfohashPreference(database.Entity):
infohash = orm.Required(bytes)
preference = orm.Required(float)
parent_query = orm.Required(Query)
orm.PrimaryKey(infohash, parent_query)
Review comment on lines +42 to +50 (Contributor):


The database does not appear to be normalized. As @kozlovsky is coming back from vacation at the same time as you, I summon him to review the database structure.

Reply (Contributor):

To me, the database structure looks normalized. It is possible to link InfohashPreference with the Resource entity, but it is actually not necessary and complicates the database schema a bit.

But I'd like to use integer fields instead of floats, like:

        class Query(database.Entity):
            query = orm.PrimaryKey(str)
            searched_counter = orm.Required(int, default=1)
            infohashes = orm.Set("InfohashPreference")

        class InfohashPreference(database.Entity):
            infohash = orm.Required(bytes)
            parent_query = orm.Required(Query)
            chosen_counter = orm.Required(int, default=0)
            ignored_counter = orm.Required(int, default=0)
            orm.PrimaryKey(infohash, parent_query)

This way, changing the formula on how preference is calculated becomes possible.
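With integer counters stored instead of a float, one possible way to derive a preference value would be a simple ratio. This formula is purely illustrative and not part of the PR or the reviewer's suggestion:

```python
def preference_from_counters(chosen_counter: int, ignored_counter: int) -> float:
    """Derive a preference in [0, 1] from the suggested integer counters.

    Illustrative only: one possible formula, chosen here for simplicity.
    """
    total = chosen_counter + ignored_counter
    return chosen_counter / total if total else 0.0

assert preference_from_counters(4, 1) == 0.8  # chosen 4 out of 5 times
assert preference_from_counters(0, 0) == 0.0  # never shown yet
```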

Reply (Contributor):

After some additional thought, I agree with @drew2a that it may be better to link the InfohashPreference entity with the Resource entity in the same way as the Tracker entity of the HealthDataAccessLayer is linked with the Resource entity via the Tracker.torrents / Resource.trackers relationships.

It has the following benefits:

  1. The primary key of InfohashPreference becomes shorter, as it now will use resource id instead of infohash bytes.
  2. With the current approach of TriblerDatabase, the torrent metadata is just a kind of Resource, and having Resource directly linked with InfohashPreference can simplify some future queries.

The drawback is when we want to search InfohashPreference knowing the specific info hash, we first need to find the Resource and then use it instead of the info hash value.

With this change, the code will be like:

class UserActivityLayer:

    def __init__(self, knowledge_layer: KnowledgeDataAccessLayer) -> None:
        self.database = knowledge_layer.instance
        self.Resource = knowledge_layer.Resource

        class Query(self.database.Entity):
            query = orm.PrimaryKey(str)
            searched_counter = orm.Required(int, default=1)
            infohashes = orm.Set("InfohashPreference")

        class InfohashPreference(self.database.Entity):
            torrent = orm.Required(self.Resource)
            query = orm.Required(Query)
            chosen_counter = orm.Required(int, default=0)
            ignored_counter = orm.Required(int, default=0)
            orm.PrimaryKey(torrent, query)

        self.Query = Query
        self.UserPreference = InfohashPreference

(In this example, I renamed parent_query to query, as parent_ prefix looks unnecessary)

And, in the Resource entity of the KnowledgeDataAccessLayer we will have:

infohash_preferences = orm.Set(lambda: db.InfohashPreference, reverse="torrent")

Reply (Contributor, PR author):

I don't see how you could implement decay of previous search results for the same (infohash, query) pair with this database format. Is it still possible? Because that is a requirement.

Reply (@kozlovsky, Jan 2, 2024):

Indeed, that scheme is not enough to implement proper decay; I missed that requirement. But it leads me to some additional thoughts.

The current scheme implemented in this PR is single-user. I don't think decay is important when the statistics are accumulated only for a single user. But if we aggregate anonymized query-result-preference statistics from thousands of users, the decay indeed makes sense.

But then we have a new question on spreading and accumulating these statistics. It probably should be signed by a second key when gossiping to prevent spam. However, we cannot sign the dynamically decaying preference value of the float type. We can sign some discrete information that at the moment T, an anonymous user U performed the query Q and clicked on the infohash H.

So, if the goal is to aggregate decaying anonymous user-clicks-at-query-results statistics across multiple users, the discrete signed piece of information should probably be (T, U, Q, H). Then, the decay can be implemented by taking the time stamps into account - the weight of the user's "vote" can be inversely proportional to the vote's age.

In that case, the entity attributes might be something like:

class InfohashPreference(self.database.Entity):
    user = orm.Required(User)  # some new entity
    query = orm.Required(Query)
    torrent = orm.Required(self.Resource)
    last_clicked_at = orm.Required(datetime)
    signature = orm.Optional(bytes)  # for remotely received gossips
    # for the next field see https://github.com/Tribler/tribler/pull/7786#discussion_r1439578570
    successfully_downloaded = orm.Required(bool, default=False)  # to ignore local unfinished downloads
    orm.PrimaryKey(user, query, torrent)

What do you think?

Reply (Contributor, PR author):

That is close to what I had in mind for the long term, in a different PR. I would prefer we discuss the grand design in the linked issue, not on this PR.

Just to touch on it, in short, the plan for now is to use emergent behavior, as follows:

  1. Torrents that are "preferred" have their health checked more frequently locally (this PR).
  2. Torrents that have their health checked recently (locally) are more frequently gossiped in the popularity community (already exists).
  3. Torrents that are gossiped more by others have a higher chance of appearing in search results remotely (already exists).
  4. Effect: search results that are gossiped to more users are more likely to be downloaded. For actually popular content, that is downloaded, this forms a feedback loop: back to step 2.

In summary, this PR creates a soft bias and, therefore, an emergent effect that boosts the popularity of content that is searched for and downloaded.

Establishing shadow identities and more aggressively gossiping - while preventing spam - is something I'll leave for a follow-up PR. Ideally, we don't need to gossip preference directly and we can somehow merge gossiped ML models. However, this should only be implemented after careful experimentation. For now, this PR gives each user a local ranking to start the ML experimentation.

Reply (Contributor):

Thanks for the explanation; now I understand your approach better. Initially, I was misguided by the picture in #7632 with a "store signed" label, as my understanding was that it is only possible to sign discrete facts, not float values. If gossiping about individual provable facts is not the intention, then storing calculated preferences is probably fine.

Still, you can consider using torrent = orm.Required(self.Resource) instead of infohash = orm.Required(bytes) in the InfohashPreference entity to reduce the data size.

Reply (Contributor, PR author):

Sure, thank you for the suggestion. Once the initial prototype has been merged, we can look at refactoring and optimizations and I will definitely keep your suggestion in mind. That said, once this has been merged, this PR is also no longer my sole responsibility and others can also contribute their excellent suggestions to the communal code. We can grow the code over time.

I do realize, now, that I left enabled = True as the default setting. I'll make sure to keep this disabled by default so we still have the freedom to change things like the database format in future PRs.


        self.Query = Query
        self.InfohashPreference = InfohashPreference

def store(self, query: str, infohash: InfoHash, losing_infohashes: typing.Set[InfoHash]) -> None:
Contributor:

I have a point for discussion regarding this function interface: the naming.

You're using a "win-lose" representation which I find misleading, as it suggests a game-like process of identifying winners and losers. However, according to your function description, it's not about winning and losing, but rather about determining which infohashes were used (downloaded) and which weren't used (not downloaded).

I suggest reconsidering the naming to choose a more appropriate representation.

"""
Store a query, its selected infohash, and the infohashes that were not downloaded.

:param query: The text that the user searched for.
:param infohash: The infohash that the user downloaded.
:param losing_infohashes: The infohashes that the user saw but ignored.
"""
# Convert "win" or "loss" to "1.0" or "0.0".
weights = {ih: 0.0 for ih in losing_infohashes}
weights[infohash] = 1.0

# Update or create a new database entry
with db_session:
existing = self.Query.get(query=query)
if existing is not None:
Contributor:

"flat is better than nested" regarding to the Zen of Python. To decrease nesting in your code you can simply use get_or_create function from pony_utils:

        with db_session:
            existing = get_or_create(self.Query, query=query)
            for old_infohash_preference in existing.infohashes:
                ...

            if existing.infohashes and infohash in weights:
                weights[infohash] = self.update_weight_new

The next nesting level could be removed by using this trick:

with db_session:
    existing = get_or_create(self.Query, query=query)
    known_infohashes = (i for i in existing.infohashes if i.infohash in weights)
    unknown_infohashes = (i for i in existing.infohashes if i.infohash not in weights)

    for old_infohash_preference in unknown_infohashes:
        ...

    for old_infohash_preference in known_infohashes:
        ...

Also, "readability counts" and "sparse is better than dense." Therefore, two-line formulas could be rewritten as follows:

            for infohash_preference in known_infohashes:
                weight = weights.pop(infohash_preference.infohash)
                new_weight = infohash_preference.preference * self.update_weight_old + weight * self.update_weight_new
                infohash_preference.preference = new_weight

In this way, we can significantly simplify the code while retaining the same logic.

I'll add the assembled example with all improvements as a code suggestion.

                for old_infohash_preference in existing.infohashes:
                    if old_infohash_preference.infohash in weights:
                        new_weight = (old_infohash_preference.preference * self.update_weight_old
                                      + weights.pop(old_infohash_preference.infohash, 0.0) * self.update_weight_new)
                        old_infohash_preference.preference = new_weight
                    else:
                        # This infohash did not pop up, candidate for deletion
                        new_weight = old_infohash_preference.preference * self.update_weight_old
                        if new_weight < self.e:
                            old_infohash_preference.delete()
                        else:
                            old_infohash_preference.preference = new_weight
                if infohash in weights:
                    weights[infohash] = self.update_weight_new
            else:
                existing = self.Query(query=query, infohashes=set())

            for new_infohash, weight in weights.items():
                existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
                                                                parent_query=existing))
Comment on lines +68 to +90
Contributor:

Simplified code block:

Suggested change
        with db_session:
            existing = self.Query.get(query=query)
            if existing is not None:
                for old_infohash_preference in existing.infohashes:
                    if old_infohash_preference.infohash in weights:
                        new_weight = (old_infohash_preference.preference * self.update_weight_old
                                      + weights.pop(old_infohash_preference.infohash, 0.0) * self.update_weight_new)
                        old_infohash_preference.preference = new_weight
                    else:
                        # This infohash did not pop up, candidate for deletion
                        new_weight = old_infohash_preference.preference * self.update_weight_old
                        if new_weight < self.e:
                            old_infohash_preference.delete()
                        else:
                            old_infohash_preference.preference = new_weight
                if infohash in weights:
                    weights[infohash] = self.update_weight_new
            else:
                existing = self.Query(query=query, infohashes=set())
            for new_infohash, weight in weights.items():
                existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
                                                                parent_query=existing))

        with db_session:
            existing = get_or_create(self.Query, query=query)
            related_infohashes = (i for i in existing.infohashes if i.infohash in weights)
            unrelated_infohashes = (i for i in existing.infohashes if i.infohash not in weights)
            for infohash_preference in unrelated_infohashes:
                # This infohash did not pop up, candidate for deletion
                new_weight = infohash_preference.preference * self.update_weight_old
                if new_weight < self.e:
                    infohash_preference.delete()
                else:
                    infohash_preference.preference = new_weight
            for infohash_preference in related_infohashes:
                weight = weights.pop(infohash_preference.infohash)
                new_weight = infohash_preference.preference * self.update_weight_old + weight * self.update_weight_new
                infohash_preference.preference = new_weight
            if existing.infohashes and infohash in weights:
                weights[infohash] = self.update_weight_new
            for new_infohash, weight in weights.items():
                existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
                                                                parent_query=existing))
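Both variants implement the same exponential-moving-average update. A standalone sketch of that rule over plain dicts may make the decay-and-prune behavior easier to reason about in isolation (the constants are assumed stand-ins for `self.update_weight_old`, `self.update_weight_new`, and `self.e`; the special-casing of the winning infohash for an existing query is omitted, and the real layer stores these values in Pony entities):

```python
UPDATE_WEIGHT_NEW = 0.2   # assumed value of self.update_weight_new
UPDATE_WEIGHT_OLD = 0.8   # assumed value of self.update_weight_old
EPSILON = 0.01            # assumed value of self.e


def update_preferences(existing: dict, weights: dict) -> dict:
    """Apply one store() round: blend re-seen infohashes, decay unseen ones,
    and drop entries whose preference falls below EPSILON."""
    weights = dict(weights)  # do not mutate the caller's dict
    updated = {}
    for infohash, preference in existing.items():
        if infohash in weights:
            # Seen again: exponential moving average of old and new weight.
            updated[infohash] = (preference * UPDATE_WEIGHT_OLD
                                 + weights.pop(infohash) * UPDATE_WEIGHT_NEW)
        else:
            # Not seen this round: decay, and prune once below the threshold.
            decayed = preference * UPDATE_WEIGHT_OLD
            if decayed >= EPSILON:
                updated[infohash] = decayed
    updated.update(weights)  # brand-new infohashes enter at their raw weight
    return updated


prefs = update_preferences({b"a" * 20: 1.0}, {b"a" * 20: 1.0, b"b" * 20: 0.0})
# b"a" * 20 stays at 1.0 (0.8 * 1.0 + 0.2 * 1.0); b"b" * 20 enters at 0.0
```

Repeatedly calling this with an infohash absent from `weights` shows the geometric decay toward deletion that the `self.e` threshold implements.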


    @db_session
    def _select_superior(self, infohash_preference: InfohashPreference) -> InfoHash:
Contributor:

NIT: From a class interface perspective, it is better to place private methods (with _underscore) below the public methods, as public methods constitute the public API of the class and should be more readily accessible than the private ones.

"""
For a given InfohashPreference, get the preferable infohash from the parent query.
"""
all_hashes_for_query = list(infohash_preference.parent_query.infohashes)
all_hashes_for_query.sort(key=lambda x: x.preference, reverse=True)
Comment on lines +97 to +98
Contributor:

NIT: Perhaps it is more efficient to sort the infohashes on the SQL side instead of querying them and then sorting them on the Python side.
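Short of moving the ordering into SQL, the full sort can at least be replaced by a single `max()` pass, since only the top entry is used (a sketch over a plain dict; the real code operates on Pony entities):

```python
def select_superior(preferences: dict) -> bytes:
    """Return the infohash with the highest preference.
    max() is a single O(n) pass, whereas sorting the list is O(n log n)."""
    return max(preferences.items(), key=lambda item: item[1])[0]


best = select_superior({b"a" * 20: 0.2, b"b" * 20: 0.9, b"c" * 20: 0.5})
# best == b"b" * 20
```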

        return typing.cast(InfoHash, all_hashes_for_query[0].infohash)

    def get_preferable(self, infohash: InfoHash) -> InfoHash:
Contributor:

I'm not sure that this method is implemented correctly. There are no tests for this method with multiple queries but the same infohash, making it hard to predict how this method will be used. For instance, this test shows that each new query will override preferences (is this the intended behavior?):

def test_get_preferable_override(layer: UserActivityLayer) -> None:
    layer.store(
        query="ubuntu",
        infohash=InfoHash(b"\x00" * 20),
        losing_infohashes={InfoHash(b"\x01" * 20), InfoHash(b"\x02" * 20)}
    )

    layer.store(
        query="book ubuntu",
        infohash=InfoHash(b"\x03" * 20),
        losing_infohashes={InfoHash(b"\x01" * 20), InfoHash(b"\x02" * 20)})

    assert layer.get_preferable(b"\x01" * 20) == b"\x00" * 20  # AssertionError

"""
Given an infohash, see if we know of more preferable infohashes.

:param infohash: The infohash to find better alternatives for.
"""
with db_session:
Contributor:

I've noticed some inconsistencies in the way you use db_session. For some methods, you use decorators, and for others, you use a context manager. For instance, in this particular method, the db_session is used for the entire method, so you could use the decorator as you have done previously.

            existing = self.InfohashPreference.select(infohash=infohash)[:]

            if not existing:
                return infohash

            return self._select_superior(random.SystemRandom().choice(existing))

    def get_preferable_to_random(self, limit: int = 1) -> set[InfoHash]:
        """
        Retrieve random infohashes and return the preferred infohash for each of them.

        This method selects up to ``limit`` random infohashes and outputs the set of their preferable infohashes.
        This means that you may start with ``limit`` infohashes and, in the worst case, if they all share the same
        preferable infohash, end up with only one infohash as the output.

        :param limit: The number of infohashes to randomly get the preferred infohash for (the output set may be less).
        :returns: A set of infohashes of size 0 up to ``limit``.
        """
        with db_session:
Contributor:

I've noticed some inconsistencies in the way you use db_session. For some methods, you use decorators, and for others, you use a context manager. For instance, in this particular method, the db_session is used for the entire method, so you could use the decorator as you have done previously.

            random_selection = self.InfohashPreference.select_random(limit=limit)
            return {self._select_superior(ih) for ih in random_selection}
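The possible shrinking described in the docstring can be illustrated with a dict standing in for the database (a hypothetical mapping; the real lookup goes through `InfohashPreference` and `_select_superior`):

```python
import random


def preferable_to_random(superior_of: dict, limit: int = 1) -> set:
    """Pick up to `limit` random infohashes and map each to its superior;
    shared superiors collapse because the result is a set."""
    population = list(superior_of)
    picks = random.sample(population, min(limit, len(population)))
    return {superior_of[infohash] for infohash in picks}


superior_of = {b"a" * 20: b"w" * 20, b"b" * 20: b"w" * 20, b"c" * 20: b"w" * 20}
result = preferable_to_random(superior_of, limit=3)
# All three candidates share one superior, so the set collapses to a single entry.
```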
3 changes: 3 additions & 0 deletions src/tribler/core/components/database/db/tribler_database.py
@@ -6,6 +6,7 @@

from tribler.core.components.database.db.layers.health_data_access_layer import HealthDataAccessLayer
from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer
from tribler.core.components.database.db.layers.user_activity_layer import UserActivityLayer
from tribler.core.utilities.pony_utils import TrackedDatabase, db_session, get_or_create

MEMORY = ':memory:'
@@ -31,6 +32,8 @@ def __init__(self, filename: Optional[str] = None, *, create_tables: bool = True
        self.TorrentHealth = self.health.TorrentHealth
        self.Tracker = self.health.Tracker

        self.user_activity_layer = UserActivityLayer(self.instance)

        filename = filename or MEMORY
        db_does_not_exist = filename == MEMORY or not os.path.isfile(filename)

@@ -11,6 +11,8 @@

from ipv8.REST.base_endpoint import HTTP_BAD_REQUEST
from ipv8.REST.schema import schema

from tribler.core import notifications
from tribler.core.components.database.category_filter.family_filter import default_xxx_filter
from tribler.core.components.database.db.layers.knowledge_data_access_layer import ResourceType
from tribler.core.components.database.db.serialization import REGULAR_TORRENT, SNIPPET
@@ -328,6 +330,10 @@ def search_db():
                          f'Main query executed in {t2 - t1:.6} seconds;\n'
                          f'Result constructed in {t3 - t2:.6} seconds.')

            self.download_manager.notifier[notifications.local_query_results]({
                "query": request.query.get("txt_filter"),
                "results": list(pony_query)
            })
            return search_results, total, max_rowid

try:
@@ -448,7 +448,7 @@ def on_torrent_finished_alert(self, alert: lt.torrent_finished_alert):
        self.update_lt_status(self.handle.status())
        self.checkpoint()
        downloaded = self.get_state().get_total_transferred(DOWNLOAD)
        if downloaded > 0 and self.stream is not None and self.notifier is not None:
        if downloaded > 0 and self.notifier is not None:
            name = self.tdef.get_name_as_unicode()
            infohash = self.tdef.get_infohash().hex()
            self.notifier[notifications.torrent_finished](infohash=infohash, name=name, hidden=self.hidden)
7 changes: 7 additions & 0 deletions src/tribler/core/components/user_activity/settings.py
@@ -0,0 +1,7 @@
from tribler.core.config.tribler_config_section import TriblerConfigSection


class UserActivitySettings(TriblerConfigSection):
    enabled: bool = False
    max_query_history: int = 500
    health_check_interval: float = 5.0
Contributor:

NIT: If it were called health_check_interval_sec, it would be easier for the end user to understand which units are used here (seconds, milliseconds, microseconds).
