短連接
概念
client與server通過(guò)三次握手建立連接,client發(fā)送請(qǐng)求消息,server返回響應(yīng),一次連接就完成了。
這時(shí)候雙方任意都可以發(fā)起close操作,不過(guò)一般都是client先發(fā)起close操作。由于短連接一般只會(huì)在 client/server 間傳遞一次請(qǐng)求操作,因此短連接的特點(diǎn)是連接生命周期短暫,連接建立和斷開(kāi)的開(kāi)銷較大,適用于單次請(qǐng)求響應(yīng)的場(chǎng)景。
短連接的優(yōu)缺點(diǎn)
管理起來(lái)比較簡(jiǎn)單,存在的連接都是有用的連接,不需要額外的控制手段。
使用場(chǎng)景
通常情況下,當(dāng)瀏覽器訪問(wèn)服務(wù)器時(shí),采用的是短連接的方式。
對(duì)于服務(wù)端而言,長(zhǎng)連接會(huì)消耗大量的資源,而且用戶使用瀏覽器對(duì)服務(wù)端的訪問(wèn)頻率相對(duì)較低。如果同時(shí)存在幾十萬(wàn)甚至上百萬(wàn)的連接,則服務(wù)端的壓力將非常巨大,甚至可能導(dǎo)致崩潰。
因此,針對(duì)并發(fā)量高但請(qǐng)求頻率低的情況,建議使用短連接。
為了優(yōu)化這種情況,可以考慮以下方法:
1. 進(jìn)行連接池管理:使用連接池來(lái)管理與服務(wù)端的連接,避免每次請(qǐng)求都建立和關(guān)閉連接,減少資源的消耗。
2. 使用緩存機(jī)制:將一些不經(jīng)常變動(dòng)且占用資源較多的數(shù)據(jù)進(jìn)行緩存,減少對(duì)服務(wù)端的請(qǐng)求,提高性能。
3. 引入負(fù)載均衡:通過(guò)負(fù)載均衡技術(shù)將請(qǐng)求分發(fā)到多個(gè)服務(wù)器上,均衡服務(wù)器的壓力,提高整體的處理能力。
4. 優(yōu)化服務(wù)端架構(gòu):對(duì)服務(wù)端進(jìn)行優(yōu)化,如增加服務(wù)器的處理能力、調(diào)整服務(wù)器配置等,以提高服務(wù)端的并發(fā)處理能力。
長(zhǎng)連接
什么是長(zhǎng)連接
客戶端向服務(wù)器發(fā)起連接,服務(wù)器接受客戶端連接并建立雙方連接。
客戶端和服務(wù)器完成一次讀寫后,它們之間的連接不會(huì)主動(dòng)關(guān)閉,并可以繼續(xù)使用該連接進(jìn)行后續(xù)的讀寫操作。
長(zhǎng)連接的生命周期
在正常情況下,一條TCP長(zhǎng)連接建立后,只要雙方不提出關(guān)閉請(qǐng)求并且不出現(xiàn)異常情況,這條連接會(huì)一直存在。操作系統(tǒng)不會(huì)主動(dòng)關(guān)閉它,即使在經(jīng)過(guò)物理網(wǎng)絡(luò)拓?fù)涞母淖冎笕匀豢梢允褂?。因此,一條連接可以保持幾天、幾個(gè)月、幾年甚至更長(zhǎng)時(shí)間,只要沒(méi)有異常情況或用戶(應(yīng)用層)主動(dòng)關(guān)閉。
客戶端和服務(wù)端可以一直使用該連接進(jìn)行數(shù)據(jù)通信。
長(zhǎng)連接的優(yōu)點(diǎn)
使用長(zhǎng)連接可以減少TCP建立和關(guān)閉操作,從而減少網(wǎng)絡(luò)阻塞。即使發(fā)生錯(cuò)誤,也不需要關(guān)閉連接就能進(jìn)行提示,這樣可以減少CPU和內(nèi)存的使用,因?yàn)椴恍枰l繁地建立和關(guān)閉連接。
長(zhǎng)連接的缺點(diǎn)
連接數(shù)過(guò)多時(shí),影響服務(wù)端的性能和并發(fā)數(shù)量。
使用場(chǎng)景
數(shù)據(jù)庫(kù)的連接就是采用TCP長(zhǎng)連接.
RPC,遠(yuǎn)程服務(wù)調(diào)用,在服務(wù)器,一個(gè)服務(wù)進(jìn)程頻繁調(diào)用另一個(gè)服務(wù)進(jìn)程,可使用長(zhǎng)連接,減少連接花費(fèi)的時(shí)間。
總結(jié)
1.對(duì)于長(zhǎng)連接和短連接的使用是需要根據(jù)應(yīng)用場(chǎng)景來(lái)判斷的
2.長(zhǎng)連接并不是萬(wàn)能的,也是需要維護(hù)的,
長(zhǎng)連接的實(shí)現(xiàn)
心跳機(jī)制
應(yīng)用層協(xié)議通常會(huì)使用心跳機(jī)制來(lái)保持客戶端與服務(wù)器的連接,并確??蛻舳巳匀辉诰€。典型的心跳協(xié)議如IM協(xié)議(例如QQ、MSN、飛信)會(huì)定期發(fā)送數(shù)據(jù)包給服務(wù)器,同時(shí)傳輸一些可能必要的數(shù)據(jù)。
在TCP協(xié)議中,也有一個(gè)心跳機(jī)制,即TCP選項(xiàng)中的SO_KEEPALIVE。系統(tǒng)默認(rèn)設(shè)置為2小時(shí)發(fā)送一次心跳包。但是這個(gè)機(jī)制無(wú)法檢測(cè)機(jī)器斷電、網(wǎng)線拔出或防火墻等導(dǎo)致的斷線情況。此外,邏輯層處理斷線情況也可能不夠完善。通常情況下,如果只是用于?;钅康?,SO_KEEPALIVE機(jī)制仍然是可以接受的。
請(qǐng)注意以下優(yōu)化建議:
1. 調(diào)整心跳頻率:根據(jù)實(shí)際情況,可以根據(jù)應(yīng)用需求調(diào)整心跳頻率。太頻繁的心跳包可能造成額外的網(wǎng)絡(luò)負(fù)擔(dān),而太不頻繁則可能延遲檢測(cè)到斷線情況。
2. 使用應(yīng)用層心跳機(jī)制:考慮使用應(yīng)用層心跳機(jī)制,而不僅僅依賴于TCP的SO_KEEPALIVE。應(yīng)用層心跳機(jī)制能夠更靈活地處理不同情況下的斷線問(wèn)題,并能夠傳遞更多的必要數(shù)據(jù)。
3. 完善斷線處理邏輯:在應(yīng)用層實(shí)現(xiàn)斷線處理邏輯,包括重新連接、重發(fā)未成功的數(shù)據(jù)等。確保斷線后客戶端能夠盡快恢復(fù)連接,并保持?jǐn)?shù)據(jù)的完整性和一致性。
4. 測(cè)試和監(jiān)控:定期測(cè)試心跳機(jī)制的有效性,并監(jiān)控?cái)嗑€情況以及處理效果。及時(shí)發(fā)現(xiàn)并解決可能存在的問(wèn)題。
為什么需要心跳機(jī)制?
由于網(wǎng)絡(luò)的不可靠性,TCP長(zhǎng)連接可能會(huì)在某些突發(fā)情況下斷開(kāi),例如網(wǎng)線被拔出或突然掉電。在這種情況下,如果服務(wù)器和客戶端之間沒(méi)有交互,它們不能立即發(fā)現(xiàn)對(duì)方已掉線。為解決這個(gè)問(wèn)題,可以引入心跳機(jī)制。
TCP協(xié)議的KeepAlive機(jī)制
默認(rèn)KeepAlive狀態(tài)是不打開(kāi)的。
需要將setsockopt將SOL_SOCKET.SO_KEEPALIVE設(shè)置為1才是打開(kāi)KeepAlive狀態(tài),
并且可以設(shè)置三個(gè)參數(shù):
tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl,
分別表示:連接閑置多久開(kāi)始發(fā)keepalive的ack包、發(fā)幾個(gè)ack包不回復(fù)才當(dāng)對(duì)方已斷線、兩個(gè)ack包之間的間隔。
很多網(wǎng)絡(luò)設(shè)備,尤其是NAT路由器,由于其硬件的限制(例如內(nèi)存、CPU處理能力),無(wú)法保持其上的所有連接,因此在必要的時(shí)候,會(huì)在連接池中選擇一些不活躍的連接踢掉。
典型做法是LRU,把最久沒(méi)有數(shù)據(jù)的連接給T掉。
通過(guò)使用TCP的KeepAlive機(jī)制(修改那個(gè)time參數(shù)),可以讓連接每隔一小段時(shí)間就產(chǎn)生一些ack包,以降低被踢掉的風(fēng)險(xiǎn),當(dāng)然,這樣的代價(jià)是額外的網(wǎng)絡(luò)和CPU負(fù)擔(dān)。
如何實(shí)現(xiàn)心跳機(jī)制?
兩種方式實(shí)現(xiàn)心跳機(jī)制:
- 使用 TCP 協(xié)議層面的 keepalive 機(jī)制.
- 在應(yīng)用層上實(shí)現(xiàn)自定義的心跳機(jī)制.
雖然在 TCP 協(xié)議層面上, 提供了 keepalive 保活機(jī)制, 但是使用它有幾個(gè)缺點(diǎn):
- 它不是 TCP 的標(biāo)準(zhǔn)協(xié)議, 并且是默認(rèn)關(guān)閉的.
- TCP keepalive 機(jī)制依賴于操作系統(tǒng)的實(shí)現(xiàn), 默認(rèn)的 keepalive 心跳時(shí)間是 兩個(gè)小時(shí), 并且對(duì) keepalive 的修改需要系統(tǒng)調(diào)用(或者修改系統(tǒng)配置), 靈活性不夠.
- TCP keepalive 與 TCP 協(xié)議綁定, 因此如果需要更換為 UDP 協(xié)議時(shí), keepalive 機(jī)制就失效了.
使用 TCP 層面的 keepalive 機(jī)制比自定義的應(yīng)用層心跳機(jī)制節(jié)省流量,
本文的主要介紹應(yīng)用層方面實(shí)現(xiàn)心跳機(jī)制,使用netty實(shí)現(xiàn)心跳和斷線重連。
netty實(shí)現(xiàn)心跳機(jī)制
netty對(duì)心跳機(jī)制提供了機(jī)制,實(shí)現(xiàn)的關(guān)鍵是IdleStateHandler先來(lái)看一下他的構(gòu)造函數(shù)
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
實(shí)例化一個(gè) IdleStateHandler 需要提供三個(gè)參數(shù):
- readerIdleTimeSeconds, 讀超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒(méi)有從 Channel 讀取到數(shù)據(jù)時(shí), 會(huì)觸發(fā)一個(gè) READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds, 寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒(méi)有數(shù)據(jù)寫入到 Channel 時(shí), 會(huì)觸發(fā)一個(gè) WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds, 讀和寫都超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒(méi)有讀并且寫操作時(shí), 會(huì)觸發(fā)一個(gè) ALL_IDLE 的 IdleStateEvent 事件.
netty心跳流程
1. 客戶端成功連接服務(wù)端。
2.在客戶端中的ChannelPipeline中加入IdleStateHandler,設(shè)置寫事件觸發(fā)事件為5s.
3.客戶端超過(guò)5s未寫數(shù)據(jù),觸發(fā)寫事件,向服務(wù)端發(fā)送心跳包,
4.同樣,服務(wù)端要對(duì)心跳包做出響應(yīng),其實(shí)給客戶端最好的回復(fù)就是“不回復(fù)”,減輕服務(wù)端的壓力
5.超過(guò)三次,1過(guò)0s服務(wù)端都會(huì)收到來(lái)自客戶端的心跳信息,服務(wù)端可以認(rèn)為客戶端掛了,可以close鏈路。
6.客戶端恢復(fù)正常,發(fā)現(xiàn)鏈路已斷,重新連接服務(wù)端。
代碼實(shí)現(xiàn)
服務(wù)端handler:
package com.heartbreak.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Random;
/\*\*
\* @author janti
\* @date 2018/6/10 12:21
\*/
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
// 失敗計(jì)數(shù)器:未收到client端發(fā)送的ping請(qǐng)求
private int unRecPingTimes = 0;
// 定義服務(wù)端沒(méi)有收到心跳消息的最大次數(shù)
private static final int MAX\_UN\_REC\_PING\_TIMES = 3;
private Random random = new Random(System.currentTimeMillis());
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if (msg!=null && msg.equals("Heartbeat")){
System.out.println("客戶端"+ctx.channel().remoteAddress()+"--心跳信息--");
}else {
System.out.println("客戶端----請(qǐng)求消息----:"+msg);
String resp \= "商品的價(jià)格是:"+random.nextInt(1000);
ctx.writeAndFlush(resp);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event \= (IdleStateEvent) evt;
if (event.state()==IdleState.READER\_IDLE){
System.out.println("===服務(wù)端===(READER\_IDLE 讀超時(shí))");
// 失敗計(jì)數(shù)器次數(shù)大于等于3次的時(shí)候,關(guān)閉鏈接,等待client重連
if (unRecPingTimes >= MAX\_UN\_REC\_PING\_TIMES) {
System.out.println("===服務(wù)端===(讀超時(shí),關(guān)閉chanel)");
// 連續(xù)超過(guò)N次未收到client的ping消息,那么關(guān)閉該通道,等待client重連
ctx.close();
} else {
// 失敗計(jì)數(shù)器加1
unRecPingTimes++;
}
}else {
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("一個(gè)客戶端已連接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("一個(gè)客戶端已斷開(kāi)連接");
}
}
服務(wù)端server:
package com.heartbreak.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.\*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/\*\*
\* @author tangj
\* @date 2018/6/10 10:46
\*/
public class HeartBeatServer {
private static int port = 9817;
public HeartBeatServer(int port) {
this.port = port;
}
ServerBootstrap bootstrap \= null;
ChannelFuture f;
// 檢測(cè)chanel是否接受過(guò)心跳數(shù)據(jù)時(shí)間間隔(單位秒)
private static final int READ\_WAIT\_SECONDS = 10;
public static void main(String args\[\]) {
HeartBeatServer heartBeatServer \= new HeartBeatServer(port);
heartBeatServer.startServer();
}
public void startServer() {
EventLoopGroup bossgroup \= new NioEventLoopGroup();
EventLoopGroup workergroup \= new NioEventLoopGroup();
try {
bootstrap \= new ServerBootstrap();
bootstrap.group(bossgroup, workergroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerInitializer());
// 服務(wù)器綁定端口監(jiān)聽(tīng)
f = bootstrap.bind(port).sync();
System.out.println("server start ,port: "+port);
// 監(jiān)聽(tīng)服務(wù)器關(guān)閉監(jiān)聽(tīng),此方法會(huì)阻塞
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossgroup.shutdownGracefully();
workergroup.shutdownGracefully();
}
}
private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline \= ch.pipeline();
// 監(jiān)聽(tīng)讀操作,讀超時(shí)時(shí)間為5秒,超過(guò)5秒關(guān)閉channel;
pipeline.addLast("ping", new IdleStateHandler(READ\_WAIT\_SECONDS, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new HeartbeatServerHandler());
}
}
}
客戶端handler
package com.heartbreak.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/\*\*
\* @author tangj
\* @date 2018/6/11 22:55
\*/
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
private HeartBeatClient client;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");
private static final ByteBuf HEARTBEAT\_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF\_8));
public HeartBeatClientHandler(HeartBeatClient client) {
this.client = client;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("收到服務(wù)端回復(fù):"+msg);
if (msg.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state \= ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER\_IDLE) {
ctx.writeAndFlush(HEARTBEAT\_SEQUENCE.duplicate());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.err.println("客戶端與服務(wù)端斷開(kāi)連接,斷開(kāi)的時(shí)間為:"+format.format(new Date()));
// 定時(shí)線程 斷線重連
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
client.doConncet();
}
}, 10, TimeUnit.SECONDS);
}
}
客戶端啟動(dòng):
package com.heartbreak.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.\*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/\*\*
\* @author tangj
\* @date 2018/6/10 16:18
\*/
public class HeartBeatClient {
private Random random = new Random();
public Channel channel;
public Bootstrap bootstrap;
protected String host = "127.0.0.1";
protected int port = 9817;
public static void main(String args\[\]) throws Exception {
HeartBeatClient client \= new HeartBeatClient();
client.run();
client.sendData();
}
public void run() throws Exception {
EventLoopGroup group \= new NioEventLoopGroup();
try {
bootstrap \= new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleClientInitializer(HeartBeatClient.this));
doConncet();
} catch (Exception e) {
e.printStackTrace();
}
}
/\*\*
\* 發(fā)送數(shù)據(jù)
\* @throws Exception
\*/
public void sendData() throws Exception {
BufferedReader in \= new BufferedReader(new InputStreamReader(System.in));
while (true){
String cmd \= in.readLine();
switch (cmd){
case "close" :
channel.close();
break;
default:
channel.writeAndFlush(in.readLine());
break;
}
}
}
/\*\*
\* 連接服務(wù)端
\*/
public void doConncet() {
if (channel != null && channel.isActive()) {
return;
}
ChannelFuture channelFuture \= bootstrap.connect(host, port);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (channelFuture.isSuccess()) {
channel \= futureListener.channel();
System.out.println("connect server successfully");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConncet();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
private HeartBeatClient client;
public SimpleClientInitializer(HeartBeatClient client) {
this.client = client;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline \= socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 5, 0));
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("handler", new HeartBeatClientHandler(client));
}
}
}
運(yùn)行結(jié)果:
1.客戶端長(zhǎng)時(shí)間未發(fā)送心跳包,服務(wù)端關(guān)閉連接
server start ,port: 9817
一個(gè)客戶端已連接
\===服務(wù)端===(READER\_IDLE 讀超時(shí))
\===服務(wù)端===(READER\_IDLE 讀超時(shí))
\===服務(wù)端===(READER\_IDLE 讀超時(shí))
\===服務(wù)端===(READER\_IDLE 讀超時(shí))
\===服務(wù)端===(讀超時(shí),關(guān)閉chanel)
一個(gè)客戶端已斷開(kāi)連接
2.客戶端發(fā)送心跳包,服務(wù)端和客戶端保持心跳信息
一個(gè)客戶端已連接
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
3.服務(wù)單宕機(jī),斷開(kāi)連接,客戶端進(jìn)行重連
客戶端與服務(wù)端斷開(kāi)連接,斷開(kāi)的時(shí)間為:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully
代碼地址: