Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compress portal event channels into one #673

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ func (portal *Portal) MarkEncrypted() {

func (portal *Portal) ReceiveMatrixEvent(user bridge.User, evt *event.Event) {
if user.GetPermissionLevel() >= bridgeconfig.PermissionLevelUser || portal.HasRelaybot() {
portal.matrixMessages <- PortalMatrixMessage{user: user.(*User), evt: evt, receivedAt: time.Now()}
portal.events <- &PortalEvent{
MatrixMessage: &PortalMatrixMessage{
user: user.(*User),
evt: evt,
receivedAt: time.Now(),
},
}
}
}

Expand Down Expand Up @@ -199,9 +205,7 @@ func (br *WABridge) newBlankPortal(key database.PortalKey) *Portal {
log: br.Log.Sub(fmt.Sprintf("Portal/%s", key)),
zlog: br.ZLog.With().Str("portal_key", key.String()).Logger(),

messages: make(chan PortalMessage, br.Config.Bridge.PortalMessageBuffer),
matrixMessages: make(chan PortalMatrixMessage, br.Config.Bridge.PortalMessageBuffer),
mediaRetries: make(chan PortalMediaRetry, br.Config.Bridge.PortalMessageBuffer),
events: make(chan *PortalEvent, br.Config.Bridge.PortalMessageBuffer),

mediaErrorCache: make(map[types.MessageID]*FailedMediaMeta),
}
Expand Down Expand Up @@ -232,6 +236,12 @@ type fakeMessage struct {
Important bool
}

type PortalEvent struct {
Message *PortalMessage
MatrixMessage *PortalMatrixMessage
MediaRetry *PortalMediaRetry
}

type PortalMessage struct {
evt *events.Message
undecryptable *events.UndecryptableMessage
Expand Down Expand Up @@ -279,9 +289,7 @@ type Portal struct {
currentlyTyping []id.UserID
currentlyTypingLock sync.Mutex

messages chan PortalMessage
matrixMessages chan PortalMatrixMessage
mediaRetries chan PortalMediaRetry
events chan *PortalEvent

mediaErrorCache map[types.MessageID]*FailedMediaMeta

Expand Down Expand Up @@ -337,7 +345,7 @@ var (
_ bridge.TypingPortal = (*Portal)(nil)
)

func (portal *Portal) handleWhatsAppMessageLoopItem(msg PortalMessage) {
func (portal *Portal) handleWhatsAppMessageLoopItem(msg *PortalMessage) {
if len(portal.MXID) == 0 {
if msg.fake == nil && msg.undecryptable == nil && (msg.evt == nil || !containsSupportedMessage(msg.evt.Message)) {
portal.log.Debugln("Not creating portal room for incoming message: message is not a chat message")
Expand Down Expand Up @@ -369,7 +377,7 @@ func (portal *Portal) handleWhatsAppMessageLoopItem(msg PortalMessage) {
}
}

func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
func (portal *Portal) handleMatrixMessageLoopItem(msg *PortalMatrixMessage) {
portal.latestEventBackfillLock.Lock()
defer portal.latestEventBackfillLock.Unlock()
evtTS := time.UnixMilli(msg.evt.Timestamp)
Expand Down Expand Up @@ -483,12 +491,16 @@ func (portal *Portal) handleOneMessageLoopItem() {
}
}()
select {
case msg := <-portal.messages:
portal.handleWhatsAppMessageLoopItem(msg)
case msg := <-portal.matrixMessages:
portal.handleMatrixMessageLoopItem(msg)
case retry := <-portal.mediaRetries:
portal.handleMediaRetry(retry.evt, retry.source)
case msg := <-portal.events:
if msg.Message != nil {
portal.handleWhatsAppMessageLoopItem(msg.Message)
} else if msg.MatrixMessage != nil {
portal.handleMatrixMessageLoopItem(msg.MatrixMessage)
} else if msg.MediaRetry != nil {
portal.handleMediaRetry(msg.MediaRetry.evt, msg.MediaRetry.source)
} else {
portal.log.Warn("Portal event loop returned an event without any data")
}
}
}

Expand Down
52 changes: 32 additions & 20 deletions user.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,17 @@ func (user *User) handleCallStart(sender types.JID, id, callType string, ts time
if callType != "" {
text = fmt.Sprintf("Incoming %s call. Use the WhatsApp app to answer.", callType)
}
portal.messages <- PortalMessage{
fake: &fakeMessage{
Sender: sender,
Text: text,
ID: id,
Time: ts,
Important: true,
portal.events <- &PortalEvent{
Message: &PortalMessage{
fake: &fakeMessage{
Sender: sender,
Text: text,
ID: id,
Time: ts,
Important: true,
},
source: user,
},
source: user,
}
}

Expand Down Expand Up @@ -865,11 +867,15 @@ func (user *User) HandleEvent(event interface{}) {
go user.handleChatPresence(v)
case *events.Message:
portal := user.GetPortalByMessageSource(v.Info.MessageSource)
portal.messages <- PortalMessage{evt: v, source: user}
portal.events <- &PortalEvent{
Message: &PortalMessage{evt: v, source: user},
}
case *events.MediaRetry:
user.phoneSeen(v.Timestamp)
portal := user.GetPortalByJID(v.ChatID)
portal.mediaRetries <- PortalMediaRetry{evt: v, source: user}
portal.events <- &PortalEvent{
MediaRetry: &PortalMediaRetry{evt: v, source: user},
}
case *events.CallOffer:
user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp)
case *events.CallOfferNotice:
Expand All @@ -885,22 +891,26 @@ func (user *User) HandleEvent(event interface{}) {
if v.Implicit {
text = fmt.Sprintf("Your security code with %s (device #%d) changed.", puppet.Displayname, v.JID.Device)
}
portal.messages <- PortalMessage{
fake: &fakeMessage{
Sender: v.JID,
Text: text,
ID: strconv.FormatInt(v.Timestamp.Unix(), 10),
Time: v.Timestamp,
Important: false,
portal.events <- &PortalEvent{
Message: &PortalMessage{
fake: &fakeMessage{
Sender: v.JID,
Text: text,
ID: strconv.FormatInt(v.Timestamp.Unix(), 10),
Time: v.Timestamp,
Important: false,
},
source: user,
},
source: user,
}
}
case *events.CallTerminate, *events.CallRelayLatency, *events.CallAccept, *events.UnknownCallEvent:
// ignore
case *events.UndecryptableMessage:
portal := user.GetPortalByMessageSource(v.Info.MessageSource)
portal.messages <- PortalMessage{undecryptable: v, source: user}
portal.events <- &PortalEvent{
Message: &PortalMessage{undecryptable: v, source: user},
}
case *events.HistorySync:
if user.bridge.Config.Bridge.HistorySync.Backfill {
user.historySyncs <- v
Expand Down Expand Up @@ -1235,7 +1245,9 @@ func (user *User) handleReceipt(receipt *events.Receipt) {
if portal == nil || len(portal.MXID) == 0 {
return
}
portal.messages <- PortalMessage{receipt: receipt, source: user}
portal.events <- &PortalEvent{
Message: &PortalMessage{receipt: receipt, source: user},
}
}

func (user *User) makeReadMarkerContent(eventID id.EventID, doublePuppet bool) CustomReadMarkers {
Expand Down