springboot+kafka+郵件發送(最佳實踐)


  菜譜微信小程序,操作簡單,教程詳細,歡迎大家使用

  

 

  1. 導讀
    1. 集成spring-kafka,生產者生產郵件message,消費者負責發送
    2. 引入線程池,多線程發送消息
    3. 多郵件服務器配置
    4. 定時任務生產消息;計划郵件發送
  2. 實現過程
    1. 導入依賴
   <properties>
        <java.version>1.8</java.version>
        <mysql.version>5.1.38</mysql.version>
        <mapper.version>2.1.5</mapper.version>
        <mybatis.version>1.3.2</mybatis.version>
        <gson.version>2.8.2</gson.version>
        <lang3.version>3.4</lang3.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <!--數據庫驅動-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- 通用Mapper啟動器 -->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper-spring-boot-starter</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- 自定義配置文件需要 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- 使用SLF4J + Logback 作為日志框架 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${lang3.version}</version>
        </dependency>
    </dependencies>

2.application.yml配置kafka、郵箱、數據庫參數

本文采用的yml配置方式,若使用的properties的可使用https://www.toyaml.com/index.html這個在線工具轉換;

郵箱配置,若使用的163、或者qq等,自己百度怎么申請授權碼(一大堆教程);

數據庫采用的Hikari連接池,號稱java平台最快的;

# 配置Kafka集群IP地址,多個IP以逗號隔開:
spring:
kafka:
bootstrap-servers: 你的kafkaIP:端口號
producer:
retries: 2 #發送失敗后的重復發送次數
key-serializer: org.apache.kafka.common.serialization.StringSerializer #key序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer #value序列化方式
compression-type: gzip #壓縮格式
batch-size: 16384 #批量發送的消息數量
buffer-memory: 33554432 #32M的批處理緩沖區
consumer:
auto-offset-reset: earliest #最早未被消費的offset
enable-auto-commit: false #是否開啟自動提交
#auto-commit-interval: 1000 #自動提交的時間間隔
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key解碼方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value解碼方式
group-id: kafka.consumer.group.id.1
max-poll-records: 50
properties:
session-timeout-ms: 20000 #連接超時時間
max-poll-interval-ms: 15000 #手動提交設置與poll的心跳數,如果消息隊列中沒有消息,等待毫秒后,調用poll()方法。如果隊列中有消息,立即消費消息,每次消費的消息的多少可以通過max.poll.records配置。
max-partition-fetch-bytes: 15728640 #設置拉取數據的大小 15M
client-id: kafkacli
listener:
ack-mode: manual_immediate

datasource:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.jdbc.Driver
url: * #自己的
username: * #賬號
password: * #密碼
hikari:
minimum-idle: 5
# 空閑連接存活最大時間,默認600000(10分鍾)
idle-timeout: 180000
# 連接池最大連接數,默認是10
maximum-pool-size: 10
# 此屬性控制從池返回的連接的默認自動提交行為,默認值:true
auto-commit: true
# 連接池名稱
pool-name: MyHikariCP
# 此屬性控制池中連接的最長生命周期,值0表示無限生命周期,默認1800000即30分鍾
max-lifetime: 1800000
# 數據庫連接超時時間,默認30秒,即30000
connection-timeout: 30000
connection-test-query: SELECT 1

# 郵箱服務器配置,以163郵箱為例
mail:
host: smtp.163.com #郵箱服務器地址
port: 25 #端口
username: * #用戶名
password: * #授權密碼
default-encoding: UTF-8
properties:
from: * #用戶名
mail:
smtp:
connectiontimeout: 5000
timeout: 3000
writetimeout: 5000

# 郵件模板
thymeleaf:
cache: false
prefix: classpath:/views/
# 郵件附件
servlet:
multipart:
max-file-size: 10MB #限制單個文件大小
max-request-size: 50MB #限制請求總量


logging:
level:
com.example: debug
pattern:
# console: %d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n
# file: %d{yyyy/MM/dd-HH:mm} [%thread] %-5level %logger- %msg%n
path: C:\log

# 郵件失敗重試次數
com:
example:
mail:
sendNumber: 3 #郵件發送失敗重試次數
threadKillTime: 60 #線程超時殺死

mybatis:
type-aliases-package: com.example.mail.entity
configuration:
map-underscore-to-camel-case: true
mapper-locations: mappers/*Mapper.xml


# 異步線程配置,配置核心線程數
async:
executor:
thread:
core_pool_size: 15 #核心線程數量,線程池創建時候初始化的線程數
max_pool_size: 15 #最大線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
queue_capacity: 99999 #緩沖隊列,用來緩沖執行任務的隊列
keep_alive_seconds: 60 #當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
await_termination_seconds: 30 #設置線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住。
name:
prefix: async-service-
prefixson: async-service-son

3.郵件消息格式

{
  "mailUid": "郵件唯一標識",
  "fromName": "發件人別名",
  "fromMail": "發件人地址",
  "toMail": "收件人地址(多個郵箱則用逗號","隔開)",
  "ccMail": "抄送人地址(多個郵箱則用逗號","隔開)",
  "bccMail": "密送人地址(多個郵箱則用逗號","隔開)",
  "planSendTime": "計划郵件時間",
  "mailSubject": "郵件主題",
  "mailContent": "郵件正文",
  "sendNum": 發送次數,
  "serverFlag": "郵件服務器標識(多郵件服務器用)"
}

4.kafka生產者、消費者、mail發送類等主要方法代碼

//生產者
public void sendToKafkaStandardMessageAsync(MailDTO mailDTO) {

producer = new KafkaProducer<String, Object>(kafkaConfig.producerConfigs());

producer.send(new ProducerRecord<String, Object>(topicName, gson.toJson(mailDTO)), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
log.info("生產消息成功{},發送次數{},checksum:{},offset:{},partition:{},topic:{}", mailDTO.getMailUid(),mailDTO.getSendNum(),metadata.checksum(), metadata.offset(), metadata.partition(), metadata.topic());
}
if (exception != null) {
log.info("生產消息失敗{}", exception.getMessage());
}
}
});
producer.close();
}
//消費者
/**
* 監聽一個Kafka 主題
**/
@KafkaListener(topics = MQConstants.Topic.ITEM_EXCHANGE_NAME)
public void receiveMessageFromKafka(ConsumerRecord<?, ?> record, Acknowledgment ack) {
log.info("監聽消息,MailUid:{}", gson.fromJson(String.valueOf(record.value()), MailDTO.class).getMailUid());

Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
sendMessageService.sendMessages(gson.fromJson(String.valueOf(record.value()), MailDTO.class));
}
ack.acknowledge();//手動提交偏移量
}
//構建復雜郵件信息類
    public void sendMimeMail(MailVo mailVo) {
        try {
            MimeMessageHelper messageHelper = new MimeMessageHelper(mailSender.createMimeMessage(), true);//true表示支持復雜類型
            mailVo.setFrom("這里讀取配置文件中配的from地址");//郵件發信人從配置項讀取
            messageHelper.setFrom(mailVo.getFrom());//郵件發信人
            messageHelper.setSentDate(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-07-18 12:45:48"));
            messageHelper.setTo(mailVo.getTo().split(","));//郵件收信人
            messageHelper.setSubject(mailVo.getSubject());//郵件主題
            messageHelper.setText(mailVo.getText());//郵件內容
            if (!StringUtils.isEmpty(mailVo.getCc())) {//抄送
                messageHelper.setCc(mailVo.getCc().split(","));
            }
            if (!StringUtils.isEmpty(mailVo.getBcc())) {//密送
                messageHelper.setCc(mailVo.getBcc().split(","));
            }
            if (mailVo.getMultipartFiles() != null) {//添加郵件附件
                for (MultipartFile multipartFile : mailVo.getMultipartFiles()) {
                    messageHelper.addAttachment(multipartFile.getOriginalFilename(), multipartFile);
                }
            }
            if (StringUtils.isEmpty((CharSequence) mailVo.getSentDate())) {//發送時間
                mailVo.setSentDate(new Date());
                messageHelper.setSentDate(mailVo.getSentDate());
            }
            mailSender.send(messageHelper.getMimeMessage());//正式發送郵件
            mailVo.setStatus("ok");
            log.info("發送郵件成功:{}->{}", mailVo.getFrom(), mailVo.getTo());
        } catch (Exception e) {
            throw new RuntimeException(e);//發送失敗
        }
    }

 5.思路解析,畫圖吧,口述太費勁

 

完整代碼: https://github.com/wwt729/mail.git

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM