`
liudunxu2
  • 浏览: 30662 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
文章分类
社区版块
存档分类
最新评论

metaq的客户端自动断线重连机制

 
阅读更多
1.metaq的数据传输基于gecko
2.metaq的RemotingClientWrapper是gecko的RemotingClient的包装类,通过ConcurrentHashMap<String/* url */, Set<Object>/* references */>成员变量添加了连接的建立和关闭计数功能。
3.remoteclient的连接语句如下:
for (int i = 0; i < connCount; i++) { 
            try { 
                final TimerRef timerRef = new TimerRef(((ClientConfig) this.config).getConnectTimeout(), null); 
                final Future<NioSession> future = 
                        ((GeckoTCPConnectorController) this.controller).connect(remoteAddress, groupSet, remoteAddress, 
                            timerRef); 
                final CheckConnectFutureRunner runnable = 
                        new CheckConnectFutureRunner(future, remoteAddress, groupSet, this); 
                timerRef.setRunnable(runnable); 
                this.insertTimer(timerRef); 
            } 
            catch (final Exception e) { 
                log.error("连接" + RemotingUtils.getAddrString(remoteAddress) + "失败,启动重连任务", e); 
                this.reconnectManager.addReconnectTask(new ReconnectTask(groupSet, remoteAddress)); 
            } 
        } 

在连接建立时,会新建一个定时任务,进行连接检测,连接的代码如下:
 @Override 
        public void run() { 
            try { 
                if (!this.future.isDone() && this.future.get(10, TimeUnit.MILLISECONDS) == null) { 
                    final ReconnectManager reconnectManager = this.remotingClient.getReconnectManager(); 
                    reconnectManager.addReconnectTask(new ReconnectTask(this.groupSet, this.remoteAddress)); 
                } 
            } 
            catch (final Exception e) { 
                log.error("连接" + this.remoteAddress + "失败", e); 
            } 
        }
插入到TimerRefQueue对象中,reactor类实例使用访问者模式遍历queue对象,实现重连调用,代码如下:
  private void processMoveTimer() { 
        final long now = this.getTime(); 
        // 距离上一次检测时间超过1秒 
        if (now - this.lastMoveTimestamp >= TIMEOUT_THRESOLD && !this.timerQueue.isEmpty()) { 
            this.lastMoveTimestamp = now; 
            this.timerQueue.iterateQueue(new TimerQueueVisitor(now)); 
        } 
    }
如果需要重连,插入重连任务队列LinkedBlockingQueue<ReconnectTask>tasks,进行重连,代码如下:
 private void doReconnectTask(final ReconnectTask task) throws IOException, NotifyRemotingException { 
            log.info("Try to reconnect to " + RemotingUtils.getAddrString(task.getRemoteAddress())); 
            final TimerRef timerRef = new TimerRef(ReconnectManager.this.clientConfig.getConnectTimeout(), null); 
            try { 
                final Future<NioSession> future = 
                        ReconnectManager.this.connector.connect(task.getRemoteAddress(), task.getGroupSet(), 
                            task.getRemoteAddress(), timerRef); 
                final DefaultRemotingClient.CheckConnectFutureRunner runnable = 
                        new DefaultRemotingClient.CheckConnectFutureRunner(future, task.getRemoteAddress(), 
                            task.getGroupSet(), ReconnectManager.this.remotingClient); 
                timerRef.setRunnable(runnable); 
                ReconnectManager.this.remotingClient.insertTimer(timerRef); 
                // 标记这个任务完成 
                task.setDone(true); 
            } 
            catch (final Exception e) { 
                this.readdTask(task); 
            } 
        }


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics