Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat_: use content-topic override for all community filters #5993

Draft
wants to merge 3 commits into
base: feat/comm-content-topic-poc
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions eth-node/types/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (

// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
ContentTopicOverride string `json:"contentTopicOverride"`
}

// Message is the RPC representation of a whisper message.
Expand Down
12 changes: 7 additions & 5 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes
}

newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
ContentTopicOverride: rawMessage.ContentTopicOverride,
}

if rawMessage.BeforeDispatch != nil {
Expand Down Expand Up @@ -765,6 +766,7 @@ func (s *MessageSender) SendPublic(
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
newMessage.Priority = rawMessage.Priority
newMessage.ContentTopicOverride = rawMessage.ContentTopicOverride

messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)

Expand Down
1 change: 1 addition & 0 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ type RawMessage struct {
ResendType ResendType
ResendMethod ResendMethod
Priority *MessagePriority
ContentTopicOverride string
}
1 change: 1 addition & 0 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4189,6 +4189,7 @@ func (m *Manager) GetOwnedCommunitiesChatIDs() (map[string]bool, error) {
for _, id := range c.ChatIDs() {
chatIDs[id] = true
}
chatIDs[c.MemberUpdateChannelID()] = true // TODO: for now including this chatID as controlled so that archiving works without any issues.
}
}
return chatIDs, nil
Expand Down
4 changes: 4 additions & 0 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community,
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
topics = append(topics, m.transport.FilterByChatID(community.MemberUpdateChannelID()).ContentTopic)

ts := time.Now().Unix()
to := time.Unix(ts, 0)
Expand Down
2 changes: 1 addition & 1 deletion protocol/communities/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *ManagerSuite) TestGetControlledCommunitiesChatIDs() {
controlledChatIDs, err := s.manager.GetOwnedCommunitiesChatIDs()

s.Require().NoError(err)
s.Require().Len(controlledChatIDs, 1)
s.Require().Len(controlledChatIDs, 2)
}

func (s *ManagerSuite) TestStartAndStopTorrentClient() {
Expand Down
1 change: 1 addition & 0 deletions protocol/communities_key_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
//ContentTopicOverride: community.MemberUpdateChannelID(), //TODO: Confirm if this is correct, could not figure out where LocalChatID is set in this flow
}
_, err := ckd.sender.SendCommunityMessage(context.Background(), &rawMessage)

Expand Down
2 changes: 1 addition & 1 deletion protocol/communities_messenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3630,7 +3630,7 @@ func (s *MessengerCommunitiesSuite) TestHandleImport() {
message.Sig = crypto.FromECDSAPub(&s.owner.identity.PublicKey)
message.Payload = wrappedPayload

filter := s.alice.transport.FilterByChatID(chat.ID)
filter := s.alice.transport.FilterByChatID(community.MemberUpdateChannelID())
importedMessages := make(map[transport.Filter][]*types.Message, 0)

importedMessages[*filter] = append(importedMessages[*filter], message)
Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := types.BytesToTopic(transport.ToTopic(chat.ID))
topics := []types.TopicType{topic}
communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.MemberUpdateChannelID()))
topics := []types.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
Expand Down
3 changes: 2 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,7 +2268,8 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
)
return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID)
}

//setting content-topic over-ride for community messages to use memberUpdatesChannelID
rawMessage.ContentTopicOverride = community.MemberUpdateChannelID()
logger.Debug("sending community chat message", zap.String("chatName", chat.Name))
isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID)
if err != nil {
Expand Down
28 changes: 19 additions & 9 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha
chats := CreateCommunityChats(community, m.getTimesource())

for _, chat := range chats {
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()})

}

Expand Down Expand Up @@ -2402,7 +2402,7 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource())
chats = append(chats, c)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic(), ContentTopicOverrideID: changes.Community.MemberUpdateChannelID()})

response.AddChat(c)
}
Expand Down Expand Up @@ -2444,7 +2444,7 @@ func (m *Messenger) EditCommunityChat(communityID types.HexBytes, chatID string,
for chatID, change := range changes.ChatsModified {
c := CreateCommunityChat(community.IDString(), chatID, change.ChatModified, m.getTimesource())
chats = append(chats, c)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()})
response.AddChat(c)
}

Expand Down Expand Up @@ -2498,8 +2498,8 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters

filters := []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: memberUpdateChannelID},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic, ContentTopicOverrideID: memberUpdateChannelID},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()},
}
Expand Down Expand Up @@ -2713,14 +2713,19 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))

publicFiltersToInit = append(publicFiltersToInit, defaultFilters...)

for _, filter := range publicFiltersToInit {
_, err := m.transport.RemoveFilterByChatID(filter.ChatID)
if err != nil {
return err
}
}
for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID
_, err := m.transport.RemoveFilterByChatID(communityChatID)
if err != nil {
return err
}
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: communityChatID, PubsubTopic: community.PubsubTopic()})
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: communityChatID, PubsubTopic: community.PubsubTopic(), ContentTopicOverrideID: community.MemberUpdateChannelID()})
}

_, err := m.transport.InitPublicFilters(publicFiltersToInit)
Expand Down Expand Up @@ -3414,8 +3419,9 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi

state.Response.AddChat(chat)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
ContentTopicOverrideID: community.MemberUpdateChannelID(),
})
// Update name, currently is the only field is mutable
} else if oldChat.Name != chat.Name ||
Expand Down Expand Up @@ -3949,6 +3955,10 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
filters = append(filters, m.transport.FilterByChatID(c.MemberUpdateChannelID()))

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
Expand Down
37 changes: 24 additions & 13 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand All @@ -123,15 +123,16 @@ func (f *FiltersManager) Init(
}

type FiltersToInitialize struct {
ChatID string
PubsubTopic string
ChatID string
PubsubTopic string
ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager.
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
}

keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, "")
filter, err := f.addSymmetric(keyString, "", "")
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -534,11 +535,16 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
chatIDToLoad := chatID
if contentTopicID != "" {
chatIDToLoad = contentTopicID
}

if chat, ok := f.filters[chatIDToLoad]; ok {
if chat.PubsubTopic != pubsubTopic {
f.logger.Debug("updating pubsub topic for filter",
zap.String("chatID", chatID),
Expand All @@ -547,13 +553,13 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
zap.String("newTopic", pubsubTopic),
)
chat.PubsubTopic = pubsubTopic
f.filters[chatID] = chat
f.filters[chatIDToLoad] = chat //TODO: Do we need to update watchers as well on modification?
}

return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID)
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -592,7 +598,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, "", "")
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -615,11 +621,16 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}

// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) {
var symKeyID string
var err error

topic := ToTopic(chatID)
var topic []byte
if contentTopicID != "" {
//use override contentTopicID to generate contentTopic
topic = ToTopic(contentTopicID)
} else {
topic = ToTopic(chatID)
}
topics := [][]byte{topic}

symKey, ok := f.keys[chatID]
Expand Down
13 changes: 8 additions & 5 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", "")
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -258,6 +258,8 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
}

for i := range msgs {
// TODO: find the filter for the msg based on chatID in the message and map it properly. or better this is done in filter layer itself?
// something like t.FilterByChatID()
// Exclude anything that is a cache hit
if !hits[types.EncodeHex(msgs[i].Hash)] {
result[*filter] = append(result[*filter], msgs[i])
Expand All @@ -276,16 +278,16 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
// In case of communities a single topic is used to send all messages.
func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}

newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
Expand Down Expand Up @@ -362,7 +364,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}
Expand Down