Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-side CHUNKING and BINARYMIME support #106

Merged
merged 3 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,20 @@ type Backend interface {
AnonymousLogin(state *ConnectionState) (Session, error)
}

type BodyType string

const (
Body7Bit BodyType = "7BIT"
Body8BitMIME BodyType = "8BITMIME"
BodyBinaryMIME BodyType = "BINARYMIME"
)

// MailOptions contains custom arguments that were
// passed as an argument to the MAIL command.
type MailOptions struct {
// Value of BODY= argument, 7BIT, 8BITMIME or BINARYMIME.
Body BodyType

// Size of the body. Can be 0 if not specified by client.
Size int

Expand Down
258 changes: 230 additions & 28 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@ type ConnectionState struct {
}

type Conn struct {
conn net.Conn
text *textproto.Conn
server *Server
helo string
nbrErrors int
session Session
locker sync.Mutex
conn net.Conn
text *textproto.Conn
server *Server
helo string
nbrErrors int
session Session
locker sync.Mutex
binarymime bool

bdatPipe *io.PipeWriter
bdatStatus *statusCollector // used for BDAT on LMTP
dataResult chan error
bytesReceived int // counts total size of chunks when BDAT is used

fromReceived bool
recipients []string
Expand Down Expand Up @@ -133,6 +139,8 @@ func (c *Conn) handle(cmd string, arg string) {
case "RSET": // Reset session
c.reset()
c.WriteResponse(250, EnhancedCode{2, 0, 0}, "Session reset")
case "BDAT":
c.handleBdat(arg)
case "DATA":
c.handleData(arg)
case "QUIT":
Expand Down Expand Up @@ -169,9 +177,17 @@ func (c *Conn) SetSession(session Session) {
}

func (c *Conn) Close() error {
if session := c.Session(); session != nil {
session.Logout()
c.SetSession(nil)
c.locker.Lock()
defer c.locker.Unlock()

if c.bdatPipe != nil {
c.bdatPipe.CloseWithError(ErrDataReset)
c.bdatPipe = nil
}

if c.session != nil {
c.session.Logout()
c.session = nil
}

return c.conn.Close()
Expand Down Expand Up @@ -245,6 +261,9 @@ func (c *Conn) handleGreet(enhanced bool, arg string) {
if _, isTLS := c.TLSConnectionState(); isTLS && c.server.EnableREQUIRETLS {
caps = append(caps, "REQUIRETLS")
}
if c.server.EnableBINARYMIME {
caps = append(caps, "BINARYMIME")
}
if c.server.MaxMessageBytes > 0 {
caps = append(caps, fmt.Sprintf("SIZE %v", c.server.MaxMessageBytes))
}
Expand All @@ -261,6 +280,10 @@ func (c *Conn) handleMail(arg string) {
c.WriteResponse(502, EnhancedCode{2, 5, 1}, "Please introduce yourself first.")
return
}
if c.bdatPipe != nil {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "MAIL not allowed during message transfer")
return
}

if c.Session() == nil {
state := c.State()
Expand Down Expand Up @@ -297,6 +320,7 @@ func (c *Conn) handleMail(arg string) {

opts := MailOptions{}

c.binarymime = false
// This is where the Conn may put BODY=8BITMIME, but we already
// read the DATA as bytes, so it does not effect our processing.
if len(fromArgs) > 1 {
Expand Down Expand Up @@ -335,11 +359,18 @@ func (c *Conn) handleMail(arg string) {
opts.RequireTLS = true
case "BODY":
switch value {
case "BINARYMIME":
if !c.server.EnableBINARYMIME {
c.WriteResponse(504, EnhancedCode{5, 5, 4}, "BINARYMIME is not implemented")
return
}
c.binarymime = true
case "7BIT", "8BITMIME":
default:
c.WriteResponse(500, EnhancedCode{5, 5, 4}, "Unknown BODY value")
return
}
opts.Body = BodyType(value)
case "AUTH":
value, err := decodeXtext(value)
if err != nil {
Expand Down Expand Up @@ -433,6 +464,10 @@ func (c *Conn) handleRcpt(arg string) {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "Missing MAIL FROM command.")
return
}
if c.bdatPipe != nil {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "RCPT not allowed during message transfer")
return
}

if (len(arg) < 4) || (strings.ToUpper(arg[0:3]) != "TO:") {
c.WriteResponse(501, EnhancedCode{5, 5, 2}, "Was expecting RCPT arg syntax of TO:<address>")
Expand Down Expand Up @@ -583,6 +618,14 @@ func (c *Conn) handleData(arg string) {
c.WriteResponse(501, EnhancedCode{5, 5, 4}, "DATA command should not have any arguments")
return
}
if c.bdatPipe != nil {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "DATA not allowed during message transfer")
return
}
if c.binarymime {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "DATA not allowed for BINARYMIME messages")
return
}

if !c.fromReceived || len(c.recipients) == 0 {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "Missing RCPT TO command.")
Expand All @@ -604,7 +647,175 @@ func (c *Conn) handleData(arg string) {
r.limited = false
io.Copy(ioutil.Discard, r) // Make sure all the data has been consumed
c.WriteResponse(code, enhancedCode, msg)
}

func (c *Conn) handleBdat(arg string) {
args := strings.Fields(arg)
if len(args) == 0 {
c.WriteResponse(501, EnhancedCode{5, 5, 4}, "Missing chunk size argument")
return
}
if len(args) > 2 {
c.WriteResponse(501, EnhancedCode{5, 5, 4}, "Too many arguments")
return
}

if !c.fromReceived || len(c.recipients) == 0 {
c.WriteResponse(502, EnhancedCode{5, 5, 1}, "Missing RCPT TO command.")
return
}

last := false
if len(args) == 2 {
if !strings.EqualFold(args[1], "LAST") {
c.WriteResponse(501, EnhancedCode{5, 5, 4}, "Unknown BDAT argument")
return
}
last = true
}

// ParseUint instead of Atoi so we will not accept negative values.
size, err := strconv.ParseUint(args[0], 10, 32)
if err != nil {
c.WriteResponse(501, EnhancedCode{5, 5, 4}, "Malformed size argument")
return
}

if c.server.MaxMessageBytes != 0 && c.bytesReceived+int(size) > c.server.MaxMessageBytes {
c.WriteResponse(552, EnhancedCode{5, 3, 4}, "Max message size exceeded")

// Discard chunk itself without passing it to backend.
io.Copy(ioutil.Discard, io.LimitReader(c.text.R, int64(size)))

c.reset()
return
}

if c.bdatStatus == nil && c.server.LMTP {
c.bdatStatus = c.createStatusCollector()
}

if c.bdatPipe == nil {
var r *io.PipeReader
r, c.bdatPipe = io.Pipe()

c.dataResult = make(chan error, 1)

go func() {
defer func() {
if err := recover(); err != nil {
c.handlePanic(err, c.bdatStatus)

c.dataResult <- errPanic
r.CloseWithError(errPanic)
}
}()

var err error
if !c.server.LMTP {
err = c.Session().Data(r)
} else {
lmtpSession, ok := c.Session().(LMTPSession)
if !ok {
err = c.Session().Data(r)
for _, rcpt := range c.recipients {
c.bdatStatus.SetStatus(rcpt, err)
}
} else {
err = lmtpSession.LMTPData(r, c.bdatStatus)
}
}

c.dataResult <- err
r.CloseWithError(err)
}()
}

chunk := io.LimitReader(c.text.R, int64(size))
_, err = io.Copy(c.bdatPipe, chunk)
if err != nil {
// Backend might return an error early using CloseWithError without consuming
// the whole chunk.
io.Copy(ioutil.Discard, chunk)

c.WriteResponse(toSMTPStatus(err))

if err == errPanic {
c.Close()
}

c.reset()
return
}

c.bytesReceived += int(size)

if last {
c.bdatPipe.Close()

err := <-c.dataResult

if c.server.LMTP {
c.bdatStatus.fillRemaining(err)
for i, rcpt := range c.recipients {
code, enchCode, msg := toSMTPStatus(<-c.bdatStatus.status[i])
c.WriteResponse(code, enchCode, "<"+rcpt+"> "+msg)
}
} else {
code, enhancedCode, msg := toSMTPStatus(err)
c.WriteResponse(code, enhancedCode, msg)
}

if err == errPanic {
c.Close()
return
}

c.reset()
} else {
c.WriteResponse(250, EnhancedCode{2, 0, 0}, "Continue")
}
}

// ErrDataReset is returned by Reader pased to Data function if client does not
// send another BDAT command and instead closes connection or issues RSET command.
var ErrDataReset = errors.New("smtp: message transmission aborted")

var errPanic = &SMTPError{
Code: 421,
EnhancedCode: EnhancedCode{4, 0, 0},
Message: "Internal server error",
}

func (c *Conn) handlePanic(err interface{}, status *statusCollector) {
if status != nil {
status.fillRemaining(errPanic)
}

stack := debug.Stack()
c.server.ErrorLog.Printf("panic serving %v: %v\n%s", c.State().RemoteAddr, err, stack)
}

func (c *Conn) createStatusCollector() *statusCollector {
rcptCounts := make(map[string]int, len(c.recipients))

status := &statusCollector{
statusMap: make(map[string]chan error, len(c.recipients)),
status: make([]chan error, 0, len(c.recipients)),
}
for _, rcpt := range c.recipients {
rcptCounts[rcpt]++
}
// Create channels with buffer sizes necessary to fit all
// statuses for a single recipient to avoid deadlocks.
for rcpt, count := range rcptCounts {
status.statusMap[rcpt] = make(chan error, count)
}
for _, rcpt := range c.recipients {
status.status = append(status.status, status.statusMap[rcpt])
}

return status
}

type statusCollector struct {
Expand Down Expand Up @@ -651,24 +862,7 @@ func (s *statusCollector) SetStatus(rcptTo string, err error) {

func (c *Conn) handleDataLMTP() {
r := newDataReader(c)

rcptCounts := make(map[string]int, len(c.recipients))

status := &statusCollector{
statusMap: make(map[string]chan error, len(c.recipients)),
status: make([]chan error, 0, len(c.recipients)),
}
for _, rcpt := range c.recipients {
rcptCounts[rcpt]++
}
// Create channels with buffer sizes necessary to fit all
// statuses for a single recipient to avoid deadlocks.
for rcpt, count := range rcptCounts {
status.statusMap[rcpt] = make(chan error, count)
}
for _, rcpt := range c.recipients {
status.status = append(status.status, status.statusMap[rcpt])
}
status := c.createStatusCollector()

done := make(chan bool, 1)

Expand Down Expand Up @@ -779,9 +973,17 @@ func (c *Conn) reset() {
c.locker.Lock()
defer c.locker.Unlock()

if c.bdatPipe != nil {
c.bdatPipe.CloseWithError(ErrDataReset)
c.bdatPipe = nil
}
c.bdatStatus = nil
c.bytesReceived = 0

if c.session != nil {
c.session.Reset()
}

c.fromReceived = false
c.recipients = nil
}
6 changes: 5 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type Server struct {
// Should be used only if backend supports it.
EnableREQUIRETLS bool

// Advertise BINARYMIME (RFC 3030) capability.
// Should be used only if backend supports it.
EnableBINARYMIME bool

// If set, the AUTH command will not be advertised and authentication
// attempts will be rejected. This setting overrides AllowInsecureAuth.
AuthDisabled bool
Expand All @@ -78,7 +82,7 @@ func NewServer(be Backend) *Server {
Backend: be,
done: make(chan struct{}, 1),
ErrorLog: log.New(os.Stderr, "smtp/server ", log.LstdFlags),
caps: []string{"PIPELINING", "8BITMIME", "ENHANCEDSTATUSCODES"},
caps: []string{"PIPELINING", "8BITMIME", "ENHANCEDSTATUSCODES", "CHUNKING"},
auths: map[string]SaslServerFactory{
sasl.Plain: func(conn *Conn) sasl.Server {
return sasl.NewPlainServer(func(identity, username, password string) error {
Expand Down
Loading