使用Rabbitmq实现异步营销邮件发送
营销邮件往往是需要向客户发送大量的邮件,这个过程要消耗大量的时间,不可能让用户一直去等,因此需要设置为异步的。但设置为异步的又要考虑邮件的发送状态,比如如何知道其是否全部发送完毕,需要考虑的因素较多,这里使用的是Rabbitmq,使用消息队列来异步处理邮件发送的问题。
一、两种邮件发送方式
在讨论如何实现异步发送前,我们先来看看如何用Java实现邮件发送。
对于Java而言,实现邮件发送的方式主要有两个:
1.使用 MimeMessageHelper
发送邮件
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage, true);
//邮件发信人
mimeMessageHelper.setFrom(mailSender);
//邮件收信人 1或多个
mimeMessageHelper.setTo(to.split(","));
//邮件主题
mimeMessageHelper.setSubject(subject);
//邮件内容
mimeMessageHelper.setText(text, true);
//邮件发送时间
mimeMessageHelper.setSentDate(new Date());
//发送邮件
javaMailSender.send(mimeMessageHelper.getMimeMessage());
这种方式实现相对简单,不需要设置太多的参数,但灵活性相对较差,该方法我仅仅用来发送事务性邮件(因为发送事务性邮件发送方的配置比较固定,本身不需要太多设置)。
2.使用 Transport.send() 发送邮件
final Properties props = new Properties();
// 表示SMTP发送邮件,需要进行身份验证 props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", smtpServer); //加密方式: props.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory"); props.put("mail.smtp.socketFactory.port", smtpPort); props.put("mail.smtp.port", smtpPort); props.put("mail.smtp.from", mailSender); props.put("mail.user", mailSender); props.put("mail.password", password); props.setProperty("mail.smtp.ssl.enable", "true"); // 构建授权信息,用于进行SMTP进行身份验证 Authenticator authenticator = new Authenticator() { @Override protected PasswordAuthentication getPasswordAuthentication() { // 用户名、密码 String userName = props.getProperty("mail.user"); String password = props.getProperty("mail.password"); return new PasswordAuthentication(userName, password); } }; // 使用环境属性和授权信息,创建邮件会话 Session mailSession = Session.getInstance(props, authenticator); final String messageIDValue = genMessageID(props.getProperty("mail.user")); //创建邮件消息 MimeMessage message = new MimeMessage(mailSession) { @Override protected void updateMessageID() throws MessagingException { //设置自定义Message-ID值 setHeader("Message-ID", messageIDValue);//创建Message-ID } }; try { InternetAddress from = new InternetAddress(mailSender, mailSender); message.setFrom(from); message.setSentDate(new Date()); // 设置时间 //设置邮件标题 message.setSubject(subject); message.setContent(content, "text/html"); message.setRecipients(Message.RecipientType.TO, to); // 发送邮件 Transport.send(message); } catch (MessagingException | UnsupportedEncodingException e) { e.printStackTrace(); throw new CusobException(ResultCodeEnum.EMAIL_SEND_FAIL); }
可以看出,这种发送方式是相对比较复杂的,因为要设置smtpserver(这里为我们自建的邮件服务器的smtp server),smtp port(这里使用的是标准TLS,端口为463),mailSender和Password(前者就是发送方Sender,这里的Password是邮件发送的密码,一般由邮件服务提供商提供,也可由我们自己的邮件服务器提供,如果配置为我们的企业邮箱,那么该密码由我们提供)。
最终选择的策略是第二种,因为客户使用我们的产品自然需要使用我们的代发,对于代发而言第二种是更合适的,因为其可以通过指定smtp server来有选择的控制去使用我们自己的邮件服务器。
二、Rabbitmq简介
RabbitMQ是一款流行的开源消息代理(message broker)软件,主要用于实现消息的异步传递和分布式系统之间的通信。它基于AMQP标准,实现了一个强大、灵活且可扩展的消息传递系统。
1.核心概念
- Producer(生产者):消息的发送方,负责将消息发送到交换器(Exchange)。
- Consumer(消费者):消息的接收方,从队列(Queue)中读取和处理消息。
- Message(消息):数据的载体,由生产者发送并由消费者接收处理。
- Queue(队列):用于存储消息的缓冲区,消费者从队列中读取消息。
- Exchange(交换器):接收来自生产者的消息,并根据绑定规则将消息路由到一个或多个队列。常见的交换器类型包括:
- direct:根据消息的路由键将消息发送到绑定的队列。
- fanout:将消息广播到所有绑定的队列。
- topic:根据路由键模式匹配将消息发送到绑定的队列。
- headers:根据消息头属性进行路由。
- Binding(绑定):交换器和队列之间的连接,定义了消息如何从交换器路由到队列的规则。
- Connection(连接):应用程序和RabbitMQ之间的TCP连接。
- Channel(信道):通过一个单一的TCP连接建立的轻量级连接,进行消息的读写操作。
2.工作原理
第一步,消息发送:生产者将消息发送到交换器,交换器根据绑定规则将消息路由到一个或多个队列。
第二步,消息存储:消息到达队列后存储在队列中,等待消费者处理。
第三步,消息接收:消费者从队列中读取消息,处理后确认消息,从队列中移除。
三、异步实现营销邮件发送
在经过前两步的铺垫后,现在可以对整个流程进行梳理:
1.每个客户在进行完一个营销邮件活动发送时,所有的需要发送的信件(包含收件方、发送方、主题、正文内容等)都作为待消费的服务,进入到rabbitmq的队列中进行依次消费。
2.发送方能够选择发送的时间,也就是需要控制进入mq的时间。
3.需要更新相关的记录,如发送的历史记录等。
下面是核心代码:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = MqConst.QUEUE_MASS_MAILING, durable = "true"), exchange = @Exchange(value = MqConst.EXCHANGE_MAIL_DIRECT), key = {MqConst.ROUTING_MASS_MAILING} )) public void massMailing(Campaign campaign, Message message, Channel channel) throws IOException { try { if (campaign!=null){ Date sendTime = campaign.getSendTime(); Date now = new Date(); if (now.after(sendTime)){ campaignService.MassMailing(campaign); campaignService.updateStatus(campaign.getId(), Campaign.COMPLETED); //更新发送状态 }else { long delay = sendTime.getTime() - now.getTime(); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy()); executor.schedule(() -> { campaignService.MassMailing(campaign); campaignService.updateStatus(campaign.getId(), Campaign.COMPLETED); }, delay, TimeUnit.MILLISECONDS); executor.shutdown(); } } } catch (Exception e) { System.out.println(e); } finally { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
在@RabbitListener中,
value:指定了队列的名称
exchange:指定了交换机的名称,以及使用默认的交换机类型direct
key:指定路由键名称
这里的定时方案采用的是ScheduledThreadPoolExecutor类来实现,它专门用于调度和执行延迟任务或周期性任务。其构造函数接受两个参数:核心线程池大小和拒绝策略(
指在任务提交到线程池时,如果线程池已经达到饱和状态所采取的策略)。在该方案中,核心线程池大小为2,这意味着线程池中会保持至少2个线程,即使它们处于空闲状态。使用
的CallerRunsPolicy表示务将由提交任务的线程(调用者线程)来运行。
值得一提的是,在这里的finally进行了手动确认消息的行为,这是防止当发送邮件出现问题,或者收件方信息有问题时,该服务一直堵塞在该处导致其他邮件都无法发送,
从而带来严重的问题。当有了手动确认后,该邮件不管是发送成功还是失败,其都不会尝试再次发送。但这并不是唯一的解决方案,这只是比较便捷的解决方案。
如果需要设置最大尝试次数,只有达到该次数时才手动确认,那么可以使用死信队列,可以起到同样的效果。