1.metaq的数据传输基于gecko
2.metaq的RemotingClientWrapper是gecko的RemotingClient的包装类,通过ConcurrentHashMap<String/* url */, Set<Object>/* references */>成员变量添加了连接的建立和关闭计数功能。
3.remoteclient的连接语句如下:
for (int i = 0; i < connCount; i++) {
try {
final TimerRef timerRef = new TimerRef(((ClientConfig) this.config).getConnectTimeout(), null);
final Future<NioSession> future =
((GeckoTCPConnectorController) this.controller).connect(remoteAddress, groupSet, remoteAddress,
timerRef);
final CheckConnectFutureRunner runnable =
new CheckConnectFutureRunner(future, remoteAddress, groupSet, this);
timerRef.setRunnable(runnable);
this.insertTimer(timerRef);
}
catch (final Exception e) {
log.error("连接" + RemotingUtils.getAddrString(remoteAddress) + "失败,启动重连任务", e);
this.reconnectManager.addReconnectTask(new ReconnectTask(groupSet, remoteAddress));
}
}
在连接建立时,会新建一个定时任务,进行连接检测,连接的代码如下:
@Override
public void run() {
try {
if (!this.future.isDone() && this.future.get(10, TimeUnit.MILLISECONDS) == null) {
final ReconnectManager reconnectManager = this.remotingClient.getReconnectManager();
reconnectManager.addReconnectTask(new ReconnectTask(this.groupSet, this.remoteAddress));
}
}
catch (final Exception e) {
log.error("连接" + this.remoteAddress + "失败", e);
}
}
插入到TimerRefQueue对象中,reactor类实例使用访问者模式遍历queue对象,实现重连调用,代码如下:
private void processMoveTimer() {
final long now = this.getTime();
// 距离上一次检测时间超过1秒
if (now - this.lastMoveTimestamp >= TIMEOUT_THRESOLD && !this.timerQueue.isEmpty()) {
this.lastMoveTimestamp = now;
this.timerQueue.iterateQueue(new TimerQueueVisitor(now));
}
}
如果需要重连,插入重连任务队列LinkedBlockingQueue<ReconnectTask>tasks,进行重连,代码如下:
private void doReconnectTask(final ReconnectTask task) throws IOException, NotifyRemotingException {
log.info("Try to reconnect to " + RemotingUtils.getAddrString(task.getRemoteAddress()));
final TimerRef timerRef = new TimerRef(ReconnectManager.this.clientConfig.getConnectTimeout(), null);
try {
final Future<NioSession> future =
ReconnectManager.this.connector.connect(task.getRemoteAddress(), task.getGroupSet(),
task.getRemoteAddress(), timerRef);
final DefaultRemotingClient.CheckConnectFutureRunner runnable =
new DefaultRemotingClient.CheckConnectFutureRunner(future, task.getRemoteAddress(),
task.getGroupSet(), ReconnectManager.this.remotingClient);
timerRef.setRunnable(runnable);
ReconnectManager.this.remotingClient.insertTimer(timerRef);
// 标记这个任务完成
task.setDone(true);
}
catch (final Exception e) {
this.readdTask(task);
}
}
分享到:
相关推荐
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
metaq-server-1.4.6.2服务端+客户端+javadoc文档,打包于一个压缩包
metamorphosis(metaq) 服务端1.4.3版本 包括客户端 发送一个序列化对象
整理后的Metaq原理应用文档,欢迎大家看看。
metaQ向spark传数据
Metaq在JDk 7下的异常及解决方案,希望可以帮助学习者!
metaq-server-1.4.6.2.tar.gz
metaQ的安装包
metaq--1.4.6.2.zip 和原版一样就是换了个名字,方便大家一起学习.
MetaQ 分布式消息服务中间件.pdf
Memorphosis是一个消息中间件,它是linkedin开源MQ——kafka的Java版本,针对淘宝内部应用做了定制和优化。Metamorphosis的设计原则 ...• 消费状态保存在客户端 • 分布式,生产者、服务器和消费者都可分布
Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...
Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码
rocketMQ
生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次...
分享一下 RocketMq的文档RocketMQ运维指令 rocketmq在阿里内部叫metaq
RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq,当 Metaq 3.0发布时,产品名称改为 RocketMQ。 具有以下特点: 1、能够保证严格的消息顺序 2、提供丰富的消息拉取模式 3、高效的订阅者水平...
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ...