Skip to content

Commit

Permalink
feat: handle internal comm
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Oct 12, 2024
1 parent 43f9be4 commit 1f9f605
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ secret*.yaml
cover.tmp
key.json
.idea
service/converter/emaildata.html
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ kubectl create secret generic gcp-dns-secret --from-file=key.json
```

```shell
kubectl create secret generic int-email --from-literal=rcpts=rcpt1,rcpt2,...
kubectl create secret generic int-email \
--from-literal=rcptsPublish=rcpt1,rcpt2 \
--from-literal=rcptsInternal=rcpt3,rcpt4,...
```
18 changes: 10 additions & 8 deletions api/smtp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ import (
)

type backend struct {
rcpts map[string]bool
dataLimit int64
svc service.Service
rcptsPublish map[string]bool
rcptsInternal map[string]bool
dataLimit int64
svc service.Service
}

func NewBackend(rcpts map[string]bool, dataLimit int64, svc service.Service) smtp.Backend {
func NewBackend(rcptsPublish, rcptsInternal map[string]bool, dataLimit int64, svc service.Service) smtp.Backend {
return backend{
rcpts: rcpts,
dataLimit: dataLimit,
svc: svc,
rcptsPublish: rcptsPublish,
rcptsInternal: rcptsInternal,
dataLimit: dataLimit,
svc: svc,
}
}

func (b backend) NewSession(c *smtp.Conn) (s smtp.Session, err error) {
s = newSession(b.rcpts, b.dataLimit, b.svc)
s = newSession(b.rcptsPublish, b.rcptsInternal, b.dataLimit, b.svc)
return
}
37 changes: 22 additions & 15 deletions api/smtp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,28 @@ import (
)

type session struct {
rcptsAllowed map[string]bool
dataLimit int64
svc service.Service
rcptsPublish map[string]bool
rcptsInternal map[string]bool
dataLimit int64
svc service.Service
//
allowed bool
from string
publish bool
internal bool
from string
}

func newSession(rcptsAllowed map[string]bool, dataLimit int64, svc service.Service) smtp.Session {
func newSession(rcptsPublish, rcptsInternal map[string]bool, dataLimit int64, svc service.Service) smtp.Session {
return &session{
rcptsAllowed: rcptsAllowed,
dataLimit: dataLimit,
svc: svc,
rcptsPublish: rcptsPublish,
rcptsInternal: rcptsInternal,
dataLimit: dataLimit,
svc: svc,
}
}

func (s *session) Reset() {
s.allowed = false
s.publish = false
s.internal = false
s.from = ""
return
}
Expand All @@ -40,17 +44,20 @@ func (s *session) Mail(from string, opts *smtp.MailOptions) (err error) {
}

func (s *session) Rcpt(to string, opts *smtp.RcptOptions) (err error) {
if s.rcptsAllowed[to] {
s.allowed = true
if s.rcptsPublish[to] {
s.publish = true
}
if s.rcptsInternal[to] {
s.internal = true
}
return
}

func (s *session) Data(r io.Reader) (err error) {
switch s.allowed {
case true:
switch {
case s.publish, s.internal:
r = io.LimitReader(r, s.dataLimit)
err = s.svc.Submit(context.TODO(), s.from, r)
err = s.svc.Submit(context.TODO(), s.from, s.internal, r)
if err != nil {
err = &smtp.SMTPError{
Code: 554,
Expand Down
12 changes: 10 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type ApiConfig struct {
Limit uint32 `envconfig:"API_SMTP_DATA_LIMIT" default:"1048576" required:"true"`
}
Recipients struct {
Names []string `envconfig:"API_SMTP_RECIPIENTS_NAMES" required:"true"`
Limit uint16 `envconfig:"API_SMTP_RECIPIENTS_LIMIT" default:"100" required:"true"`
Publish []string `envconfig:"API_SMTP_RECIPIENTS_PUBLISH" required:"true"`
Internal []string `envconfig:"API_SMTP_RECIPIENTS_INTERNAL" required:"true"`
Limit uint16 `envconfig:"API_SMTP_RECIPIENTS_LIMIT" default:"100" required:"true"`
}
Timeout struct {
Read time.Duration `envconfig:"API_SMTP_TIMEOUT_READ" default:"1m" required:"true"`
Expand All @@ -46,6 +47,7 @@ type ApiConfig struct {
Backoff time.Duration `envconfig:"API_WRITER_BACKOFF" default:"10s" required:"true"`
BatchSize uint32 `envconfig:"API_WRITER_BATCH_SIZE" default:"16" required:"true"`
Cache WriterCacheConfig
Internal WriterInternalConfig
Uri string `envconfig:"API_WRITER_URI" default:"resolver:50051" required:"true"`
}
}
Expand All @@ -55,6 +57,12 @@ type WriterCacheConfig struct {
Ttl time.Duration `envconfig:"API_WRITER_CACHE_TTL" default:"24h" required:"true"`
}

type WriterInternalConfig struct {
Name string `envconfig:"API_WRITER_INTERNAL_NAME" default:"awkinternal" required:"true"`
Value int32 `envconfig:"API_WRITER_INTERNAL_VALUE" required:"true"`
RateLimitPerMinute int `envconfig:"API_WRITER_INTERNAL_RATE_LIMIT_PER_MINUTE" default:"1" required:"true"`
}

type ReaderConfig struct {
UriEventBase string `envconfig:"API_READER_URI_EVT_BASE" default:"https://awakari.com/pub-msg.html?id=" required:"true"`
}
Expand Down
6 changes: 4 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ func TestConfig(t *testing.T) {
os.Setenv("API_WRITER_BACKOFF", "23h")
os.Setenv("API_WRITER_URI", "writer:56789")
os.Setenv("LOG_LEVEL", "4")
os.Setenv("API_SMTP_RECIPIENTS_NAMES", "rcpt1,rcpt2")
os.Setenv("API_SMTP_RECIPIENTS_PUBLISH", "rcpt1,rcpt2")
os.Setenv("API_SMTP_RECIPIENTS_INTERNAL", "rcpt3,rcpt4")
os.Setenv("API_WRITER_INTERNAL_VALUE", "123")
cfg, err := NewConfigFromEnv()
assert.Nil(t, err)
assert.Equal(t, 23*time.Hour, cfg.Api.Writer.Backoff)
Expand All @@ -24,5 +26,5 @@ func TestConfig(t *testing.T) {
assert.Equal(t, []string{
"rcpt1",
"rcpt2",
}, cfg.Api.Smtp.Recipients.Names)
}, cfg.Api.Smtp.Recipients.Publish)
}
18 changes: 16 additions & 2 deletions helm/int-email/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ spec:
{{- end }}
- name: API_SMTP_DATA_LIMIT
value: "{{ .Values.api.smtp.data.limit }}"
- name: API_SMTP_RECIPIENTS_NAMES
- name: API_SMTP_RECIPIENTS_PUBLISH
valueFrom:
secretKeyRef:
name: "{{ include "int-email.fullname" . }}"
key: rcpts
key: rcptsPublish
- name: API_SMTP_RECIPIENTS_INTERNAL
valueFrom:
secretKeyRef:
name: "{{ include "int-email.fullname" . }}"
key: rcptsInternal
- name: API_SMTP_RECIPIENTS_LIMIT
value: "{{ .Values.api.smtp.rcpt.limit }}"
- name: API_SMTP_TIMEOUT_READ
Expand Down Expand Up @@ -74,6 +79,15 @@ spec:
value: "{{ .Values.api.writer.cache.ttl }}"
- name: API_WRITER_URI
value: "{{ .Values.api.writer.uri }}"
- name: API_WRITER_INTERNAL_NAME
value: "{{ .Values.api.writer.internal.name }}"
- name: API_WRITER_INTERNAL_VALUE
valueFrom:
secretKeyRef:
name: "{{ .Values.api.writer.internal.secret }}"
key: "{{ .Values.api.writer.internal.name }}"
- name: API_WRITER_INTERNAL_RATE_LIMIT_PER_MINUTE
value: "{{ .Values.api.writer.internal.rateLimit.minute }}"
- name: API_READER_URI_EVT_BASE
value: "{{ .Values.api.reader.uriEvtBase }}"
- name: LOG_LEVEL
Expand Down
5 changes: 5 additions & 0 deletions helm/int-email/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ api:
reader:
uriEvtBase: "https://awakari.com/pub-msg.html?id="
writer:
internal:
name: "awkinternal"
secret: "resolver-internal-attr-val"
rateLimit:
minute: 1
backoff: "10s"
batchSize: 16
cache:
Expand Down
15 changes: 10 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,22 @@ func main() {

svcWriter := writer.NewService(clientAwk, cfg.Api.Writer.Backoff, cfg.Api.Writer.Cache, log)
svcWriter = writer.NewLogging(svcWriter, log)
svcConv := converter.NewConverter(cfg.Api.EventType.Self, util.HtmlPolicy())
svcConv := converter.NewConverter(cfg.Api.EventType.Self, util.HtmlPolicy(), cfg.Api.Writer.Internal)
svcConv = converter.NewLogging(svcConv, log)
svc := service.NewService(svcConv, svcWriter, cfg.Api.Group)
svc = service.NewLogging(svc, log)

rcpts := map[string]bool{}
for _, name := range cfg.Api.Smtp.Recipients.Names {
rcptsPublish := map[string]bool{}
for _, name := range cfg.Api.Smtp.Recipients.Publish {
rcpt := fmt.Sprintf("%s@%s", name, cfg.Api.Smtp.Host)
rcpts[rcpt] = true
rcptsPublish[rcpt] = true
}
b := apiSmtp.NewBackend(rcpts, int64(cfg.Api.Smtp.Data.Limit), svc)
rcptsInternal := map[string]bool{}
for _, name := range cfg.Api.Smtp.Recipients.Internal {
rcpt := fmt.Sprintf("%s@%s", name, cfg.Api.Smtp.Host)
rcptsInternal[rcpt] = true
}
b := apiSmtp.NewBackend(rcptsPublish, rcptsInternal, int64(cfg.Api.Smtp.Data.Limit), svc)
b = apiSmtp.NewBackendLogging(b, log)

srv := smtp.NewServer(b)
Expand Down
6 changes: 3 additions & 3 deletions service/converter/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func NewLogging(svc Service, log *slog.Logger) Service {
}
}

func (l logging) Convert(src io.Reader, dst *pb.CloudEvent) (err error) {
err = l.svc.Convert(src, dst)
l.log.Log(context.TODO(), util.LogLevel(err), fmt.Sprintf("converter.Convert(source=%s, objectUrl=%s, evtId=%s): %s", dst.Source, dst.Attributes[ceKeyObjectUrl], dst.Id, err))
func (l logging) Convert(src io.Reader, dst *pb.CloudEvent, internal bool) (err error) {
err = l.svc.Convert(src, dst, internal)
l.log.Log(context.TODO(), util.LogLevel(err), fmt.Sprintf("converter.Convert(source=%s, objectUrl=%s, evtId=%s, internal=%t): %s", dst.Source, dst.Attributes[ceKeyObjectUrl], dst.Id, internal, err))
return
}
55 changes: 42 additions & 13 deletions service/converter/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,26 @@ import (
"errors"
"fmt"
"github.com/PuerkitoBio/goquery"
"github.com/awakari/int-email/config"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"github.com/jhillyerd/enmime"
"github.com/microcosm-cc/bluemonday"
"github.com/segmentio/ksuid"
"google.golang.org/protobuf/types/known/timestamppb"
"io"
"regexp"
"strings"
"time"
)

type Service interface {
Convert(src io.Reader, dst *pb.CloudEvent) (err error)
Convert(src io.Reader, dst *pb.CloudEvent, internal bool) (err error)
}

type svc struct {
evtType string
htmlPolicy *bluemonday.Policy
evtType string
htmlPolicy *bluemonday.Policy
writerInternalCfg config.WriterInternalConfig
}

const ceKeyLenMax = 20
Expand All @@ -32,7 +35,6 @@ const ceKeyAttFileNames = "attachmentfilenames"
const ceSpecVersion = "1.0"

var ErrParse = errors.New("failed to parse message")

var headerBlacklist = map[string]bool{
"bcc": true,
"cc": true,
Expand All @@ -53,27 +55,29 @@ var headerBlacklist = map[string]bool{
"xmailgunvariables": true,
"xreceived": true,
}
var reUrlTail = regexp.MustCompile(`\?[a-zA-Z0-9_\-]+=[a-zA-Z0-9_\-~.%&/#+]*`)

func NewConverter(evtType string, htmlPolicy *bluemonday.Policy) Service {
func NewConverter(evtType string, htmlPolicy *bluemonday.Policy, writerInternalCfg config.WriterInternalConfig) Service {
return svc{
evtType: evtType,
htmlPolicy: htmlPolicy,
evtType: evtType,
htmlPolicy: htmlPolicy,
writerInternalCfg: writerInternalCfg,
}
}

func (c svc) Convert(src io.Reader, dst *pb.CloudEvent) (err error) {
func (c svc) Convert(src io.Reader, dst *pb.CloudEvent, internal bool) (err error) {
var e *enmime.Envelope
e, err = enmime.ReadEnvelope(src)
switch err {
case nil:
err = c.convert(e, dst)
err = c.convert(e, dst, internal)
default:
err = fmt.Errorf("%w: %s", ErrParse, err)
}
return
}

func (c svc) convert(src *enmime.Envelope, dst *pb.CloudEvent) (err error) {
func (c svc) convert(src *enmime.Envelope, dst *pb.CloudEvent, internal bool) (err error) {

for _, k := range src.GetHeaderKeys() {
v := src.GetHeader(k)
Expand Down Expand Up @@ -102,7 +106,7 @@ func (c svc) convert(src *enmime.Envelope, dst *pb.CloudEvent) (err error) {
case "listurl":
dst.Source = c.convertAddr(v)
default:
if !headerBlacklist[ceKey] && v != "" {
if internal || !headerBlacklist[ceKey] && v != "" {
v = c.convertAddr(v)
dst.Attributes[ceKey] = &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeString{
Expand All @@ -125,8 +129,16 @@ func (c svc) convert(src *enmime.Envelope, dst *pb.CloudEvent) (err error) {
if src.HTML != "" {
err = c.handleHtml(src.HTML, dst)
if err == nil {
dst.Data = &pb.CloudEvent_TextData{
TextData: c.htmlPolicy.Sanitize(src.HTML),
switch internal {
case true:
dst.Data = &pb.CloudEvent_TextData{
TextData: src.HTML,
}
default:
txt := reUrlTail.ReplaceAllString(src.HTML, "\"")
dst.Data = &pb.CloudEvent_TextData{
TextData: c.htmlPolicy.Sanitize(txt),
}
}
}
}
Expand Down Expand Up @@ -181,6 +193,14 @@ func (c svc) convert(src *enmime.Envelope, dst *pb.CloudEvent) (err error) {
}
}

if internal {
dst.Attributes[c.writerInternalCfg.Name] = &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeInteger{
CeInteger: c.writerInternalCfg.Value,
},
}
}

return
}

Expand All @@ -200,6 +220,10 @@ func (c svc) convertAddr(src string) (dst string) {
dst = dst[:len(dst)-1]
}
}
urlEnd := strings.Index(dst, "?")
if urlEnd > 0 {
dst = dst[:urlEnd]
}
return
}

Expand All @@ -220,11 +244,16 @@ func (c svc) handleHtml(src string, evt *pb.CloudEvent) (err error) {
}
}
if urlOrig != "" {
urlEnd := strings.Index(urlOrig, "?")
if urlEnd > 0 {
urlOrig = urlOrig[:urlEnd]
}
evt.Attributes[ceKeyObjectUrl] = &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeUri{
CeUri: urlOrig,
},
}
break
}
}
}
Expand Down
Loading

0 comments on commit 1f9f605

Please sign in to comment.