diff --git a/cmd/inbound/inbound.go b/cmd/inbound/inbound.go index ab12d47..b481581 100644 --- a/cmd/inbound/inbound.go +++ b/cmd/inbound/inbound.go @@ -24,7 +24,7 @@ import ( var ( listenPort int healthPort int - rspamAddr string + rspamAddr string enableTLS bool ) diff --git a/go.mod b/go.mod index 4487b32..f49f44a 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/emersion/go-smtp v0.16.0 github.com/oklog/ulid v1.3.1 github.com/pkg/errors v0.9.1 - github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc go.uber.org/zap v1.24.0 golang.org/x/time v0.3.0 ) diff --git a/go.sum b/go.sum index ff41c08..73e463e 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc h1:Wf5kc1t8U/5js/4iB1jsmHwgn1xuCweQVH7P62RAriE= -github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc/go.mod h1:rYtbMROG2CU051Mt3UBUEESYLe0vS1K+Oj2MzBk0V6Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= diff --git a/server/server.go b/server/server.go index 62ef674..46ed9e5 100644 --- a/server/server.go +++ b/server/server.go @@ -1,21 +1,19 @@ package server import ( - "bytes" "crypto/rand" "errors" "fmt" "io" "net" + "os" "strings" - "sync" "time" "github.com/emersion/go-message/mail" "github.com/emersion/go-milter" "github.com/emersion/go-smtp" "github.com/oklog/ulid" - "github.com/stretchr/readcaster" "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -186,31 +184,24 @@ func (session *Session) Rcpt(to string) error { func (session *Session) Data(r io.Reader) error { session.log.Debug("receiving DATA") - caster := readcaster.New(r) - msgReader := caster.NewReader() - rspamReader := caster.NewReader() + f, err := os.CreateTemp("", "inbound-") + if err != nil { + session.log.Error("failed to create temporary file for message data", zap.Error(err)) - wg := new(sync.WaitGroup) + return ErrBadGateway + } - msg := new(bytes.Buffer) - - wg.Add(1) - go func() { - defer wg.Done() - - n, err := msg.ReadFrom(msgReader) - if err != nil { - session.log.Error("failed to read message data into message", zap.Error(err)) - - return + defer func() { + if err := f.Close(); err != nil { + session.log.Error("failed to close temporary message file", zap.Error(err)) } - if n == 0 { - session.log.Warn("received empty message") + if err := os.Remove(f.Name()); err != nil { + session.log.Error("failed to remove temporary message data file", zap.Error(err)) } }() - _, action, err := session.rspam.BodyReadFrom(rspamReader) + _, action, err := session.rspam.BodyReadFrom(f) if err != nil { session.log.Error("rspamd failed to process message body", zap.Error(err)) @@ -225,8 +216,11 @@ func (session *Session) Data(r io.Reader) error { session.log.Debug("message accepted for delivery") - // ensure the message has been received to msg - wg.Wait() + if _, err := f.Seek(0, 0); err != nil { + session.log.Error("failed to seek to beginning of temporary message data file", zap.Error(err)) + + return err + } // Forward to LMTP destination conn, err := net.DialTimeout("tcp", session.b.LMTPAddress, defaultLMTPTimeout) @@ -248,7 +242,7 @@ func (session *Session) Data(r io.Reader) error { return errors.New("failed to forward message") } - if err := lc.SendMail(session.from, session.to, msg); err != nil { + if err := lc.SendMail(session.from, session.to, f); err != nil { session.log.Error("failed to forward message to LMTP server", zap.Error(err)) return err