使用Rabbitmq实现异步营销邮件发送
营销邮件往往是需要向客户发送大量的邮件,这个过程要消耗大量的时间,不可能让用户一直去等,因此需要设置为异步的。但设置为异步的又要考虑邮件的发送状态,比如如何知道其是否全部发送完毕,需要考虑的因素较多,这里使用的是Rabbitmq,使用消息队列来异步处理邮件发送的问题。
一、两种邮件发送方式
在讨论如何实现异步发送前,我们先来看看如何用Java实现邮件发送。
对于Java而言,实现邮件发送的方式主要有两个:
1.使用 MimeMessageHelper 发送邮件
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage, true);
//邮件发信人
mimeMessageHelper.setFrom(mailSender);
//邮件收信人 1或多个
mimeMessageHelper.setTo(to.split(","));
//邮件主题
mimeMessageHelper.setSubject(subject);
//邮件内容
mimeMessageHelper.setText(text, true);
//邮件发送时间
mimeMessageHelper.setSentDate(new Date());
//发送邮件
javaMailSender.send(mimeMessageHelper.getMimeMessage());
这种方式实现相对简单,不需要设置太多的参数,但灵活性相对较差,该方法我仅仅用来发送事务性邮件(因为发送事务性邮件发送方的配置比较固定,本身不需要太多设置)。
2.使用 Transport.send() 发送邮件
final Properties props = new Properties();
// 表示SMTP发送邮件,需要进行身份验证
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.host", smtpServer);
//加密方式:
props.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
props.put("mail.smtp.socketFactory.port", smtpPort);
props.put("mail.smtp.port", smtpPort);
props.put("mail.smtp.from", mailSender);
props.put("mail.user", mailSender);
props.put("mail.password", password);
props.setProperty("mail.smtp.ssl.enable", "true");
// 构建授权信息,用于进行SMTP进行身份验证
Authenticator authenticator = new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
// 用户名、密码
String userName = props.getProperty("mail.user");
String password = props.getProperty("mail.password");
return new PasswordAuthentication(userName, password);
}
};
// 使用环境属性和授权信息,创建邮件会话
Session mailSession = Session.getInstance(props, authenticator);
final String messageIDValue = genMessageID(props.getProperty("mail.user"));
//创建邮件消息
MimeMessage message = new MimeMessage(mailSession) {
@Override
protected void updateMessageID() throws MessagingException {
//设置自定义Message-ID值
setHeader("Message-ID", messageIDValue);//创建Message-ID
}
};
try {
InternetAddress from = new InternetAddress(mailSender, mailSender);
message.setFrom(from);
message.setSentDate(new Date()); // 设置时间
//设置邮件标题
message.setSubject(subject);
message.setContent(content, "text/html");
message.setRecipients(Message.RecipientType.TO, to);
// 发送邮件
Transport.send(message);
} catch (MessagingException | UnsupportedEncodingException e) {
e.printStackTrace();
throw new CusobException(ResultCodeEnum.EMAIL_SEND_FAIL);
}
可以看出,这种发送方式是相对比较复杂的,因为要设置smtpserver(这里为我们自建的邮件服务器的smtp server),smtp port(这里使用的是标准TLS,端口为463),mailSender和Password(前者就是发送方Sender,这里的Password是邮件发送的密码,一般由邮件服务提供商提供,也可由我们自己的邮件服务器提供,如果配置为我们的企业邮箱,那么该密码由我们提供)。
最终选择的策略是第二种,因为客户使用我们的产品自然需要使用我们的代发,对于代发而言第二种是更合适的,因为其可以通过指定smtp server来有选择的控制去使用我们自己的邮件服务器。
二、Rabbitmq简介
RabbitMQ是一款流行的开源消息代理(message broker)软件,主要用于实现消息的异步传递和分布式系统之间的通信。它基于AMQP标准,实现了一个强大、灵活且可扩展的消息传递系统。
1.核心概念
- Producer(生产者):消息的发送方,负责将消息发送到交换器(Exchange)。
- Consumer(消费者):消息的接收方,从队列(Queue)中读取和处理消息。
- Message(消息):数据的载体,由生产者发送并由消费者接收处理。
- Queue(队列):用于存储消息的缓冲区,消费者从队列中读取消息。
- Exchange(交换器):接收来自生产者的消息,并根据绑定规则将消息路由到一个或多个队列。常见的交换器类型包括:
- direct:根据消息的路由键将消息发送到绑定的队列。
- fanout:将消息广播到所有绑定的队列。
- topic:根据路由键模式匹配将消息发送到绑定的队列。
- headers:根据消息头属性进行路由。
- Binding(绑定):交换器和队列之间的连接,定义了消息如何从交换器路由到队列的规则。
- Connection(连接):应用程序和RabbitMQ之间的TCP连接。
- Channel(信道):通过一个单一的TCP连接建立的轻量级连接,进行消息的读写操作。
2.工作原理
第一步,消息发送:生产者将消息发送到交换器,交换器根据绑定规则将消息路由到一个或多个队列。
第二步,消息存储:消息到达队列后存储在队列中,等待消费者处理。
第三步,消息接收:消费者从队列中读取消息,处理后确认消息,从队列中移除。
三、异步实现营销邮件发送
在经过前两步的铺垫后,现在可以对整个流程进行梳理:
1.每个客户在进行完一个营销邮件活动发送时,所有的需要发送的信件(包含收件方、发送方、主题、正文内容等)都作为待消费的服务,进入到rabbitmq的队列中进行依次消费。
2.发送方能够选择发送的时间,也就是需要控制进入mq的时间。
3.需要更新相关的记录,如发送的历史记录等。
下面是核心代码:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_MASS_MAILING, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_MAIL_DIRECT),
key = {MqConst.ROUTING_MASS_MAILING}
))
public void massMailing(Campaign campaign, Message message, Channel channel) throws IOException {
try {
if (campaign!=null){
Date sendTime = campaign.getSendTime();
Date now = new Date();
if (now.after(sendTime)){
campaignService.MassMailing(campaign);
campaignService.updateStatus(campaign.getId(), Campaign.COMPLETED); //更新发送状态
}else {
long delay = sendTime.getTime() - now.getTime();
ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());
executor.schedule(() -> {
campaignService.MassMailing(campaign);
campaignService.updateStatus(campaign.getId(), Campaign.COMPLETED);
}, delay, TimeUnit.MILLISECONDS);
executor.shutdown();
}
}
} catch (Exception e) {
System.out.println(e);
} finally {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
在@RabbitListener中,
value:指定了队列的名称
exchange:指定了交换机的名称,以及使用默认的交换机类型direct
key:指定路由键名称
这里的定时方案采用的是ScheduledThreadPoolExecutor类来实现,它专门用于调度和执行延迟任务或周期性任务。其构造函数接受两个参数:核心线程池大小和拒绝策略(
指在任务提交到线程池时,如果线程池已经达到饱和状态所采取的策略)。在该方案中,核心线程池大小为2,这意味着线程池中会保持至少2个线程,即使它们处于空闲状态。使用
的CallerRunsPolicy表示务将由提交任务的线程(调用者线程)来运行。
值得一提的是,在这里的finally进行了手动确认消息的行为,这是防止当发送邮件出现问题,或者收件方信息有问题时,该服务一直堵塞在该处导致其他邮件都无法发送,
从而带来严重的问题。当有了手动确认后,该邮件不管是发送成功还是失败,其都不会尝试再次发送。但这并不是唯一的解决方案,这只是比较便捷的解决方案。
如果需要设置最大尝试次数,只有达到该次数时才手动确认,那么可以使用死信队列,可以起到同样的效果。