本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
前置阅读文章:
我们先来看下 《Dubbo 用户指南 —— Redis 注册中心》 文档,内容如下:
基于 Redis 实现的注册中心。
使用 Redis 的 Key/Map 结构存储数据结构:
- 主 Key 为服务名和类型
- Map 中的 Key 为 URL 地址
- Map 中的 Value 为过期时间,用于判断脏数据,脏数据由监控中心删除
- 横向来看,和基于 Zookeeper 实现的注册中心,也是分成 Root、Service、Type、URL 四层。
- 使用 Redis Map 的数据结构,聚合相同服务和类型( Root + Service + Type )。
- 不使用 Redis 的自动过期机制,而是通过监控中心,实现过期机制。因为,Redis Key 自动过期时,不存在相应的事件通知。
- 服务提供者和消费者,定时延长其注册的 URL 地址的过期时间。
使用 Redis 的 Publish/Subscribe 事件通知数据变更:
- 通过事件的值区分事件类型:
register,unregister- 普通消费者直接订阅指定服务提供者的 Key,只会收到指定服务的变更事件
- 监控中心通过 psubscribe 功能订阅
/dubbo/*,会收到所有服务的所有变更事件
- 服务实例的启动或关闭,会写入或删除对应的 Redis Map 中,并发起对应的
register,unregister事件,从而保证实时性。 - 通过监控中心,轮询 Key 过期,保证未正常关闭的服务实例的 URL 的删除,并发起对应的
unregister事件,从而保证最终一致性。
调用过程:
【一】服务提供方
- 1、服务提供方启动时,向
Key:/dubbo/com.foo.BarService/providers下,添加当前提供者的地址- 2、并向
Channel:/dubbo/com.foo.BarService/providers发送register事件【二】服务消费方
- 3、服务消费方启动时,从
Channel:/dubbo/com.foo.BarService/providers订阅register和unregister事件- 4、并向
Key:/dubbo/com.foo.BarService/providers下,添加当前消费者的地址
服务消费方收到register和unregister事件后,从Key:/dubbo/com.foo.BarService/providers下获取提供者地址列表【三】服务监控中心
- 5、服务监控中心启动时,从
Channel:/dubbo/*订阅register和unregister,以及subscribe和unsubsribe事件- 6、服务监控中心收到
register和unregister事件后,从Key:/dubbo/com.foo.BarService/providers下获取提供者地址列表- 7、服务监控中心收到
subscribe和unsubsribe事件后,从Key:/dubbo/com.foo.BarService/consumers下获取消费者地址列表
本文涉及仅有 RedisRegistry 一个类,类图如下:
2. RedisRegistry
com.alibaba.dubbo.registry.redis.RedisRegistry ,实现 FailbackRegistry 抽象类,基于 Redis 实现的注册中心实现类。
2.1 构造方法
1: /** |
jedisPools属性,JedisPool 集合,其中键为ip:port。在【第 64 至 84 行】和【第 93 至 99 行】和【101 至 121 行】初始化。root属性,Redis 根节点,即首图的 Root 层。在【第 126 至 134 行】初始化。replicate属性,是否复制模式。在【第 86 至 90 行】文档说明如下:可通过
<dubbo:registry cluster="replicate" />设置 redis 集群策略,缺省为failover:failover: 只写入和读取任意一台,失败时重试另一台,需要服务器端自行配置数据同步。replicate: 在客户端同时写入所有服务器,只读取单台,服务器端不需要同步,注册中心集群增大,性能压力也会更大。
notifiers属性,通知器集合,其中键为 Root + Service 。Notifier ,用于 Redis Publish/Subscribe 机制中的订阅,实时监听数据的变化。reconnectPeriod属性,重连周期,单位:毫秒。在【第 91 行】初始化。用于订阅发生 Redis 连接异常时,Notifier sleep ,等待重连上。
expireExecutor属性,Redis Key 过期机制执行器。expirePeriod属性,Redis Key 过期周期,单位:毫秒。在【第 137 行】初始化。expireFuture属性,Redis Key 过期机制任务的 Future 。在【第 138 至 146 行】初始化。- 该任务主要有两个逻辑:1)延长未过期的 Key ;2)删除过期的 Key 。
- 任务间隔为
expirePeriod的一半,避免过于频繁,对 Redis 的压力过大;同时,避免过于不频繁,每次执行时,都过期了。
admin属性,是否监控中心,在#clean(Jedis)方法,看到具体的使用。
2.2 doRegister
1: |
- 第 3 行:调用
#toCategoryPath(url)方法,获得分类路径作为 Key 。 - 第 4 行:调用
URL#toFullString()方法,获得 URL 字符串作为 Value 。 - 第 6 行:计算过期时间,当前时间 +
expirePeriod。 - 第 9 至 30 行:向 Redis 注册。
- 第 16 行:调用
Jedis#hset(key, value, expire)方法,写入 Redis Map 中。注意,过期时间,作为 Map 的值。 - 第 18 行:调用
Jedis#publish(channel, message)方法,发布register事件。这样订阅该 Key 的服务消费者和监控中心,就会实时从 Redis 读取该服务的最新数据。 - 第 21 至 23 行:如果非
replicate,意味着 Redis 服务器端已同步数据,只需写入单台机器。因此,结束循环。否则,满足replicate,向所有 Redis 写入。
- 第 16 行:调用
- 第 31 至 38 行:处理异常。这块代码胖友自己看下,注意下
exception和success赋值的地方。这块的打印告警日志的处理方式,也适用于多次重试某个操作,结果发生异常,但是最终成功。例如,HTTP 请求远程服务。
2.1.1 toCategoryPath
/** |
2.3 doUnregister
|
- 当服务消费者或服务提供者,关闭时,会调用
#doUnregister(url)方法,取消注册。在该方法中,会删除对应 Map 中的键 + 发布unregister事件,从而实时通知订阅者们。因此,正常情况下,就无需监控中心,做脏数据删除的工作。 - 🙂 代码比较简单,和
#doRegister()方法,逻辑相反。
2.4 doSubscribe
1: |
- ========== 【第一步】Notifier 部分 ==========
- 第 4 行:调用
#toServicePath(url)方法,获得服务路径,例如:/dubbo/com.alibaba.dubbo.demo.DemoService。 - 第 6 行:获得通知器 Notifier 对象。
- 第 8 至 15 行:若不存在,则创建 Notifier 对象,并调用
Notifier#start()方法。 - ========== 【第二步】获取初始化数据,并进行通知 ==========
- 第 19 行:循环
jedisPools,向 Redis 发起订阅,直到一个成功。我们会看到代码,分成两个部分。 - 【第二部分】第 46 至 49 行,适用服务提供者和服务消费者,处理指定 Service 层的初始化数据:
- 第 48 行:调用
Jedis#keys(pattern)方法,获得指定 Service 层下的所有 URL 们。例如/dubbo/com.alibaba.dubbo.demo.DemoService/*。 - 第 48 行:调用
#doNotify(jedis, keys, url, listeners)方法,通知监听器,初始的数据。 - 【第一部分】第 25 至 45 行,适用注册中心,处理所有 Service 层的初始化数据:
- 第 26 行:标记
admin = true。因为,只有注册中心,才清理脏数据。 - 第 28 行:调用
Jedis#keys(pattern)方法,获得所有 Service 层下的所有 URL 们。例如/dubbo/*。 - 第 31 至 39 行:按照服务聚合 URL 集合。
- 第 42 至 44 行:循环
serviceKeys,调用#doNotify(jedis, keys, url, listeners)方法,按照每个 Service 层,通知监听器,初始的数据。此处,就和【第 48 行】类似。
另外,订阅动作(【第一步】)一定要在获取初始化数据(【第二步】)之前。如果反过来,可能获取数据完后,处理的过程中,有数据的变更,我们就无法收到 register unregister 的事件。
2.4.1 toServicePath
/** |
2.4.2 toServicePath
/** |
2.5 doUnsubscribe
|
- 此处目前并未实现,艿艿觉得,此处应该增加取消向 Redis 的订阅( Subscribe ) 。在 ZookeeperRegistry 的该方法中,是移除了对应的监听器。
2.6 doNotify
1: // @params key 分类数组,例如:`/dubbo/com.alibaba.dubbo.demo.DemoService/providers` |
两个重载的
#doNotify(...)方法,主要差异点在前者少url和listeners方法参数。所以:第 3 行:我们可以看到调用
#getSubscribed()方法,获得所有监听器。代码如下:/**
* 订阅 URL 的监听器集合
*
* key:订阅者的 URL ,例如消费者的 URL
*/
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();- x
- 第 3 至 5 行:循环调用
#doNotify(jedis, keys, url, listeners)方法,进行通知。但是呢?这样一来,通知的事件(key)和监听器未必匹配,因此在【第 20 至 30 行】的代码,进行匹配。
- 第 15 行:获得分类层( Category ),即分类数组。在 《精尽 Dubbo 源码分析 —— 注册中心(二)之 Zookeeper》「4. 调用」 小节中,我们也看到,不同角色关注不同的分类数据。
- 服务消费者,关注
providersconfigurationsroutes。 - 服务提供者,关注
consumers。 - 监控中心,关注所有。
- 服务消费者,关注
- 第 18 行:循环分类层,即每个元素为 Root + Service + Type ,例如:
/dubbo/com.alibaba.dubbo.demo.DemoService/providers。 - 第 32 至 44 行:调用
Jedis#hgetAll(key)方法,获得所有 URL 数组。并且,获取完成后,会过滤掉已过期的动态节点。 - 第 45 至 51 行:若不存在匹配,则创建
empty://的 URL返回,用于清空该服务的该分类。 - 第 52 行:添加到
result中。 - 第 60 至 63 行:全量数据获取完成时,调用
super#notify(...)方法,回调 NotifyListener 。该方法,在 《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》 有详细解析。
2.6.1 toServiceName
/** |
2.6.2 toCategoryName
/** |
2.7 deferExpired
1: private void deferExpired() { |
- 被
expireExecutor中的定时调用,整体逻辑类似#doRegister()方法。 第 8 行:调用
#getRegistered()方法,获得已注册的 URL 集合。代码如下:/**
* 已注册 URL 集合。
*
* 注意,注册的 URL 不仅仅可以是服务提供者的,也可以是服务消费者的
*/
private final Set<URL> registered = new ConcurrentHashSet<URL>();第 8 行:循环 URL 集合。
- 第 10 行:判断是否为动态节点,只有动态节点需要延长过期时间。
- 第 14 行:调用
Jedis#hset(key, value, expire)方法,写入 Redis Map 中。注意,过期时间,作为 Map 的值。 - 第 16 行:若【第 14 行】写入返回的值为 1 ,说明 Map 中该键对应的值不存在(例如,多写 Redis 节点时,有个节点写入失败),发布
register事件。 - 第 21 至 23 行:若是注册中心(
admin = true) 时,调用#clean(Jedis)方法,清理过期脏数据。 - 第 25 至 27 行:如果服务器端已同步数据,只需写入单台机器。
2.7.1 clean
private void clean(Jedis jedis) { |
- 整体逻辑类似
#doUnregister()方法。 - 🙂 胖友自己看方法的注释哈。
2.8 isAvailable
|
2.9 destroy
|
3. Notifier
Notifier 是 RedisRegistry 的内部类。
Notifier ,继承 Thread 类,负责向 Redis 发起订阅逻辑。
3.1 构造方法
1: /** |
service属性,服务名 Root + Service。first属性,是否首次。在#run()方法中,查看。【第 13 至 32 行】的属性,相当于重连策略,用于和 Redis 断开时,忽略一定次数和 Redis 的连接,避免空跑。涉及方法如下:
#isSkip()方法,判断是否忽略本次对 Redis 的连接。代码如下:1: private boolean isSkip() {
2: // 获得需要忽略连接的总次数。如果超过 10 ,则加上一个 10 以内的随机数。
3: int skip = connectSkip.get(); // Growth of skipping times
4: if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
5: if (connectRandom == 0) {
6: connectRandom = random.nextInt(10);
7: }
8: skip = 10 + connectRandom;
9: }
10: // 自增忽略次数。若忽略次数不够,则继续忽略。
11: if (connectSkiped.getAndIncrement() < skip) { // Check the number of skipping times
12: return true;
13: }
14: // 增加需要忽略的次数
15: connectSkip.incrementAndGet();
16: // 重置已忽略次数和随机数
17: connectSkiped.set(0);
18: connectRandom = 0;
19: return false;
20: }- 第 2 至 9 行:获得需要忽略连接的总次数。如果超过 10 ,则加上一个 10 以内的随机数。思路是,连接失败的次数越多,每一轮加大需要忽略的总次数,并且带有一定的随机性。
- 第 10 至 13 行:自增忽略次数。若忽略次数不够,则继续忽略,即返回
true。 第 15 行:增加需要忽略的次数。也就是说,下一轮,不考虑随机数,会多一次。如下是一次模拟:
第一轮
- connectSkip: 0; connectSkiped: 0
第二轮
- connectSkip: 1; connectSkiped: 0
- connectSkip: 1; connectSkiped: 1
第三轮
- connectSkip: 2; connectSkiped: 0
- connectSkip: 2; connectSkiped: 1
- connectSkip: 2; connectSkiped: 2
- 当超过十轮后,增加随机数。
第 16 至 18 行:重置已忽略次数和随机数。
#resetSkip()方法,重置忽略连接的信息。代码如下:private void resetSkip() {
// 重置需要连接的次数
connectSkip.set(0);
// 重置已忽略次数和随机数
connectSkiped.set(0);
connectRandom = 0;
}- x
3.2 run
1: |
- 第 3 行:循环执行,直到关闭。
- 第 6 行:调用
#isSkip()方法,判断是否跳过本次 Redis 连接。But ,即使跳过,也没有执行类似 sleep 的逻辑,有点奇怪。这样,会导致实际即使跳过,也会快速向 Redis 发起订阅。【TODO 8032】Redis 重连逻辑 - 第 8 行:循环连接池,发起订阅,直到一个成功。
- ======================================================
【第一种情况】第 14 至 26 行:监控中心
第 15 至 24 行:目前这块代码有一些问题?!初始时
first=true,那么这块代码永远无法执行到。笔者猜测这块的意图是,在 ZookeeperRegistry 中,可以实现对连接状态的监听,从而实现断开重连成功后,从 Zookeeper 获取到最新的数据。代码如下:zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
```
* 而 JedisPool 中,不提供这样的连接监控机制。那么如果订阅 Redis 发生了异常,我们可以认为 Redis 连接断开了,需要重新发起订阅,并且需要**重新**从 Redis 中获取到最新的数据。
* 那么此处的代码可以这样改:
* 第 16 行: `first = true;`
* 第 42 行:增加 `first = false;`
* 第 26 行:调用 `Jedis#psubscribe(JedisPubSub jedisPubSub, String... patterns)` 方法,订阅**所有 Service 层**。
* 【第二种情况】第 27 至 36 行:服务提供者或消费者。
* 第 29 至 34 行:和【第 15 至 24 行】**类似**。
* 第 35 行:调用 `Jedis#psubscribe(JedisPubSub jedisPubSub, String... patterns)` 方法,订阅**指定 Service 层**。
* ======================================================
* 第 37 至 40 行:无法执行到,因为 `Jedis#psubscribe(JedisPubSub jedisPubSub, String... patterns)` 方法,是**阻塞**的。这也是为什么 Notifier 是一个 **Thread** 的原因。
* 第 41 至 45 行:发生异常,说明 Redis 连接断开了。因此,调用 `#sleep(millis)` 方法,等待 Redis 重连成功。通过这样的方式,避免执行,占用大量的 CPU 资源。
## 3.3 shutdown
```Java
public void shutdown() {
try {
// 停止运行
running = false;
// Jedis 断开连接
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
4. NotifySub
NotifySub 是 RedisRegistry 的内部类。
NotifySub ,实现 redis.clients.jedis.JedisPubSub 抽象类,通知订阅实现类。
1: private class NotifySub extends JedisPubSub { |
- 实现了
#onMessage(key, msg)和#onPMessage(pattern, key, msg)方法,收到registerunregister事件,调用#doNotify(jedis, key)方法,通知监听器,数据变化,从而实现实时更新。
5. 可靠性
FROM 《Dubbo 用户指南 —— Redis 注册中心》
阿里内部并没有采用 Redis 做为注册中心,而是使用自己实现的基于数据库的注册中心,即:Redis 注册中心并没有在阿里内部长时间运行的可靠性保障,此 Redis 桥接实现只为开源版本提供,其可靠性依赖于 Redis 本身的可靠性。
FROM 《Dubbo 用户指南 —— 成熟度》
Redis注册中心
- Maturity:Stable
- Strength:支持基于客户端双写的集群方式,性能高
- Problem:要求服务器时间同步,用于检查心跳过期脏数据
- Advise:可用于生产环境
做个小笔记,Redis 主从复制的情况下,从节点的订阅( Subscribe ),可以收到主节点的发布( Publish ) 。做这个笔记的原因是,原来担心 "failover" 模式下,Redis 主节点挂了,如果订阅从节点,会不会出现,Redis 主节点恢复后,收不到在其上的发布事件。
666. 彩蛋
🙂 看了 Redis 的注册中心的实现,收获还是蛮大的。原来的一直纠结,如何解决 Redis 自动过期,怎么监听到。原来,自己的思路错了!!!