疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java接入微信公众号2:全异步消息处理架构设计


 

 
消息队列采用Kafka,因为此消息队列安装使用非常简单,而且性能也能够满足高并发的顺序数据处理,同时支持消息的持久化处理,不会丢失消息。本文主要讲解三个部分:架构设计、Kafka的简单使用、接收器的改造。
1.架构设计
核心是Kafka消息队列,每个层面都只是负责把自己的核心任务处理掉,其他不是自己要关心的事情,根本不考虑、甚至调用方法这样简单的事情都不去做,完全解耦。
这样做的好处:
a.如果某个部分处理比较耗时,直接通过消息的消费情况就可以直观看出来,此时只需要增加消费者节点的数量即可。
b.确保最快返回了一条信息给微信的用户,而且是无论如何都会返回一条信息给微信用户,避免后端的处理程序出现问题而导致没有回复用户。
 
下面这个图比较大,请仔细阅读,否则第三篇文章要看不懂了。
 
  
 
2.Kafka的简单使用
安装Kafka的文章,直接看着官方的Quick Start即可,这里主要是需要我们创建三个队列:
 
#下面的命令在Kafka目录下执行
 
# 启动Zookeeper
# 默认监听2181端口
./bin/zookeeper-server-start.sh  ./config/zookeeper.properties
 
# 启动Kafka服务器
# 启动之前需要先修改 ./config/server.properties 文件
# 把 【listeners=PLAINTEXT://:9092】之前的#去掉,让服务可以监听9092端口
./bin/kafka-server-start.sh  ./config/server.properties
 
# 创建Topics
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wechat-in-raw-message
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic wechat-in-message
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic wechat-out-message
 
 
 
3.接收器的改造
3.1.增加依赖
    org.springframework.kafka
    spring-kafka
 
 
 
3.2.在application.properties文件中配置Kafka
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=wechat
spring.kafka.consumer.auto-offset-reset=earliest
 
 
3.3.在ReceiverController里面增加一个成员变量注入
@Autowired
private KafkaTemplate
 
 
3.4.验证消息成功以后,把原始消息直接放入wechat-in-raw-message里面
if (verify(signature, timestamp, nonce))
{
    //发送原文到队列中
    template.send("wechat-in-raw-message", body);
}
 
 
 
 
4.测试能否从消息分发程序里面获取到加入的消息
消息分发程序里面,关于Kafka的配置跟消息接收器里面是一样的
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=wechat
spring.kafka.consumer.auto-offset-reset=earliest
 
配置以后直接箭头Kafka消息即可
 
package org.fkjava.ant.wechat.message.dispatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
*
* @author 罗文强
*/
@Service
public class MessageDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(MessageDispatcher.class);
    /**
     * 通过KafkaListener注解指定要监听的队列
     *
     * @param cr
     * @throws Exception
     */
    @KafkaListener(topics = "wechat-in-raw-message")
    public void wechatRawMessageListener(ConsumerRecord
        LOG.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());
    }
}
 
 
测试的时候,要按照以下的步骤执行:
启动Zookeeper -> 启动Kafka -> 启动消息接收器 -> 启动消息分发器 -> 微信客户端发送信息给公众号(可模拟测试)
 
【阅读原文】