use temp file instead of ram buffer
This commit is contained in:
parent
8a92216ce6
commit
db22435efb
4 changed files with 19 additions and 28 deletions
|
@ -24,7 +24,7 @@ import (
|
|||
var (
|
||||
listenPort int
|
||||
healthPort int
|
||||
rspamAddr string
|
||||
rspamAddr string
|
||||
|
||||
enableTLS bool
|
||||
)
|
||||
|
|
1
go.mod
1
go.mod
|
@ -8,7 +8,6 @@ require (
|
|||
github.com/emersion/go-smtp v0.16.0
|
||||
github.com/oklog/ulid v1.3.1
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc
|
||||
go.uber.org/zap v1.24.0
|
||||
golang.org/x/time v0.3.0
|
||||
)
|
||||
|
|
2
go.sum
2
go.sum
|
@ -24,8 +24,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
|||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc h1:Wf5kc1t8U/5js/4iB1jsmHwgn1xuCweQVH7P62RAriE=
|
||||
github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc/go.mod h1:rYtbMROG2CU051Mt3UBUEESYLe0vS1K+Oj2MzBk0V6Q=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
|
|
|
@ -1,21 +1,19 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/emersion/go-message/mail"
|
||||
"github.com/emersion/go-milter"
|
||||
"github.com/emersion/go-smtp"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/stretchr/readcaster"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
@ -186,31 +184,24 @@ func (session *Session) Rcpt(to string) error {
|
|||
func (session *Session) Data(r io.Reader) error {
|
||||
session.log.Debug("receiving DATA")
|
||||
|
||||
caster := readcaster.New(r)
|
||||
msgReader := caster.NewReader()
|
||||
rspamReader := caster.NewReader()
|
||||
f, err := os.CreateTemp("", "inbound-")
|
||||
if err != nil {
|
||||
session.log.Error("failed to create temporary file for message data", zap.Error(err))
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
return ErrBadGateway
|
||||
}
|
||||
|
||||
msg := new(bytes.Buffer)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
n, err := msg.ReadFrom(msgReader)
|
||||
if err != nil {
|
||||
session.log.Error("failed to read message data into message", zap.Error(err))
|
||||
|
||||
return
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
session.log.Error("failed to close temporary message file", zap.Error(err))
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
session.log.Warn("received empty message")
|
||||
if err := os.Remove(f.Name()); err != nil {
|
||||
session.log.Error("failed to remove temporary message data file", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
_, action, err := session.rspam.BodyReadFrom(rspamReader)
|
||||
_, action, err := session.rspam.BodyReadFrom(f)
|
||||
if err != nil {
|
||||
session.log.Error("rspamd failed to process message body", zap.Error(err))
|
||||
|
||||
|
@ -225,8 +216,11 @@ func (session *Session) Data(r io.Reader) error {
|
|||
|
||||
session.log.Debug("message accepted for delivery")
|
||||
|
||||
// ensure the message has been received to msg
|
||||
wg.Wait()
|
||||
if _, err := f.Seek(0, 0); err != nil {
|
||||
session.log.Error("failed to seek to beginning of temporary message data file", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Forward to LMTP destination
|
||||
conn, err := net.DialTimeout("tcp", session.b.LMTPAddress, defaultLMTPTimeout)
|
||||
|
@ -248,7 +242,7 @@ func (session *Session) Data(r io.Reader) error {
|
|||
return errors.New("failed to forward message")
|
||||
}
|
||||
|
||||
if err := lc.SendMail(session.from, session.to, msg); err != nil {
|
||||
if err := lc.SendMail(session.from, session.to, f); err != nil {
|
||||
session.log.Error("failed to forward message to LMTP server", zap.Error(err))
|
||||
|
||||
return err
|
||||
|
|
Loading…
Reference in a new issue