使用Rabbitmq实现异步营销邮件发送

使用Rabbitmq实现异步营销邮件发送

营销邮件往往是需要向客户发送大量的邮件,这个过程要消耗大量的时间,不可能让用户一直去等,因此需要设置为异步的。但设置为异步的又要考虑邮件的发送状态,比如如何知道其是否全部发送完毕,需要考虑的因素较多,这里使用的是Rabbitmq,使用消息队列来异步处理邮件发送的问题。

一、两种邮件发送方式

在讨论如何实现异步发送前,我们先来看看如何用Java实现邮件发送。

对于Java而言,实现邮件发送的方式主要有两个:

1.使用 MimeMessageHelper 发送邮件

MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage, true);
//邮件发信人
mimeMessageHelper.setFrom(mailSender);
//邮件收信人 1或多个
mimeMessageHelper.setTo(to.split(","));
//邮件主题
mimeMessageHelper.setSubject(subject);
//邮件内容
mimeMessageHelper.setText(text, true);
//邮件发送时间
mimeMessageHelper.setSentDate(new Date());
//发送邮件
javaMailSender.send(mimeMessageHelper.getMimeMessage());

这种方式实现相对简单,不需要设置太多的参数,但灵活性相对较差,该方法我仅仅用来发送事务性邮件(因为发送事务性邮件发送方的配置比较固定,本身不需要太多设置)。

2.使用 Transport.send() 发送邮件

final Properties props = new Properties();
// 表示SMTP发送邮件,需要进行身份验证
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.host", smtpServer);
//加密方式:
props.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
props.put("mail.smtp.socketFactory.port", smtpPort);
props.put("mail.smtp.port", smtpPort);
props.put("mail.smtp.from", mailSender);
props.put("mail.user", mailSender);
props.put("mail.password", password);
props.setProperty("mail.smtp.ssl.enable", "true");

// 构建授权信息,用于进行SMTP进行身份验证
Authenticator authenticator = new Authenticator() {
    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
        // 用户名、密码
        String userName = props.getProperty("mail.user");
        String password = props.getProperty("mail.password");
        return new PasswordAuthentication(userName, password);
    }
};

// 使用环境属性和授权信息,创建邮件会话
Session mailSession = Session.getInstance(props, authenticator);
final String messageIDValue = genMessageID(props.getProperty("mail.user"));
//创建邮件消息
MimeMessage message = new MimeMessage(mailSession) {
    @Override
    protected void updateMessageID() throws MessagingException {
        //设置自定义Message-ID值
        setHeader("Message-ID", messageIDValue);//创建Message-ID
    }
};

try {
    InternetAddress from = new InternetAddress(mailSender, mailSender);
    message.setFrom(from);
    message.setSentDate(new Date()); // 设置时间
    //设置邮件标题
    message.setSubject(subject);
    message.setContent(content, "text/html");
    message.setRecipients(Message.RecipientType.TO, to);
    // 发送邮件
    Transport.send(message);
} catch (MessagingException | UnsupportedEncodingException e) {
    e.printStackTrace();
    throw new CusobException(ResultCodeEnum.EMAIL_SEND_FAIL);
}

可以看出,这种发送方式是相对比较复杂的,因为要设置smtpserver(这里为我们自建的邮件服务器的smtp server),smtp port(这里使用的是标准TLS,端口为463),mailSender和Password(前者就是发送方Sender,这里的Password是邮件发送的密码,一般由邮件服务提供商提供,也可由我们自己的邮件服务器提供,如果配置为我们的企业邮箱,那么该密码由我们提供)。

最终选择的策略是第二种,因为客户使用我们的产品自然需要使用我们的代发,对于代发而言第二种是更合适的,因为其可以通过指定smtp server来有选择的控制去使用我们自己的邮件服务器。

二、Rabbitmq简介

RabbitMQ是一款流行的开源消息代理(message broker)软件,主要用于实现消息的异步传递和分布式系统之间的通信。它基于AMQP标准,实现了一个强大、灵活且可扩展的消息传递系统。

1.核心概念

  • Producer(生产者):消息的发送方,负责将消息发送到交换器(Exchange)。
  • Consumer(消费者):消息的接收方,从队列(Queue)中读取和处理消息。
  • Message(消息):数据的载体,由生产者发送并由消费者接收处理。
  • Queue(队列):用于存储消息的缓冲区,消费者从队列中读取消息。
  • Exchange(交换器):接收来自生产者的消息,并根据绑定规则将消息路由到一个或多个队列。常见的交换器类型包括:
    • direct:根据消息的路由键将消息发送到绑定的队列。
    • fanout:将消息广播到所有绑定的队列。
    • topic:根据路由键模式匹配将消息发送到绑定的队列。
    • headers:根据消息头属性进行路由。
  • Binding(绑定):交换器和队列之间的连接,定义了消息如何从交换器路由到队列的规则。
  • Connection(连接):应用程序和RabbitMQ之间的TCP连接。
  • Channel(信道):通过一个单一的TCP连接建立的轻量级连接,进行消息的读写操作。

2.工作原理

第一步,消息发送:生产者将消息发送到交换器,交换器根据绑定规则将消息路由到一个或多个队列。

第二步,消息存储:消息到达队列后存储在队列中,等待消费者处理。

第三步,消息接收:消费者从队列中读取消息,处理后确认消息,从队列中移除。

三、异步实现营销邮件发送

在经过前两步的铺垫后,现在可以对整个流程进行梳理:

1.每个客户在进行完一个营销邮件活动发送时,所有的需要发送的信件(包含收件方、发送方、主题、正文内容等)都作为待消费的服务,进入到rabbitmq的队列中进行依次消费。

2.发送方能够选择发送的时间,也就是需要控制进入mq的时间。

3.需要更新相关的记录,如发送的历史记录等。

下面是核心代码:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = MqConst.QUEUE_MASS_MAILING, durable = "true"),
        exchange = @Exchange(value = MqConst.EXCHANGE_MAIL_DIRECT),
        key = {MqConst.ROUTING_MASS_MAILING}
))
public void massMailing(Campaign campaign, Message message, Channel channel) throws IOException {
    try {
        if (campaign!=null){
            Date sendTime = campaign.getSendTime();
            Date now = new Date();
            if (now.after(sendTime)){
                campaignService.MassMailing(campaign);
                campaignService.updateStatus(campaign.getId(), Campaign.COMPLETED); //更新发送状态
            }else {
                long delay = sendTime.getTime() - now.getTime();
                ScheduledThreadPoolExecutor executor =
                        new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());
                executor.schedule(() -> {
                    campaignService.MassMailing(campaign);
                    campaignService.updateStatus(campaign.getId(), Campaign.COMPLETED);
                }, delay, TimeUnit.MILLISECONDS);
                executor.shutdown();
            }

        }
    } catch (Exception e) {
        System.out.println(e);
    } finally {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

在@RabbitListener中,
value:指定了队列的名称
exchange:指定了交换机的名称,以及使用默认的交换机类型direct
key:指定路由键名称
这里的定时方案采用的是ScheduledThreadPoolExecutor类来实现,它专门用于调度和执行延迟任务或周期性任务。其构造函数接受两个参数:核心线程池大小和拒绝策略(
指在任务提交到线程池时,如果线程池已经达到饱和状态所采取的策略)。在该方案中,核心线程池大小为2,这意味着线程池中会保持至少2个线程,即使它们处于空闲状态。使用
CallerRunsPolicy表示务将由提交任务的线程(调用者线程)来运行。
值得一提的是,在这里的finally进行了手动确认消息的行为,这是防止当发送邮件出现问题,或者收件方信息有问题时,该服务一直堵塞在该处导致其他邮件都无法发送,
从而带来严重的问题。当有了手动确认后,该邮件不管是发送成功还是失败,其都不会尝试再次发送。但这并不是唯一的解决方案,这只是比较便捷的解决方案。
如果需要设置最大尝试次数,只有达到该次数时才手动确认,那么可以使用死信队列,可以起到同样的效果。

留下回复