more logging
This commit is contained in:
parent
c2656deb28
commit
e0fde15404
3 changed files with 53 additions and 12 deletions
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/emersion/go-message v0.16.0
|
||||
github.com/emersion/go-milter v0.3.3
|
||||
github.com/emersion/go-smtp v0.16.0
|
||||
github.com/oklog/ulid v1.3.1
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/readcaster v0.0.0-20140428013627-9c14a60f85dc
|
||||
go.uber.org/zap v1.24.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -15,6 +15,8 @@ github.com/emersion/go-textwrapper v0.0.0-20160606182133-d0e65e56babe/go.mod h1:
|
|||
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 h1:IbFBtwoTQyw0fIM5xv1HF+Y+3ZijDR839WMulgxCcUY=
|
||||
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U=
|
||||
github.com/martinlindhe/base36 v1.0.0/go.mod h1:+AtEs8xrBpCeYgSLoY/aJ6Wf37jtBuR0s35750M27+8=
|
||||
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
|
|
|
@ -3,6 +3,7 @@ package server
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
"github.com/emersion/go-message/mail"
|
||||
"github.com/emersion/go-milter"
|
||||
"github.com/emersion/go-smtp"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/stretchr/readcaster"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
|
@ -57,6 +59,17 @@ func (b *Backend) NewSession(conn *smtp.Conn) (smtp.Session, error) {
|
|||
ctx, cancel := context.WithTimeout(b.RootContext, ConnectionTimeout)
|
||||
defer cancel()
|
||||
|
||||
traceID, err := ulid.New(ulid.Now(), rand.Reader)
|
||||
if err != nil {
|
||||
b.Log.Error("failed to create traceID for new connection", zap.Error(err))
|
||||
|
||||
return nil, errors.New("internal error")
|
||||
}
|
||||
|
||||
log := b.Log.With(zap.String("traceid", traceID.String()), zap.String("remote", conn.Conn().RemoteAddr().String()))
|
||||
|
||||
log.Debug("received new connection", zap.String("remote", conn.Conn().RemoteAddr().String()))
|
||||
|
||||
if err := b.OverallLimiter.Wait(ctx); err != nil {
|
||||
return nil, ErrRateLimit
|
||||
}
|
||||
|
@ -69,6 +82,7 @@ func (b *Backend) NewSession(conn *smtp.Conn) (smtp.Session, error) {
|
|||
return &Session{
|
||||
b: b,
|
||||
conn: conn,
|
||||
log: log,
|
||||
rspam: rs,
|
||||
}, nil
|
||||
}
|
||||
|
@ -78,6 +92,7 @@ type Session struct {
|
|||
b *Backend
|
||||
conn *smtp.Conn
|
||||
rspam *milter.ClientSession
|
||||
log *zap.Logger
|
||||
|
||||
to []string
|
||||
from string
|
||||
|
@ -91,7 +106,7 @@ func (session *Session) Reset() {
|
|||
func (session *Session) Logout() error {
|
||||
if session.rspam != nil {
|
||||
if err := session.rspam.Close(); err != nil {
|
||||
session.b.Log.Error("failed to close rspam session", zap.Error(err))
|
||||
session.log.Error("failed to close rspam session", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,22 +120,38 @@ func (session *Session) AuthPlain(username string, password string) error {
|
|||
|
||||
// Set return path for currently processed message.
|
||||
func (session *Session) Mail(from string, opts *smtp.MailOptions) error {
|
||||
session.log = session.log.With(zap.String("from", from))
|
||||
|
||||
session.log.Debug("received MAIL FROM")
|
||||
|
||||
action, err := session.rspam.Mail(from, nil)
|
||||
if err != nil {
|
||||
session.log.Error("failed to process from with rspam", zap.Error(err))
|
||||
|
||||
return fmt.Errorf("filter failure: %w", err)
|
||||
}
|
||||
|
||||
if err := parseAction(action); err != nil {
|
||||
session.log.Info("rspam rejected from", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
session.from = from
|
||||
|
||||
session.log.Debug("accepted MAIL FROM")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add recipient for currently processed message.
|
||||
func (session *Session) Rcpt(to string) error {
|
||||
if session.to == nil {
|
||||
session.log = session.log.With(zap.String("to", to))
|
||||
}
|
||||
|
||||
session.log.Sugar().Debugf("received RCPT TO: %q", to)
|
||||
|
||||
if err := session.validateDomain(to); err != nil {
|
||||
return &smtp.SMTPError{
|
||||
Code: 550,
|
||||
|
@ -131,15 +162,21 @@ func (session *Session) Rcpt(to string) error {
|
|||
|
||||
action, err := session.rspam.Rcpt(to, nil)
|
||||
if err != nil {
|
||||
session.log.Error("failed to process RCPT TO with rspam", zap.Error(err))
|
||||
|
||||
return fmt.Errorf("filter failure: %w", err)
|
||||
}
|
||||
|
||||
if err := parseAction(action); err != nil {
|
||||
session.log.Info("rspam rejected RCPT TO", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
session.to = append(session.to, to)
|
||||
|
||||
session.log.Debug("accepted RCPT TO")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -147,10 +184,7 @@ func (session *Session) Rcpt(to string) error {
|
|||
//
|
||||
// r must be consumed before Data returns.
|
||||
func (session *Session) Data(r io.Reader) error {
|
||||
log := session.b.Log.With(
|
||||
zap.String("from", session.from),
|
||||
zap.Strings("to", session.to),
|
||||
)
|
||||
session.log.Debug("receiving DATA")
|
||||
|
||||
caster := readcaster.New(r)
|
||||
|
||||
|
@ -164,26 +198,30 @@ func (session *Session) Data(r io.Reader) error {
|
|||
|
||||
n, err := msg.ReadFrom(caster.NewReader())
|
||||
if err != nil {
|
||||
log.Error("failed to read message data into message", zap.Error(err))
|
||||
session.log.Error("failed to read message data into message", zap.Error(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
log.Warn("received empty message")
|
||||
session.log.Warn("received empty message")
|
||||
}
|
||||
}()
|
||||
|
||||
_, action, err := session.rspam.BodyReadFrom(caster.NewReader())
|
||||
if err != nil {
|
||||
session.log.Error("rspamd failed to process message body", zap.Error(err))
|
||||
|
||||
return fmt.Errorf("filter failure: %w", err)
|
||||
}
|
||||
|
||||
if err := parseAction(action); err != nil {
|
||||
session.log.Info("rspamd rejected mail data", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug("message accepted for delivery")
|
||||
session.log.Debug("message accepted for delivery")
|
||||
|
||||
// ensure the message has been received to msg
|
||||
wg.Wait()
|
||||
|
@ -191,7 +229,7 @@ func (session *Session) Data(r io.Reader) error {
|
|||
// Forward to LMTP destination
|
||||
conn, err := net.DialTimeout("tcp", session.b.LMTPAddress, defaultLMTPTimeout)
|
||||
if err != nil {
|
||||
log.Error("failed to dial LMTP server",
|
||||
session.log.Error("failed to dial LMTP server",
|
||||
zap.String("lmtp", session.b.LMTPAddress),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
@ -203,18 +241,18 @@ func (session *Session) Data(r io.Reader) error {
|
|||
|
||||
lc, err := smtp.NewClientLMTP(conn, host)
|
||||
if err != nil {
|
||||
log.Error("failed to construct LMTP client", zap.Error(err))
|
||||
session.log.Error("failed to construct LMTP client", zap.Error(err))
|
||||
|
||||
return errors.New("failed to forward message")
|
||||
}
|
||||
|
||||
if err := lc.SendMail(session.from, session.to, msg); err != nil {
|
||||
session.b.Log.Error("failed to forward message to LMTP server", zap.Error(err))
|
||||
session.log.Error("failed to forward message to LMTP server", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
session.b.Log.Debug("message forwarded to LMTP server")
|
||||
session.log.Debug("message forwarded to LMTP server")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue