NameServer原理
...大约 4 分钟
NameServer原理
架构设计
- NameServer 启动后每隔 10s 扫描一下 brokerLiveTable,检测表中上次心跳包的时间,如果间隔超过 120s 则认为该 broker 不可用,移出路由表中与该 broker 相关的路由信息。
- broker 启动后每过 30s 向 NameServer 发送心跳包,并传输路由信息,NameServer 进行保存。
- 生产者启动后会和 NameServer、broker 进行长连接。生产者每隔 30s 向 NameServer 获取路由信息。
- 消费者每 30s 向 NameServer 获取路由信息
启动流程
总的流程为:运行 NameServer 中 main 方法---> 创建 NameServer---> 启动 NameServer;
创建 NameServer
主要逻辑为通过配置文件创建 NameServer对象。
启动 NameServer
主要的过程为:NameServer#initialize和NameServer#start
initialize
- 加载 kv 配置
- 创建 netty remote server
- 注册处理器
- 每隔 10s 扫描存活的 broker,去除间隔超过 120s broker 的相关信息
- 每隔 10 分钟打印一次 kv 信息
- 判断是否禁用 tls,并添加文件监控,如果文件发生改变则重新加载 ssl 文件
start
- 启动 netty remote server
- 启动文件监控
源码
public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 根据参数创建 nameSrvController
NamesrvController controller = createNamesrvController(args);
// 启动 nameSrvController
start(controller);
//打印日志
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
/**
* 创建namesrv控制器
*
* @param args arg游戏
* @return {@link NamesrvController}
* @throws IOException ioexception
* @throws JoranException joran例外
*/
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 设置版本
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// 创建命令行选项
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 解析命令行
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
// 没有命令行则退出
if (null == commandLine) {
System.exit(-1);
return null;
}
// NameSrv 的配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// NettyServer 的配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置监听端口
nettyServerConfig.setListenPort(9876);
// 如果存在 c 命令则读取文件配置 nettyConfig 和 nameSrvConfig
if (commandLine.hasOption('c')) {
// 获取配置文件路径
String file = commandLine.getOptionValue('c');
// 如果配置文件不为空
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
// 配置文件加载为 properties
properties = new Properties();
properties.load(in);
// 将配置文件中的配置加载到 namesrvConfig 和 nettyServerConfig 中
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 设置配置文件路径
namesrvConfig.setConfigStorePath(file);
// 打印日志
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 如果命令行中有p 则打印配置
if (commandLine.hasOption('p')) {
// 加载 console 日志
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
// 打印配置
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
// 退出
System.exit(0);
}
// 命令行填充到 nameSrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 检查是否设置了 rocketMQ 的 Home
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 设置 log
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 打印配置
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 创建 nameSrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 注册配置文件
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
// 返回 controller
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
// 如果 controller 为空则抛出异常
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// controller 初始化
boolean initResult = controller.initialize();
// 初始化失败则退出
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 停止钩子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
controller.start();
return controller;
}
public static void shutdown(final NamesrvController controller) {
controller.shutdown();
}
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Name server config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config items");
opt.setRequired(false);
options.addOption(opt);
return options;
}
public static Properties getProperties() {
return properties;
}
}
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvConfig namesrvConfig;
private final NettyServerConfig nettyServerConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
private final KVConfigManager kvConfigManager;
private final RouteInfoManager routeInfoManager;
private RemotingServer remotingServer;
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
/**
* 初始化
*
* @return boolean
*/
public boolean initialize() {
// 加载 KV 配置管理
this.kvConfigManager.load();
// 创建Netty远程Server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建远程线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册处理器
this.registerProcessor();
// 每隔10s 扫描Broker 是否存活,移除不存活的Broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// 每隔十分钟定时打印KV配置
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
// 若未禁用 tls 则开启
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
// 文件监听器
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
public void start() throws Exception {
// netty 启动
this.remotingServer.start();
// tls 文件监控服启动
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
public NamesrvConfig getNamesrvConfig() {
return namesrvConfig;
}
public NettyServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
public KVConfigManager getKvConfigManager() {
return kvConfigManager;
}
public RouteInfoManager getRouteInfoManager() {
return routeInfoManager;
}
public RemotingServer getRemotingServer() {
return remotingServer;
}
public void setRemotingServer(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
}
public Configuration getConfiguration() {
return configuration;
}
}
路由注册、故障剔除
路由元信息
路由删除
路由发现
Powered by Waline v3.1.3