Skip to content

Commit

Permalink
feat_: phase-1 of use single content-topic to send messages for all c…
Browse files Browse the repository at this point in the history
…ommunity chats
  • Loading branch information
chaitanyaprem committed Nov 1, 2024
1 parent 768cda8 commit 103b415
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 36 deletions.
27 changes: 14 additions & 13 deletions eth-node/types/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (

// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
ContentTopicOverride string `json:"contentTopicOverride"`
}

// Message is the RPC representation of a whisper message.
Expand Down
12 changes: 7 additions & 5 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes
}

newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
ContentTopicOverride: rawMessage.ContentTopicOverride,
}

if rawMessage.BeforeDispatch != nil {
Expand Down Expand Up @@ -765,6 +766,7 @@ func (s *MessageSender) SendPublic(
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
newMessage.Priority = rawMessage.Priority
newMessage.ContentTopicOverride = rawMessage.ContentTopicOverride

messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)

Expand Down
1 change: 1 addition & 0 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ type RawMessage struct {
ResendType ResendType
ResendMethod ResendMethod
Priority *MessagePriority
ContentTopicOverride string
}
6 changes: 6 additions & 0 deletions protocol/communities/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,12 @@ func (o *Community) setPrivateKey(pk *ecdsa.PrivateKey) {
o.config.PrivateKey = pk
}
}
func (o *Community) UniversalChatID() string {
// Using Member updates channelID as chatID to act as a universal content-topic for all chats in the community as explained here https://forum.vac.dev/t/status-communities-review-and-proposed-usage-of-waku-content-topics/335
// This is to match filter criteria of community with the content-topic usage.
// This specific topic is chosen as existing users before the change are already subscribed to this and will not get affected by it.
return o.MemberUpdateChannelID()
}

func (o *Community) SetResendAccountsClock(clock uint64) {
o.config.CommunityDescription.ResendAccountsClock = clock
Expand Down
16 changes: 16 additions & 0 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4194,6 +4194,22 @@ func (m *Manager) GetOwnedCommunitiesChatIDs() (map[string]bool, error) {
return chatIDs, nil
}

func (m *Manager) GetOwnedCommunitiesUniversalChatIDs() (map[string]bool, error) {
ownedCommunities, err := m.Controlled()
if err != nil {
return nil, err

Check warning on line 4200 in protocol/communities/manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/communities/manager.go#L4200

Added line #L4200 was not covered by tests
}

chatIDs := make(map[string]bool)
for _, c := range ownedCommunities {
if c.Joined() {
chatIDs[c.UniversalChatID()] = true
}
}
return chatIDs, nil

}

func (m *Manager) StoreWakuMessage(message *types.Message) error {
return m.persistence.SaveWakuMessage(message)
}
Expand Down
4 changes: 4 additions & 0 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community,
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}
// adding the content-topic used for all community chats as all chat messages will be published on this topic.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
topics = append(topics, m.transport.FilterByChatID(community.UniversalChatID()).ContentTopic)

Check warning on line 360 in protocol/communities/manager_archive.go

View check run for this annotation

Codecov / codecov/patch

protocol/communities/manager_archive.go#L360

Added line #L360 was not covered by tests

ts := time.Now().Unix()
to := time.Unix(ts, 0)
Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := types.BytesToTopic(transport.ToTopic(chat.ID))
topics := []types.TopicType{topic}
communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.UniversalChatID()))
topics := []types.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
Expand Down
12 changes: 11 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,7 +2268,8 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
)
return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID)
}

//setting content-topic over-ride for community messages to use memberUpdatesChannelID
rawMessage.ContentTopicOverride = community.UniversalChatID()
logger.Debug("sending community chat message", zap.String("chatName", chat.Name))
isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID)
if err != nil {
Expand Down Expand Up @@ -3589,6 +3590,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
if err != nil {
logger.Info("failed to retrieve admin communities", zap.Error(err))
}
//fetch universal chatIDs as well.
controlledCommunitiesUniversalChatIDs, err := m.communitiesManager.GetOwnedCommunitiesUniversalChatIDs()
if err != nil {
logger.Info("failed to retrieve admin communities", zap.Error(err))

Check warning on line 3596 in protocol/messenger.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger.go#L3596

Added line #L3596 was not covered by tests
}

for chatID, flag := range controlledCommunitiesUniversalChatIDs {
controlledCommunitiesChatIDs[chatID] = flag
}

iterator := m.retrievedMessagesIteratorFactory(chatWithMessages)
for iterator.HasNext() {
Expand Down
11 changes: 11 additions & 0 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -2714,6 +2714,13 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err

publicFiltersToInit = append(publicFiltersToInit, defaultFilters...)

for _, filter := range defaultFilters {
_, err := m.transport.RemoveFilterByChatID(filter.ChatID)
if err != nil {
return err

Check warning on line 2720 in protocol/messenger_communities.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_communities.go#L2720

Added line #L2720 was not covered by tests
}
}

for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID
_, err := m.transport.RemoveFilterByChatID(communityChatID)
Expand Down Expand Up @@ -3949,6 +3956,10 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID()))

Check warning on line 3962 in protocol/messenger_communities.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_communities.go#L3962

Added line #L3962 was not covered by tests

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
Expand Down
34 changes: 23 additions & 11 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand All @@ -123,15 +123,16 @@ func (f *FiltersManager) Init(
}

type FiltersToInitialize struct {
ChatID string
PubsubTopic string
ChatID string
PubsubTopic string
ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager.
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
}

keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, "")
filter, err := f.addSymmetric(keyString, "", "")
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -534,11 +535,16 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
chatIDToLoad := chatID
if contentTopicID != "" {
chatIDToLoad = contentTopicID
}

if chat, ok := f.filters[chatIDToLoad]; ok {
if chat.PubsubTopic != pubsubTopic {
f.logger.Debug("updating pubsub topic for filter",
zap.String("chatID", chatID),
Expand All @@ -547,13 +553,13 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
zap.String("newTopic", pubsubTopic),
)
chat.PubsubTopic = pubsubTopic
f.filters[chatID] = chat
f.filters[chatIDToLoad] = chat //TODO: Do we need to update watchers as well on modification?
}

return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID)
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -592,7 +598,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, "", "")
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -615,7 +621,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}

// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) {
var symKeyID string
var err error

Expand Down Expand Up @@ -644,6 +650,12 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi
}
}

if contentTopicID != "" {
//add receive filter for the single default contentTopic for all community chats
topic = ToTopic(contentTopicID)
topics = append(topics, topic)
}

id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Expand Down
13 changes: 8 additions & 5 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", "")
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -223,6 +223,8 @@ func (t *Transport) GetStats() types.StatsSummary {
return t.waku.GetStats()
}

// With change in filter used for communities, messages are indexed here with common filter and not their own chatID filter.
// The caller should not use chatID from the filter to determine chatID of the message, rather should dervice it from messaage itself.
func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
result := make(map[Filter][]*types.Message)
logger := t.logger.With(zap.String("site", "retrieveRawAll"))
Expand Down Expand Up @@ -276,16 +278,16 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
// In case of communities a single topic is used to send all messages.
func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}

newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
Expand Down Expand Up @@ -362,7 +364,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 103b415

Please sign in to comment.