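Task 3: Implementing the SMTP Service (Database Version)
1. Project Background
This project is built on the TIO framework and provides a complete SMTP service implementation with the following features:
- Handles sending and receiving mail over the SMTP protocol
- Supports the EHLO/HELO, AUTH LOGIN, MAIL FROM, RCPT TO, DATA, RSET, and QUIT commands
- Persists received mail content to the database (via MailboxService)
- Multi-threaded, high-concurrency processing, using TIO for asynchronous network communication
2. Startup Configuration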
package com.tio.mail.wing.config;

import java.io.IOException;

import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.tio.mail.wing.handler.SmtpServerAioHandler;
import com.tio.mail.wing.listener.SmtpServerAioListener;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SmtpServerConfig {

  public void startSmtpServer() {
    SmtpServerAioHandler serverHandler = new SmtpServerAioHandler();
    SmtpServerAioListener serverListener = new SmtpServerAioListener();

    ServerTioConfig serverTioConfig = new ServerTioConfig("smtp-server");
    serverTioConfig.checkAttacks = false;
    serverTioConfig.ignoreDecodeFail = true;
    serverTioConfig.setServerAioHandler(serverHandler);
    serverTioConfig.setServerAioListener(serverListener);
    serverTioConfig.setHeartbeatTimeout(-1);
    serverTioConfig.setWorkerThreads(4);

    TioServer tioServer = new TioServer(serverTioConfig);
    try {
      int port = EnvUtils.getInt("mail.server.smtp.port", 25);
      tioServer.start(null, port);
      log.info("Started SMTP server on port: {}", port);
    } catch (IOException e) {
      log.error("Failed to start SMTP server", e);
    }
  }
}
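- ServerTioConfig: configures the basic properties of the TIO server, such as the worker thread count, heartbeat, and decode-failure policy.
- EnvUtils: reads mail.server.smtp.port from environment variables or the configuration file; the default port is 25.
- On startup, the server logs: Started SMTP server on port: xxx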
3. Session Context
All state tied to a single connection is stored in SmtpSessionContext:
package com.tio.mail.wing.handler;

import java.util.ArrayList;
import java.util.List;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class SmtpSessionContext {

  // SMTP session state machine
  public enum State {
    /** Freshly connected; waiting for HELO/EHLO */
    CONNECTED,
    /** Greeted; waiting for authentication or a mail transaction */
    GREETED,
    /** AUTH LOGIN received; waiting for the Base64 username */
    AUTH_WAIT_USERNAME,
    /** Username received; waiting for the Base64 password */
    AUTH_WAIT_PASSWORD,
    /** MAIL FROM received; waiting for RCPT TO (handleData rejects DATA in this state) */
    MAIL_FROM_RECEIVED,
    /** RCPT TO received; waiting for more RCPT TO or DATA */
    RCPT_TO_RECEIVED,
    /** Receiving the message content */
    DATA_RECEIVING,
    /** About to close */
    QUIT
  }

  private State state = State.CONNECTED;
  private boolean authenticated = false;
  private String username; // user name after authentication
  private Long userId;

  // Temporary data for the message currently being transferred
  private String fromAddress;
  private List<String> toAddresses = new ArrayList<>();
  private StringBuilder mailContent = new StringBuilder();

  /**
   * Resets the mail transaction so that the next message can be sent over the same connection.
   */
  public void resetTransaction() {
    this.fromAddress = null;
    this.toAddresses.clear();
    this.mailContent.setLength(0);
    // The authentication status is kept, but the transaction state returns to GREETED
    this.state = State.GREETED;
  }
}
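- State: defines the stages of an SMTP session.
- authenticated: flag recording the authentication result.
- mailContent: temporarily buffers the message during the DATA phase until a line containing only "." is received.
- resetTransaction(): allows several messages to be processed in sequence over the same connection.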
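The stages in State can also be read as a table of which commands are acceptable at each point. The following is a minimal sketch of that idea, not code from the project: SmtpStateRules, its Command enum, and isAllowed are hypothetical names, and the State enum is copied locally so the example compiles on its own. The project itself enforces ordering inside each SmtpService method, and its handler is more permissive about RSET and QUIT than this table.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: which commands each session state accepts. */
public final class SmtpStateRules {

  // Local copy of SmtpSessionContext.State so the sketch is self-contained
  public enum State {
    CONNECTED, GREETED, AUTH_WAIT_USERNAME, AUTH_WAIT_PASSWORD,
    MAIL_FROM_RECEIVED, RCPT_TO_RECEIVED, DATA_RECEIVING, QUIT
  }

  // HELO is treated the same as EHLO in this sketch
  public enum Command { EHLO, AUTH, MAIL, RCPT, DATA, RSET, QUIT }

  private static final Map<State, Set<Command>> ALLOWED = new EnumMap<>(State.class);

  static {
    ALLOWED.put(State.CONNECTED, EnumSet.of(Command.EHLO, Command.QUIT));
    ALLOWED.put(State.GREETED,
        EnumSet.of(Command.AUTH, Command.MAIL, Command.RSET, Command.QUIT));
    ALLOWED.put(State.MAIL_FROM_RECEIVED,
        EnumSet.of(Command.RCPT, Command.RSET, Command.QUIT));
    ALLOWED.put(State.RCPT_TO_RECEIVED,
        EnumSet.of(Command.RCPT, Command.DATA, Command.RSET, Command.QUIT));
    // AUTH_WAIT_* and DATA_RECEIVING have no entry: every line received in
    // those states is treated as data (Base64 credentials or message content).
  }

  private SmtpStateRules() {}

  /** Returns true if the command is acceptable in the given state. */
  public static boolean isAllowed(State state, Command command) {
    Set<Command> allowed = ALLOWED.get(state);
    return allowed != null && allowed.contains(command);
  }
}
```

A table like this keeps the ordering rules in one place, which can make it easier to answer 503 Bad sequence of commands consistently.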
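4. Connection Listener
By implementing ServerAioListener, the server sends a greeting when a client connects and logs both connection and disconnection events: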
package com.tio.mail.wing.listener;

import com.litongjava.aio.Packet;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.server.intf.ServerAioListener;
import com.tio.mail.wing.handler.SmtpSessionContext;
import com.tio.mail.wing.packet.SmtpPacket;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SmtpServerAioListener implements ServerAioListener {

  @Override
  public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
    if (isConnected) {
      log.info("SMTP client connected: {}", channelContext.getClientNode());
      // 1. Create the session context
      SmtpSessionContext sessionContext = new SmtpSessionContext();
      channelContext.set("sessionContext", sessionContext);
      // 2. Send the greeting (220)
      String reply = "220 tio-mail-wing ESMTP Service ready\r\n";
      Tio.send(channelContext, new SmtpPacket(reply));
      log.info("SMTP >>> 220 Welcome message sent to {}", channelContext.getClientNode());
    }
  }

  @Override
  public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
    log.info("SMTP client disconnected: {}", channelContext.getClientNode());
  }

  // The remaining callbacks can be extended as needed
  @Override public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {}
  @Override public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {}
  @Override public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {}
  @Override public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {}
  @Override public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) { return false; }
}
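- onAfterConnected: creates the SmtpSessionContext and sends the 220 Service ready greeting.
- onBeforeClose: logs the disconnection.
- The remaining callbacks are left empty; add traffic statistics, decode checks, and so on as needed.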
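The 220 greeting follows the general SMTP reply shape: a three-digit code, a space, some text, and CRLF. Multi-line replies use a hyphen after the code on every line except the last. The sketch below illustrates that formatting; SmtpReplies, single, and multi are hypothetical names introduced here, not part of the project or of TIO.

```java
import java.util.List;

/** Hypothetical helper that formats SMTP reply lines. */
public final class SmtpReplies {

  private static final String CRLF = "\r\n";

  private SmtpReplies() {}

  /** Formats a one-line reply such as "220 Service ready". */
  public static String single(int code, String text) {
    return code + " " + text + CRLF;
  }

  /**
   * Formats a multi-line reply: "code-text" for every line except the last,
   * which uses "code text" to mark the end of the reply.
   */
  public static String multi(int code, List<String> lines) {
    if (lines.isEmpty()) {
      throw new IllegalArgumentException("a reply needs at least one line");
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < lines.size(); i++) {
      char separator = (i == lines.size() - 1) ? ' ' : '-';
      sb.append(code).append(separator).append(lines.get(i)).append(CRLF);
    }
    return sb.toString();
  }
}
```

With such a helper the greeting would be built as SmtpReplies.single(220, "tio-mail-wing ESMTP Service ready"), which keeps the CRLF terminator out of the individual call sites.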
5. Packet Decoding and Dispatch
SmtpServerAioHandler decodes the byte stream into individual SMTP command lines and dispatches them to SmtpService:
package com.tio.mail.wing.handler;

import java.nio.ByteBuffer;

import com.litongjava.aio.Packet;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.LengthOverflowException;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.core.utils.ByteBufferUtils;
import com.litongjava.tio.server.intf.ServerAioHandler;
import com.tio.mail.wing.packet.SmtpPacket;
import com.tio.mail.wing.service.SmtpService;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SmtpServerAioHandler implements ServerAioHandler {

  private SmtpService smtpService = Aop.get(SmtpService.class);

  @Override
  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
    String charset = channelContext.getTioConfig().getCharset();
    String line = null;
    try {
      line = ByteBufferUtils.readLine(buffer, charset);
    } catch (LengthOverflowException e) {
      log.error("SMTP line exceeds the maximum length", e);
    }
    // No complete line buffered yet: return null so that TIO waits for more bytes
    if (line == null) {
      return null;
    }
    return new SmtpPacket(line);
  }

  @Override
  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
    String charset = tioConfig.getCharset();
    SmtpPacket smtpPacket = (SmtpPacket) packet;
    try {
      return ByteBuffer.wrap(smtpPacket.getLine().getBytes(charset));
    } catch (Exception e) {
      log.error("Encoding error", e);
      return null;
    }
  }

  @Override
  public void handler(Packet packet, ChannelContext ctx) throws Exception {
    SmtpPacket smtpPacket = (SmtpPacket) packet;
    String rawLine = smtpPacket.getLine();
    String line = rawLine.trim();
    log.info("SMTP <<< {}", line);

    SmtpSessionContext session = (SmtpSessionContext) ctx.get("sessionContext");

    // Special handling in DATA mode: every line is message content.
    // Only a trailing CR/LF is removed, so leading whitespace (for example
    // in folded headers) is preserved instead of being trimmed away.
    if (session.getState() == SmtpSessionContext.State.DATA_RECEIVING) {
      String dataLine = rawLine.replaceAll("[\\r\\n]+$", "");
      String reply = smtpService.handleDataReceiving(dataLine, session);
      if (reply != null) {
        Tio.send(ctx, new SmtpPacket(reply));
      }
      return;
    }

    String[] parts = line.split("\\s+", 2);
    String command = parts[0].toUpperCase();
    String reply;

    switch (command) {
    case "HELO":
    case "EHLO":
      reply = smtpService.handleEhlo(parts, session);
      break;
    case "AUTH":
      reply = smtpService.handleAuth(parts, session);
      break;
    case "MAIL":
      reply = smtpService.handleMail(line, session);
      break;
    case "RCPT":
      reply = smtpService.handleRcpt(line, session);
      break;
    case "DATA":
      reply = smtpService.handleData(session);
      break;
    case "QUIT":
      reply = smtpService.handleQuit(session);
      if (reply != null) {
        Tio.send(ctx, new SmtpPacket(reply));
      }
      Tio.close(ctx, "quit");
      return;
    case "RSET":
      reply = smtpService.handleRset(session);
      break;
    default:
      // Base64 data sent during the AUTH exchange
      if (session.getState() == SmtpSessionContext.State.AUTH_WAIT_USERNAME
          || session.getState() == SmtpSessionContext.State.AUTH_WAIT_PASSWORD) {
        reply = smtpService.handleAuthData(line, session);
      } else {
        reply = "500 Command not recognized\r\n";
      }
    }

    if (reply != null) {
      Tio.send(ctx, new SmtpPacket(reply));
    }
  }
}
- decode/encode: split the TCP stream into lines and wrap/unwrap them as SmtpPacket.
- handler: dispatches to the matching SmtpService method based on the current state and command.
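The key contract in decode is that a line is returned only once its terminator has arrived; otherwise the method returns null and the bytes stay in the buffer. The sketch below illustrates that framing with plain java.nio. It is not the implementation of ByteBufferUtils.readLine, and LineFraming is a hypothetical class name; the UTF-8 charset is also an assumption made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of line-based framing over a ByteBuffer. */
public final class LineFraming {

  private LineFraming() {}

  /**
   * Returns the next line without its CRLF (or bare LF) terminator and
   * advances the buffer past it. Returns null and leaves the position
   * untouched when no complete line is available yet.
   */
  public static String readLine(ByteBuffer buffer) {
    int start = buffer.position();
    for (int i = start; i < buffer.limit(); i++) {
      if (buffer.get(i) == '\n') {          // absolute get: position is not moved
        int end = i;
        if (end > start && buffer.get(end - 1) == '\r') {
          end--;                            // drop the CR of a CRLF pair
        }
        byte[] bytes = new byte[end - start];
        buffer.get(bytes);                  // relative get: copies the line content
        buffer.position(i + 1);             // skip the terminator
        return new String(bytes, StandardCharsets.UTF_8);
      }
    }
    return null;                            // incomplete line: wait for more bytes
  }
}
```

Feeding it a buffer holding "EHLO example.com\r\nMAIL FR" yields "EHLO example.com" on the first call and null on the second, with the partial "MAIL FR" still waiting in the buffer.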
6. Core Business Logic
SmtpService implements the handling of each SMTP command and relies on MwUserService and MailboxService for user authentication and mail persistence.
package com.tio.mail.wing.service;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

import com.litongjava.jfinal.aop.Aop;
import com.tio.mail.wing.handler.SmtpSessionContext;

public class SmtpService {

  private final MwUserService userService = Aop.get(MwUserService.class);
  private final MailboxService mailboxService = Aop.get(MailboxService.class);

  public String handleEhlo(String[] parts, SmtpSessionContext session) {
    if (session.getState() != SmtpSessionContext.State.CONNECTED) {
      return "503 Bad sequence of commands\r\n";
    }
    String domain = parts.length > 1 ? parts[1] : "unknown";
    session.setState(SmtpSessionContext.State.GREETED);
    // Multi-line reply: "250-" on every line except the last
    StringBuilder sb = new StringBuilder();
    sb.append("250-tio-mail-wing says hello to ").append(domain).append("\r\n");
    sb.append("250 AUTH LOGIN\r\n");
    return sb.toString();
  }

  public String handleAuth(String[] parts, SmtpSessionContext session) {
    if (session.getState() != SmtpSessionContext.State.GREETED) {
      return "503 Bad sequence of commands\r\n";
    }
    if (parts.length > 1 && "LOGIN".equalsIgnoreCase(parts[1])) {
      session.setState(SmtpSessionContext.State.AUTH_WAIT_USERNAME);
      String challenge = Base64.getEncoder().encodeToString("Username:".getBytes(StandardCharsets.UTF_8));
      return "334 " + challenge + "\r\n";
    } else {
      return "504 Authentication mechanism not supported\r\n";
    }
  }

  public String handleAuthData(String line, SmtpSessionContext session) {
    try {
      String decoded = new String(Base64.getDecoder().decode(line), StandardCharsets.UTF_8);
      if (session.getState() == SmtpSessionContext.State.AUTH_WAIT_USERNAME) {
        session.setUsername(decoded);
        session.setState(SmtpSessionContext.State.AUTH_WAIT_PASSWORD);
        String challenge = Base64.getEncoder().encodeToString("Password:".getBytes(StandardCharsets.UTF_8));
        return "334 " + challenge + "\r\n";
      } else if (session.getState() == SmtpSessionContext.State.AUTH_WAIT_PASSWORD) {
        Long userId = userService.authenticate(session.getUsername(), decoded);
        if (userId != null) {
          session.setAuthenticated(true);
          session.setUserId(userId);
          session.setState(SmtpSessionContext.State.GREETED);
          return "235 Authentication successful\r\n";
        } else {
          // resetTransaction() also returns the state to GREETED
          session.resetTransaction();
          return "535 Authentication failed\r\n";
        }
      }
    } catch (Exception e) {
      session.resetTransaction();
      return "501 Invalid base64 data\r\n";
    }
    return "501 Authentication sequence error\r\n";
  }

  public String handleMail(String line, SmtpSessionContext session) {
    if (session.getState() != SmtpSessionContext.State.GREETED || !session.isAuthenticated()) {
      return "503 Bad sequence of commands or not authenticated\r\n";
    }
    // The sender address is the text between '<' and '>'
    int start = line.indexOf('<'), end = line.lastIndexOf('>');
    if (start < 0 || end <= start + 1) {
      return "501 Invalid address\r\n";
    }
    session.setFromAddress(line.substring(start + 1, end));
    session.setState(SmtpSessionContext.State.MAIL_FROM_RECEIVED);
    return "250 OK\r\n";
  }

  public String handleRcpt(String line, SmtpSessionContext session) {
    SmtpSessionContext.State st = session.getState();
    if (st != SmtpSessionContext.State.MAIL_FROM_RECEIVED && st != SmtpSessionContext.State.RCPT_TO_RECEIVED) {
      return "503 Bad sequence of commands\r\n";
    }
    int start = line.indexOf('<'), end = line.lastIndexOf('>');
    String to = (start >= 0 && end > start + 1) ? line.substring(start + 1, end) : "";
    if (userService.userExists(to)) {
      session.getToAddresses().add(to);
      session.setState(SmtpSessionContext.State.RCPT_TO_RECEIVED);
      return "250 OK\r\n";
    } else {
      return "550 No such user here\r\n";
    }
  }

  public String handleData(SmtpSessionContext session) {
    if (session.getState() != SmtpSessionContext.State.RCPT_TO_RECEIVED) {
      return "503 Bad sequence of commands\r\n";
    }
    session.setState(SmtpSessionContext.State.DATA_RECEIVING);
    return "354 Start mail input; end with <CRLF>.<CRLF>\r\n";
  }

  public String handleDataReceiving(String line, SmtpSessionContext session) {
    if (".".equals(line)) {
      // A line containing only "." ends the message: store one copy per recipient
      String mailData = session.getMailContent().toString();
      for (String recipient : session.getToAddresses()) {
        mailboxService.saveEmail(recipient, mailData);
      }
      String id = UUID.randomUUID().toString();
      session.resetTransaction();
      return "250 OK: queued as " + id + "\r\n";
    } else {
      // Undo dot-stuffing (RFC 5321, section 4.5.2): clients prepend an extra
      // "." to any content line that starts with one
      if (line.startsWith(".")) {
        line = line.substring(1);
      }
      session.getMailContent().append(line).append("\r\n");
      return null;
    }
  }

  public String handleRset(SmtpSessionContext session) {
    session.resetTransaction();
    return "250 OK\r\n";
  }

  public String handleQuit(SmtpSessionContext session) {
    session.setState(SmtpSessionContext.State.QUIT);
    return "221 Bye\r\n";
  }
}
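- handleEhlo: returns a multi-line 250- response that advertises AUTH LOGIN.
- handleAuth / handleAuthData: exchange the username and password in Base64 and call MwUserService to authenticate.
- handleMail / handleRcpt / handleData / handleDataReceiving: process the mail transaction and finally call mailboxService.saveEmail(...) to store the message in the database.
- handleRset: resets the current mail transaction.
- handleQuit: ends the session by returning 221 Bye, after which the handler closes the connection.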
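handleMail and handleRcpt both pull the address out of the angle brackets in the same way. The following minimal sketch shows that step as a shared helper; SmtpPaths and extract are hypothetical names, not part of the project.

```java
/** Hypothetical helper that extracts the address from MAIL FROM / RCPT TO lines. */
public final class SmtpPaths {

  private SmtpPaths() {}

  /**
   * Returns the text between the first '<' and the last '>' of the command
   * line, or null when the brackets are missing or enclose nothing.
   */
  public static String extract(String commandLine) {
    int start = commandLine.indexOf('<');
    int end = commandLine.lastIndexOf('>');
    if (start < 0 || end <= start + 1) {
      return null;
    }
    return commandLine.substring(start + 1, end);
  }
}
```

For example, SmtpPaths.extract("MAIL FROM:<alice@example.com>") returns alice@example.com, while a line without brackets returns null, which a caller could map to 501 Invalid address. Note that this rule also rejects the empty path <>, as the handlers above do.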
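7. Startup and Testing
Configure the port in application.properties (or through an environment variable):
mail.server.smtp.port=25
When Spring or the container starts, call:
new SmtpServerConfig().startSmtpServer();
Connect with a common SMTP client to test, such as telnet or a real mail client configured for an unencrypted connection. openssl s_client expects a TLS handshake, so it will only work once TLS support has been added.
# telnet example
telnet localhost 25
EHLO example.com
AUTH LOGIN
<Base64-username>
<Base64-password>
MAIL FROM:<alice@example.com>
RCPT TO:<bob@example.com>
DATA
Subject: Test

Hello, world!
.
QUIT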
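The Base64 values for the AUTH LOGIN exchange can be produced with the JDK's java.util.Base64. The sketch below is one way to do it; AuthLoginValues is a hypothetical class name, and the account name used in the note after it is only an example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for preparing and reading AUTH LOGIN values. */
public final class AuthLoginValues {

  private AuthLoginValues() {}

  /** Encodes a username or password for sending after AUTH LOGIN. */
  public static String encode(String value) {
    return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
  }

  /** Decodes a Base64 challenge or credential back to text. */
  public static String decode(String base64) {
    return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
  }
}
```

The server's two challenges are the Base64 forms of "Username:" and "Password:", so a test session shows 334 VXNlcm5hbWU6 followed by 334 UGFzc3dvcmQ6. The client answers each with a value such as AuthLoginValues.encode("alice@example.com").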
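8. Summary
This document walked through the TIO-based SMTP service implementation, covering connection management, command parsing, the state machine, authentication, and the mail persistence flow.
Core components:
- SmtpServerConfig: server startup configuration
- SmtpSessionContext: session state and transaction data
- SmtpServerAioListener: connect/disconnect event hooks
- SmtpServerAioHandler: I/O decoding and dispatch
- SmtpService: business logic for each command
With this structure you can quickly locate each functional module and extend it as needed (for example, by adding TLS support, multiple authentication mechanisms, or spam filtering). Happy coding!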