隨筆 - 365  文章 - 0  評論 - 893  0

加拿大快乐8预测开奖结果: 微言Netty:分布式服務框架

韩国快乐8开奖结果查询 www.wcdoq.com 1. 前言

幾年前,我就一直想著要設計一款自己的實時通訊框架,于是出來了TinySocket,她是基于微軟的SocketAsyncEventArgs來實現的,由于此類提供的功能很簡潔,所以當時自己實現了緩沖區處理,粘包拆包等,彼時的.net平臺還沒有一款成熟的即時通訊框架出來,所以當這款框架出來的時候,將當時公司的商業項目的核心競爭力提升至行業前三。但是后來隨著.net平臺上越來越多的即時通訊框架出來,TinySocket也是英雄暮年,經過了諸多版本迭代和諸多團隊經手,她不僅變得臃腫,而且也不符合潮流。整體的重構勢在必行了。但是我還在等,在等一款真正的即時通訊底層庫出來。

都說念念不忘,必有回響。通過不停的摸索后,我發現了netty這套底層通訊庫(對號入座,.net下對應的是dotnetty),憑借著之前的經驗,第一感覺這就是我要找尋的東西。后來寫了一些demo徹底印證了我的猜想,簡直是欣喜若狂,想著如果早點發現這個框架,也許就不會那么被動的踩坑了。就這樣,我算是開啟了自己的netty之旅。

微言netty系列,就是我的netty之旅的一些產出,它結合了我過往的經驗來產出一些對大家有用的東西,希望不會讓大家失望。

注:本文原理講解并非以某一種語言為主,但是對于具體場景分析,用的是Java,讀者可以類推到其他語言。同時本文并不提供源碼級別的原理性講解,如讀者有興趣,可以自行查找實踐。

2. 整體架構模型

言歸正傳,我們繼續netty之旅吧。

分布式服務框架,特點在于分布式,功能在于服務提供,目標在于即時通訊框架整合。由于其能夠讓服務端和客戶端進行解耦,讓調用方和被調用方處于網絡的兩端但是通訊毫無障礙,從而能夠擴充整體的業務規模。對于一些業務場景稍微大一些的公司,一般都會采用分布式服務框架。包括目前興起的微服務設計,更是讓分布式服務框架炙手可熱。那么我們今天的目標,就是來打造一款手寫的分布式服務框架TinyWhale,中文名巨小鯨(手寫作品,本文講解專用, 暫無更多精力打造成開源^_^),接下來讓我們開始吧。

說道目前比較流行的分布式服務框架,朗朗上口的有Dubbo,gRpc,Spring cloud等。這些框架無一例外都有著如下圖所示的整體架構模型:

3cd145d7-1bad-4740-ab97-5615b04e03c5

整體流程解釋如下:

1. 啟動注冊,指服務端開始啟動并將服務注冊到注冊中心中。注冊中心一般用zookeeper或者consul實現。

2. 啟動并監聽,指客戶端啟動并監聽注冊中心的服務列表。

3. 有變更則通知,指客戶端訂閱的服務列表發生改變,則將更新客戶端緩存。

4. 接口調用,指客戶端進行接口調用,此調用將首先會向服務端發起連接操作,然后進行鑒權,之后發起接口調用操作。

5. 客戶端數據監控,指監控端會監控客戶端的行為和數據并做記錄。

6. 服務端數據監控,指監控端會監控服務端的行為和數據并做記錄。

7. 數據分析并衍生出其他業務策略,指監控端會根據服務端和客戶端調用數據,來衍生出新的業務策略,比如熔斷,防刷,異地多活等。

當然,上面的流程是比較標準的分布式服務框架所涉及的環節。在實際設計過程中,可以根據具體的使用方式進行調整,比如監控端只監控服務端數據,因為客戶端我不用關心?;蛘嚦突Ф瞬簧柚梅竦刂妨斜砘捍?,每次調用前都從注冊中心重新獲取最新的服務地址列表等。

TinyWhale,由于設計的初衷是簡單,可靠,高性能,所以這里我們去除了監控端,所以流程5,流程6,流程7都會拿掉,如果有需要使用到監控端的,可以自行根據提供的接口來實現一套,這里將不再對監控端做過多的贅述。

3. 即時通訊框架設計涉及要素

編解碼設計

編解碼設計任何通訊類框架,編解碼處理是無法繞過的一個話題。因為網絡上只能流淌字節流,所以這種特性催生了很多的框架。由于這塊的工具非常多,諸如ProtoBuf,Marshalling,Msgpack等,所以喜歡用哪個,全憑喜好。這里我用使用ProtoStuff來作為我們的編解碼工具,原因有二:其一是易用性,無需編寫描述文件;其二是高性能,性能屬于T0級別梯隊。下面來具體看看吧:

首先看看我們的編解碼類:

a1c7ca52-390c-497a-964e-143e980963f7

其中serialize方法,用于將類對象編碼成字節數據,然后通過本機發送出去。而deserialize方法,則用于將緩沖區中的字節數據還原為類對象??悸塹繳杓頻募蚪嘈?,我這里并未抽象出一個公共的codecInterface和codecFactory來適配不同的編解碼工具,大家可以自行來進行設計和適配。

有了編解碼的輔助類了,如何集成到Netty中呢?

在Netty中,將對象編碼成字節數據的操作,我們可以使用已有的MessageToByteEncoder類來進行操作,繼承自此類,然后override encode方法,即可利用自己實現的protostuff工具類來進行編碼操作。

40907776-cbc0-4888-9c94-6f4e28fbdec2

同樣的,將字節數據解碼成對象的操作,我們可以使用已有的ByteToMessageDecoder類來進行操作,繼承自此類,然后override decode方法,即可利用自己實現的protostuff工具類來進行解碼操作。

粘包拆包設計

之前章節已經講過,我們直接拿來展示下。

粘包拆包,顧名思義,粘包,就是指數據包黏在一塊了;拆包,則是指完整的數據包被拆開了。由于TCP通訊過程中,會將數據包進行合并后再發出去,所以會有這兩種情況發生,但是UDP通訊則不會。下面我們以兩個數據包A,B來講解具體的粘包拆包過程:

bb0a0099-2e12-4191-be8a-1d4c031552be

第一種情況,A數據包和B數據包被分別接收且都是整包狀態,無粘包拆包情況發生,此種情況最佳。

f9b580f9-bd49-4dd0-b9e7-742e63f6276f

第二種情況,A數據包和B數據包在一塊兒且一起被接收,此種情況,即發生了粘包現象,需要進行數據包拆分處理。

a1f0cd86-ac4d-45a7-b79f-20c8dba93fed

第三種情況,A數據包和B數據包的一部分先被接收,然后收到B數據包的剩余部分,此種情況,即發生了拆包現象,即B數據包被拆分。

a1cfac00-7f70-4183-a57e-becbb95ac9e1

第四種情況,A數據包的一部分先被接收,然后收到A數據包的剩余部分和B數據包的完整部分,此種情況,即發生了拆包現象,即A數據包被拆分。

fd5757c1-63dd-4c8b-a28d-24c9c4b88fe9

第五種情況,也是最復雜的一種,先收到A數據包的部分,然后收到A數據包剩余部分和B數據包的一部分,最后收到B數據包的剩余部分,此種情況也發生了拆包現象。

至于為什么會發生這種問題,根本原因在于緩沖區中的數據,Server端不大可能一次性的全部發出去,Client端也不大可能一次性正好把數據全部接收完畢。所以針對這些發生了粘包或者拆包的數據,我們需要找到合適的手段來讓其形成整包,以便于進行業務處理。好消息是,Netty已經為我們準備了多種處理工具,我們只需要簡單的動動代碼,就可以了,他們分別是:LineBasedFrameDecoder,StringDecoder,LengthFieldBasedFrameDecoder,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder。

由于上節中,我們講解了其大概用法,所以這里我們以LengthFieldBasedFrameDecoder來著重講解其使用方式。

LengthFieldBasedFrameDecoder:顧名思義,固定長度的粘包拆包器,此解碼器主要是通過消息頭部附帶的消息體的長度來進行粘包拆包操作的。由于其配置參數過多(maxFrameLength,lengthFieldOffset,lengthFieldLength,lengthAdjustment,initialBytesToStrip等),所以可以最大程度的保證能用消息體長度字段來進行消息的解碼操作。這些不同的配置參數可以組合出不同的粘包拆包處理效果,在此Rpc框架的設計過程中,我的使用方式如下:

be24597e-daed-4c17-bad8-7c78bd29bc63

是不是代碼很簡單?

翻閱LengthFieldBasedFrameDecoder源碼,實現原理一覽無余,由于網上講解足夠多,而且源碼中的講解也足夠詳細,所以這里不再做過多闡釋。具體的原理解釋可以看這里:LengthFieldBasedFrameDecoder

自定義協議設計

在進行網絡通訊的時候,數據包從一端傳輸到另一端,然后被解析,被消化。這里就涉及到一個知識點,數據包是怎樣定義的,才能讓另一方識別出數據包所代表的業務含義。這就涉及到自定義傳輸協議的設計,我們來看看具體怎么設計。

首先,我們需要明確自己定義的協議需要承載哪些業務數據,一般說來包含如下的業務要點:

1. 自定義協議需要讓雙端識別出哪些包是心跳包

2. 自定義協議需要讓雙端識別出哪些包是鑒權包

3. 自定義協議需要讓雙端識別出哪些包是具體的業務包

4. 自定義協議需要讓雙端識別出哪些包是上下線包等等(本條規則適用于IM系統)

不同的系統在設計的時候,自定義協議的設計是不一樣的,比如分布式服務框架,其業務包則需要包含客戶端調用了哪個方法,入參中傳入了哪些參數等。物聯網采集框架,其業務包則需要包含底層采集硬件上傳的數據中,哪些數值代表空氣溫度,哪些數值代表光照強度等。同樣的,IM系統則需要知道當前的聊天是誰發出的,想發給誰等等。正是由于不同系統承載的業務不同,所以導致自定義協議種類繁多,不一而足。性能表現也是錯落有致。復雜程度更是簡繁并舉。

那么針對要講解的分布式服務框架,我們來詳細看一下設計方式。

首先定義一個NettyMessage泛型類,此泛型類是一個基礎類,包含了會話ID,消息類型,消息體三個字段。這三個字段是服務端和客戶端進行數據交換過程中,必傳的三個字段,所以整體抽取出來,放到了這里。

98e12d4a-8932-4bd6-a2d0-420484b271e4

然后,針對客戶端,定義一個NettyRequest類,包含基本的請求ID,調用的類名稱,方法名稱,入參類型,入參值。

e7c0174d-2858-40e7-a103-462879edc247

最后,客戶端的請求傳送到服務端,服務端需要反射調用方法并將結果返回,服務端的NettyResponse類,則包含了請求ID,用于識別請求來自于哪個客戶端,error錯誤,result結果三個字段:

78f8dee2-8f31-4a91-b1e1-99b1bd3d4950

當服務端調用完畢,就會把結果封裝到此類中,然后將結果返回給客戶端,客戶端還原此類,即可拿出自己想要的數據來。

那么這個稍顯冗雜的自定義協議就設計完畢了,有人會問,心跳包用這個協議如何識別呢?其實直接實例化NettyMessage類,然后在其type字段中塞入心跳標記值即可,類似如下:

1b082d53-92e8-4fea-8fa3-420d80d3a5c0

而上下線包和鑒權包則也是類似的構造,不通點在于,鑒權包 可能需要往body屬性里面放一些鑒權用的用戶token等。

鑒權設計

顧名思義,就是進行客戶端登錄的認證操作。由于客戶端不是隨意就能連接上來的,所以需要對客戶端連接的合法性進行過濾操作,否則很容易造成各種業務或者非業務類的問題,比如數據被盜竊,服務器被壓垮等等。那么一般說來,如何進行鑒權設計呢?

e7876f6b-6221-4a28-9e57-2582a8d8e07e

可以看到,上面的鑒權??槔錈嬗腥鍪糶?,一個是已登錄的用戶列表clientList,一個是用戶白名單whiteIP,一個是用戶黑名單blackIP,在進行用戶認證的時候,會通過用戶token,白名單,黑名單做驗證。由于不同業務的認證方式不一樣,所以這里的設計方式也是五花八門。一般說來,分布式服務框架的認證方式依賴于token,也就是服務端的provider啟動的時候,會給當前服務分配一個token,客戶端進行請求的時候,需要附帶上這個token才能夠請求成功。由于我這里只是做演示效果,并未利用token進行驗證,實際設計的時候,可以附帶上token驗證即可。

心跳包設計

傳統的心跳包設計,基本上是服務端和客戶端同時維護Scheduler,然后雙方互相接收心跳包信息,然后重置雙方的上下線狀態表。此種心跳方式的設計,可以選擇在主線程上進行,也可以選擇在心跳線程中進行,由于在進行業務調用過程中,此種心跳包是沒有必要進行發送的,所以在一定程度上會造成資源浪費。嚴重的甚至會影響業務線程的操作。但是在netty中,心跳包的設計并非按照如上的方式來進行。而是通過檢測鏈路的空閑與否在進行的。鏈路分為讀操作空閑檢測,寫操作空閑檢測,讀寫操作空閑檢測。如果一段時間沒有收到客戶端的信息,則會觸發服務端發送心跳包到客戶端,稱為讀操作空閑檢測;如果一段時間沒有向客戶端發送任何消息,則稱為寫操作空閑檢測;如果一段時間服務端和客戶端沒有任何的交互行為,則稱為讀寫操作空閑檢測。由于空閑檢測本身只有在通道空閑的時候才進行檢測,而不是固定頻率的進行心跳包通訊,所以可以節省網絡帶寬,同時對業務的影響也很小。

那么就讓我們看看在netty中,怎么實現高效的心跳檢測吧。

在netty中,進行讀寫操作空閑檢測,需要引入IdleStateHandler類,然后需要我們實現自己的心跳處理Handler,具體設計方式如下:

首先,引入IdleStateHandler和服務端心跳處理Handler

fb267adc-dec0-4f36-9d35-055d0d960f0e

其中讀空閑檢測為45秒,寫空閑檢測為45秒,讀寫空閑檢測為120秒,也就是說,如果服務器45秒沒有收到客戶端發來的消息,就會觸發一個回調事件,另外兩個同理。具體觸發什么事件了呢?我們來看看服務端心跳處理Handler:HeartBeatResponseHandler

fe66f758-317f-456a-8ba0-1d83fcdc188d

可以看到,檢測到讀空閑,會調用processReadIdle方法來處理,我們進來看看具體處理方式:

62e323ee-003a-4aa2-a995-28649b91a5c8

可以看到,服務端發現一段時間沒收到客戶端消息后,就會主動給客戶端發一次心跳,確認客戶端是否存活。如果在第90秒內還沒有收到客戶端的回復心跳,則會嘗試再發一條,同時在客戶端上下線狀態表中,將當前客戶端的未響應次數加一;如果在第135后認為收到客戶端的回復心跳,則會嘗試重發一條,同時未響應次數再加一,當次數累積到三次的時候,則認為此客戶端掉線,此時將會踢掉此客戶端。如果是IM系統的話,此時服務端就可以將此客戶端的信息告知其他在線用戶掉線,這樣其他用戶就可以在自己的客戶端列表中刪掉掉線用戶。

至于processWriteIdle和processAllIdle方法,均是如上類似原理,至于需要處理,怎么去處理,均是業務自己定制,相當靈活。

很遺憾,在翻閱很多基于Netty的源碼中,并未發現此樣的實現方式,這也是相當可惜的。

斷線重連設計

在實際網絡通訊過程中,客戶端可能由于網絡原因未能及時的響應服務端的心跳請求,從而被服務端踢下線。之所有有這種機制,一方面是為了節省服務端資源,剔除死鏈接;另一方面則是出于業務要求,比如IM系統中,用戶掉線了,但是服務端沒有及時剔除,會導致其他用戶認為此用戶在線,從而可能造成誤解等。

那么就需要有一種機制來保證客戶端網絡掉線后,能夠及時的感知并進行重連,從而保證服務的可用性。之前我們介紹了心跳包,它是專門用來保持服務端和客戶端的通道連接保持的。假設當客戶端因為網絡原因,被服務端踢下線后,客戶端是無感知的,并不知道自己已經被服務端踢下線,所以這時候如果客戶端依舊向服務端發送數據,將會失敗。此時這就是斷線重連應該工作的地方了。具體設計如下:

75ddab25-f90a-45c7-bf11-d456fc6f1489

可以看到,我們依舊用了netty原生的IdleState類來檢測空閑通道。當客戶端一段時間沒收到服務端的消息,將會首先嘗試給服務端發送一次心跳,由于此時客戶端已經被服務端踢掉了,所以三次心跳均未獲得回應,此時,客戶端突然想明白了:“哦,我想我已經掉線了”。于是客戶端將會利用ctx對象進行服務端重連操作。

此種方式簡單易行,雖然不具有實時性,但是效果很好,可以有效地避免因為網絡抖動等未知原因導致的掉線問題。

以上幾種特性,是設計通信框架過程中,基本上都繞不開。雖然不同的通信框架由于承載的業務不同而造成設計上的差異,但是正是因為這些特性的存在,才能保證整個通信過程中的穩定性和可靠性。

接下來我們將焦點轉移到服務端和客戶端的設計上來。

先說說服務端和客戶端,基本上的通訊模型為,服務端bind本地端口,然后進行listen監聽??突Ф薱onnect服務端套接字,然后進行通訊。用netty打造的雙端,也繞不開這種通訊模型。其實如果讀者有過通信框架的設計經驗的話,將會對此十分熟悉。不過就通訊方式來說,也是很統一的,一般都是一端發送數據,另一端接收處理,然后看具體業務再決定需不需要返回數據回去。那么這里就涉及到一個要點,因為數據的返回有同步和異步之分,一般說來同步等待數據返回的性能要比異步獲取數據的性能要差一些,但是具體能差異多少,完全由設計者自己把握。

同步等待數據返回這塊,我就無需多說了,基本上就是如下示例代碼:

3c1d4c35-e7cd-4897-925e-760c2eed0521

異步獲取返回數據這塊,則設計上要復雜一些,因為設計方式是多種多樣的。有用雙Queue來做異步化(任務quque和應答queue), 有用Future來做異步化,當然也有用多線程來做異步化等。TinyWhale的異步化處理,采用的是后者,在客戶端講解那塊,將會做詳細的解釋。

再說說netty框架,由于其純異步化模型,所以獲取的各種結果對象基本上是各種Future,如果之前對這種模型接觸比較少的話,將會不太習慣netty的這種設計思維。具體的使用方式,將會在接下來的設計中進行詳細講解。

服務端設計

首先說道服務端,是指提供服務的一方,一般用來處理客戶端請求。由于netty這塊,已經將底層封裝的特別好,所以這里無需多余設計,只需要了解netty的異步模型即可。那么何為netty的異步模型呢?

既然說到了同步異步,那么不免就會提起阻塞非阻塞,我就說下個人的理解吧。同步異步的區別,個人認為,只要不是一個時間只能做一件事兒的,均可稱為異步。實現異步有多種方式,而多線程只是異步的一種實現方式而已。比如我們用兩個queue模擬生產消費行為,也可以稱之為異步。阻塞非阻塞的區別,個人認為,主要體現在對資源的爭搶等待上面,發生了資源爭搶等待,則被阻塞,反之為非阻塞。比如http請求遠程結果,阻塞等待等。個人意見,如有謬誤,還請指教。接下來讓我們進入正題。

首先要從同步阻塞模型說起。

同步阻塞 

相信大家都聽說過這個模型,客戶端請求到服務端,服務端里面有個Acceptor接收請求,然后針對每個請求都創建一個Thread來處理,典型的一對一通信處理方式??聰戮嚀宓哪P褪疽饌跡?/p>

32c59e26-a4ef-4c46-b192-8985c8fa5142

首先,客戶端請求達到Acceptor,Acceptor接收并處理,然后Acceptor為每個請求創建一個線程來處理。這樣后續的請求處理工作就在各自的線程上進行處理了。此種方式最簡便,代碼也非常好寫,但是帶來的問題就是一個請求對應一個線程,無法做到高性能,而且由于線程開銷較大,對服務器的穩定運行也有一定的影響,隨時都有可能出現內存耗盡,創建線程失敗等,最終的結果就是因為宕機等緣故造成生產問題。

由于上述問題,后來產生了偽異步處理模型,其實就是講Acceptor里面為每個請求分配一個線程,改成了線程池這種池化方式來處理,總體上性能比之前要好很多,而且機器運行也穩定很多,相對之前的模型,有了不小的提升。但是從本質上來將,此種方式和之前方式相比,并未有質的改變,之所以稱為偽異步,緣由在此吧。

非阻塞

同步阻塞模型由于性能不好,可靠性低,所以催生了非阻塞模型的產生。目前非阻塞模型有兩種,一種是NIO,另一種是AIO,然而AIO雖可以稱得上為真正的異步非阻塞IO模型,代碼也很簡便,但是并未大規模的應用,料想應該有它自身的短板,所以我們著重來講解NIO模型。首先來看看NIO模型示意圖:

938a5b17-bc55-45cb-be25-87a3b550e824

上面這幅圖是網上流傳比較廣的一幅圖,因為被大家所熟知,所以這里我就直接拿來用了,這幅圖的出處在這里。具體來看一下。

首先,從圖中可以看出,client為客戶端請求,mainReactor主要接收客戶端請求,然后調用acceptor進行處理,acceptor查到已經就緒的連接,則交由subReactor進行處理。subReactor這里會負責已連接客戶端的讀寫網絡操作,也就是如果有讀寫操作,會反映到subReactor中來,至于業務處理部分,則直接扔給ThreadPool進行業務處理。一般說來,subReactor的個數大概和CPU的核數是一致的。從這里還可以看出mainReactor和subReactor都有派發器的意味。

由于此NIO模型使用了事件驅動,而且以linux底層作為通訊支持,完全使用了epoll高性能的特點,所以整體表現堪稱完美。這里我要推薦一座金礦,大名鼎鼎的C10k問題,諸位看官如果有興趣,可以探索一番。

然后來具體說下服務端設計吧:

public class NettyServer {

    /**
     * 服務端帶參構造
     * @param serverAddress
     * @param serviceRegistry
     * @param serverBeans
     */
    public NettyServer(String serverAddress, ServerRegistry serviceRegistry, Map<String, Object> serverBeans) {
        this.serverAddress = serverAddress;
        this.serviceRegistry = serviceRegistry;
        this.serverBeans = serverBeans;
    }

    /**
     * 日志記錄
     */
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 服務端綁定地址
     */
    private String serverAddress;

    /**
     * 服務注冊
     */
    private ServerRegistry serviceRegistry;

    /**
     * 服務端加載的bean列表
     */
    private Map<String, Object> serverBeans;

    /**
     * 主事件池
     */
    private EventLoopGroup bossGroup = new NioEventLoopGroup();

    /**
     * 副事件池
     */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * 服務端通道
     */
    private Channel serverChannel;

    /**
     * 綁定本機監聽
     *
     * @throws Exception
     */
    public void bind() throws Exception {

        //啟動器
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //為Acceptor設置事件池,為客戶端接收設置事件池
        serverBootstrap.group(bossGroup, workerGroup)
                //工廠模式,創建NioServerSocketChannel類對象
                .channel(NioServerSocketChannel.class)
                //等待隊列大小
                .option(ChannelOption.SO_BACKLOG, 100)
                //地址復用
                .option(ChannelOption.SO_REUSEADDR, true)
                //開啟Nagle算法,
                //網絡好的時候:對響應要求比較高的業務,不建議開啟,比如玩游戲,鍵盤數據,鼠標響應等,需要實時呈現;
                //            對響應比較低的業務,建議開啟,可以有效減少小數據包傳輸。
                //網絡差的時候:不建議開啟,否則會導致整體效果更差。
                .option(ChannelOption.TCP_NODELAY, true)
                //日志記錄組件的level
                .handler(new LoggingHandler(LogLevel.INFO))
                //各種業務處理handler
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //空閑檢測handler,用于檢測通道空閑狀態
                        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120));
                        //編碼器
                        channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024, 4, 4));
                        //解碼器
                        channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
                        //心跳包業務處理,一般需要配置idleStateHandler一起使用
                        channel.pipeline().addLast("heartBeatHandler", new HeartBeatResponseHandler());
                        //服務端先進行鑒權,然后處理業務
                        channel.pipeline().addLast("loginAuthResponseHandler", new LoginAuthResponseHandler());
                        //業務處理handler
                        channel.pipeline().addLast("nettyHandler", new ServerHandler(serverBeans));
                    }
                });

        //獲取ip和端口
        String[] array = serverAddress.split(":");
        String host = array[0];
        int port = Integer.parseInt(array[1]);

        //綁定端口,同步等待成功
        ChannelFuture future = serverBootstrap.bind(host, port).sync();

        //注冊連接事件監聽器
        future.addListener(cfl -> {
            if (cfl.isSuccess()) {
                logger.info("服務端[" + host + ":" + port + "]已上線...");
                serverChannel = future.channel();
            }
        });

        //注冊關閉事件監聽器
        future.channel().closeFuture().addListener(cfl -> {
            //關閉服務端
            close();
            logger.info("服務端[" + host + ":" + port + "]已下線...");
        });

        //注冊服務地址
        if (serviceRegistry != null) {
            serviceRegistry.register(serverBeans.keySet(), host, port);
        }
    }

    /**
     * 關閉server
     */
    public void close() {
        //關閉套接字
        if(serverChannel!=null){
            serverChannel.close();
        }
        //關閉主線程組
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        //關閉副線程組
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
服務端源碼

由于代碼做了具體的注釋,我這里就不針對性的進行解釋了。需要注意的是,當服務啟動之后,會注冊兩個監聽器,一個綁定時間監聽,一個關閉事件監聽,當事件被觸發的時候,會回調兩個事件內部的邏輯。最后服務端正常啟動,會被注冊到注冊中心中,以便于客戶端調用。需要注意的是,一般情況下,業務Handler最好和心跳包Handler等非業務性的Handler處理分開,避免業務高峰時期,因為心跳包等Handler的處理來耗費捉襟見肘的內存資源或者CPU資源等,造成服務器性能下降。來看一下ServerHandler的具體設計:

5ca2ea97-bcac-45bb-8c22-49ca6599a2c3

從這里可以看出,我們用了一個線程池來將業務處理進行池化,這樣做就不會受到心跳包等其他非業務處理Handler的影響,最大限度的保證系統的穩定性。

更多關于同步異步,阻塞非阻塞的設計,請參見Doug Lea:Scalable IO in Java

客戶端設計

再來說說客戶端,是指消費服務的一方,一般用來實現特定的消費業務。同樣的,netty這塊已經將底層封裝的很好,所以直接編寫業務即可。和編寫服務端不同的是,這里不需要分BossGroup和WorkerGroup,因為對于客戶端來說,只需要連接服務端,然后發送數據并監聽即可,不存在影響性能的問題。具體的寫法看看吧:

public class NettyClient {

    /**
     * 日志記錄
     */
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    /**
     * 客戶端請求Future列表
     */
    private Map<String, TinyWhaleFuture> clientFutures = new ConcurrentHashMap<>();


    /**
     * 客戶端業務處理handler
     */
    private ClientHandler clientHandler = new ClientHandler(clientFutures);

    /**
     * 事件池
     */
    private EventLoopGroup group = new NioEventLoopGroup();

    /**
     * 啟動器
     */
    private Bootstrap bootstrap = new Bootstrap();

    /**
     * 客戶端通道
     */
    private Channel clientChannel;

    /**
     * 客戶端連接
     * @param host
     * @param port
     * @throws InterruptedException
     */
    public NettyClient(String host, int port) throws InterruptedException {
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //通道空閑檢測
                        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120));
                        //解碼器
                        channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));
                        //編碼器
                        channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
                        //心跳處理
                        channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
                        //業務處理
                        channel.pipeline().addLast("clientHandler", clientHandler);
                        //鑒權處理
                        channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
                    }
                });

        //發起同步連接操作
        ChannelFuture channelFuture = bootstrap.connect(host, port);

        //注冊連接事件
        channelFuture.addListener((ChannelFutureListener)future -> {
            //如果連接成功
            if (future.isSuccess()) {
                logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]已連接...");
                clientChannel = channelFuture.channel();
            }
            //如果連接失敗,嘗試重新連接
            else{
                logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]連接失敗,重新連接中...");
                future.channel().close();
                bootstrap.connect(host, port);
            }
        });

        //注冊關閉事件
        channelFuture.channel().closeFuture().addListener(cfl -> {
            close();
            logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]已斷開...");
        });
    }

    /**
     * 客戶端關閉
     */
    private void close() {
        //關閉客戶端套接字
        if(clientChannel!=null){
            clientChannel.close();
        }
        //關閉客戶端線程組
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * 客戶端發送消息,將獲取的Future句柄保存到clientFutures列表
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public TinyWhaleFuture send(NettyMessage<NettyRequest> request) {
        TinyWhaleFuture rpcFuture = new TinyWhaleFuture(request);
        rpcFuture.addCallback(new TinyWhaleAsyncCallback() {
            @Override
            public void success(Object result) {
            }

            @Override
            public void fail(Exception e) {
                logger.error("發送失敗", e);
            }
        });
        clientFutures.put(request.getBody().getRequestId(), rpcFuture);
        clientHandler.sendMessage(request);
        return rpcFuture;
    }
}
客戶端源碼

由于代碼中,我也做了諸多的注釋,所以這里不再一一解釋。需要注意的是,和編寫服務端類似,我這里添加了兩個監聽事件,監聽連接成功事件,監聽關閉事件,響應的業務場景如果觸發了這兩個事件,將會執行事件內部的邏輯。

這里需要提一下消息發送的場景。一般說來,客戶端向服務端發送數據,然后服務端處理功能后返回給客戶端,客戶端接收到消息后再進行后續處理。這個流程一般有兩種實現方式,一種是同步的實現方式,另一種是異步的實現方式,具體來呈現以下:

首先是同步實現方式,顧名思義,客戶端發送數據給服務端,服務端在處理完畢并返回數據之前,客戶端一直處于阻塞等待狀態,send方法的代碼設計如下:

e0387579-7276-4d74-bd83-31c1d974ede2

來看看clientHandler里面的sendMessage方法:

4b2d29fd-55fa-4159-873c-0a9ea9cd553a

在開始發送之前,我們先拿到當前ctx的promise句柄,然后將數據寫入到緩沖區,最后將此句柄返回給send方法,send方法接收到此句柄后,將會等待promise執行完畢,如何判斷promise執行完畢呢?當客戶端接收到服務端返回,就可以將promise置為完成狀態:

677e5ed7-63f4-4e75-a7bd-763304e1dcd4

可以看到,通過重置promise的setSuccess方法,即可將promise置為完成態,這樣操作之后,send方法里面就可以正常的拿到數據并返回了。否則將會一直處于阻塞狀態。

可以看到,在netty中實現阻塞的方式來接收服務端返回,處理起來還是挺麻煩的,根本原因在于netty完全異步化的模型,所以只能用如上的方式來進行同步化處理。

再來說說異步化處理吧, 這也是netty很推崇的方式。

首先來看看send方法:

9d017228-d265-4b58-b735-9618856758a5

從上面代碼中可以看到,當我們將消息發送出去后,會立即獲得一個TinyWhaleFuture的句柄,不會再有阻塞等待的場景。我們看看clientHandler.sendMessage的具體實現:

2974eedd-4e1c-46c0-96a7-1735a4ad1916

可以看到,只是單純的將數據推送到緩沖區而已。

還記得我們的TinyWhaleFuture句柄嗎?既然返回給我們了這個句柄,那么我們肯定是可以從此句柄中取出我們想要獲取的數據的,我們看看客戶端如果收到服務端的返回結果,該如何處理呢?

50149181-14be-4cce-9fdc-1393e0cd70b3

可以看到,這里利用了一個Map來保存用戶每個發送請求,一旦當服務端返回數據后,就會將請求置為完成態,同時從Map中將已完成的操作刪掉。這樣,客戶端拿到TinyWhaleFuture句柄后,通過提供的get方法即可在想獲取結果的地方來獲取返回結果。這樣做,是不會阻塞其他業務執行的。

其實不僅僅是netty中,在設計其他框架的時候,也可以利用此思想來實現真正意義上的異步執行邏輯。當然,能夠實現這種執行邏輯的方式有很多種,至于更好的實現方式,還請君細細斟酌吧,這里只起到拋磚引玉的作用。

4. 動態調用設計

服務注冊和服務發現

先來上個大致的類設計圖,ServerCache接口提供基礎的本地緩存操作;ServerBase提供基礎的連接注冊中心,關閉注冊中心連接操作;ServerRegistry為服務注冊類;ServerDiscovery為服務發現類,下面是類UML圖,我們來具體的說一說:

33f8e329-fe57-4223-8fb9-8e61865ce163

首先是注冊中心,這個就不必說了,一般都是使用zookeeper或者consul等框架來實現,這里我們使用zookeeper。但是我們這里并不是用原生的zookeeper sdk來操作,而是使用curator來操作,curator是什么呢?在其介紹頁面有句很經典的話:Guava is to Java what Curator is to Zookeeper,相當的簡潔明了吧。來看下具體的使用方式吧。

首先定義用于加載注冊中心服務套接字的共享緩存,客戶端啟動的時候,此共享緩存會從注冊中心拉取服務器列表到本地保存:

1bbffe83-acca-4c28-962b-20c9ea286b3a

然后,定義服務治理的公共操作類:

8074c8cc-fb0c-44ca-9941-95ba65d32c19

可以看到,此基類中,open方法和close方法,用于連接zk服務器,關閉和zk服務器的連接。之后便是對接口中操作本地緩存的實現。

由于服務治理這塊包含了服務注冊和服務發現功能,所以這里,我們分別定義ServerRegistry類和ServerDiscovery類來進行處理。

ServerRegistry類,顧名思義,表示服務注冊,也就是當我們的服務端啟動之后,綁定了本機端口之后,會將承載的服務注冊到zk中。

44220ee6-9d3b-44af-b06f-546ea2771065

ServerDiscovery類,顧名思義,服務發現,那么此類中的discovery方法則就是根據用戶傳入的接口名稱來找到對應的服務器,然后將結果返回。需要注意的是,服務發現的過程,需要涉及到負載均衡,之所以涉及到這個,主要是為了讓每臺服務器收到的請求均勻一些,以達到均衡的目的,這樣就不會因為請求打的不均勻導致有些服務器負載太大,有些服務器負載幾乎沒有的情況。負載均衡,我將在后面的章節講解,先繼續看看服務發現這塊:

43f13e2c-9f5e-4bd5-89a9-8998f9f92923

可以看到,我用了一個watchNode方法來檢測節點的改動,此方法內部設置了一個Listener,只要有節點的改動,都會推送到此Listener中,然后我就可以根據改動的類型來決定是否對本地緩存進行更新操作。

更具體的服務注冊和服務發現使用方式,可以參考curator官網:Service register and Service discovery

負載均衡

前面說到了服務治理這塊,由于里面涉及到負載均衡這塊,這里就詳細說一下。

一般說來,有三種負載均衡模型是繞不開的,分別是一致性哈希,此模型可以讓帶有業務標記的請求每次請求都會導向到指定的服務器上。輪詢,此模型主要是對服務器列表進行順序訪問。隨機,此模型主要是隨機獲取服務器并返回。其他的模型還有很多,可以根據具體的業務進行衍生,這里不做一一的展示。

首先來看看負載均衡基類:

6db1acf2-c315-48a9-b11a-599193ee965b

然后看看三種模型的實現:

一致性哈希實現,直接對服務端的size進行取余操作:

92cd52ab-0d2d-44ec-960a-0cbd383eb652

輪詢實現,對訪問過的服務器進行計數累加,然后把此計數作為下標并獲取元素返回:

f3915660-1dbe-4346-9fb8-e2592e58be9c

隨機實現,對服務器進行隨機選?。?/p>

9fc58dcf-bcd2-4458-96b3-080c7ad5a66a

你也許會問,為什么你設計的負載均衡里面沒有權重操作呢?其實如果愿意,也是可以加上權重操作的,這樣就會衍生出來其他的負載均衡模型,比如服務訪問不同,權重-10,服務能訪問通,權重+1,這樣就可以通過權重,選取一些權重較高的服務器優先返回,而對那些權重較低的服務器,可以少分一些請求,讓其慢慢恢復到正常狀態之后,再多分配一些請求過來等等。

總之,你可以在此基礎上進行自己的設計,但是大體思想就是讓服務器獲得的負載越均衡越好。

容災處理

此處整合Hystrix進行的設計,可以對請求做FailFast處理,RetryOnece處理,RetryTwice處理等, 具體細節可以翻看Hystrix設計即可。這里就不詳解(哈哈哈,其實是因為寫著寫著,寫的懶了,這塊就不想講了,畢竟基本上都是Hystrix那套)。

反射調用

最后要說的部分就是反射調用這塊了。我們知道,當客戶端發送待調用的方法發送給服務端,服務端接收后,需要通過反射調用方法,然后將結果返回給客戶端。首先來看看服務端業務處理Handler:

c04f7664-506d-470b-b2cd-56d1d934ae6a

可以看到,此業務處理handler會讀取客戶端的請求,然后分析數據包內容,最后利用反射來調用相應的方法獲取結果并壓入緩沖區中,之后發送給客戶端。

再來看看handle方法是如何進行反射調用并得到結果的:

a3e6bb50-4421-43c6-be3b-413cec9424f8

可以看到,很經典的反射調用場景,這里就不細說了。

從這里,我們可以看出,服務端的處理方式如上,非常的簡單。但是客戶端是怎么發送請求消息給服務端,又是如何接收服務端的返回數據的呢?

3acc13f4-28cc-4de4-965d-336074b6a6f5

從上面可以看出,我們用了javassist組件的反射(java自身的反射也是類似的使用方式)來構建完整的類對象,然后利用callback回調來發送請求給服務端獲取數據,然后獲取服務端返回的數據,最后將返回的數據拆解后,返回給客戶端。如果用java自帶的反射來實現,編碼也是差不多的:

00ff2b8e-0261-4cc4-896e-ab7ea42ee7a1

這里需要注意的是,此處用了動態反射的功能來實現,性能并不是特別好,如果能用上字節碼技術,性能會再提升一個臺階。具體的字節碼實現方式,可以參見我后續的文章。

5. 跑起來吧??!

好了,我們終于把一切都準備好了,那么就讓我們運行起來吧。

在服務端,首先可以看到如下的注冊中心上線日志:

068cefb2-70c4-44ac-8176-17e7d7b48963

然后可以看到客戶端登錄日志:

57981f83-d3a9-429b-a956-75c7ab4a8861

在客戶端,我們可以看到如下的日志:

2f6f67b2-4a52-4d78-b235-c9381fe23cdf

可以看到,客戶端連接上來后,先發送鑒權請求,鑒權成功后,將會發送服務調用請求,調用完畢后,得到返回結果,整個過程耗費18ms,最后客戶端退出。

當我們在客戶端調用的時候,加上Thread.Sleep來觸發心跳探活,可以看到如下的檢測結果:

b2eca78b-c48b-4a06-b5f3-a5b7b3b81da7

可以看到,每隔5秒鐘,我們都能收到客戶端的心跳,然后我們模擬網絡差,客戶端掉線,看看服務端如何檢測:

ee8a82c9-902e-4d54-9b8f-2ee0727ceabf

可以看到客戶端被踢掉了,此時我們再去看看客戶端日志,可以看出來,客戶端確實被服務端踢掉線了:

d7498e9b-8f60-4d72-b52b-7596b126c8f6

最后,東西做完后,補一個benchmark吧,由于我的機器性能比較差,而且測試是直接開啟IDEA這個IDE來測試的,所以性能并不見得很好:

57366a4e-c94a-41b7-a8dc-9224f7c71654

然后來看看benchmark結果吧:

0f003fc7-3e98-4488-8037-6bf5dc50c02a

性能并不是特別好,關鍵有以下幾個地方是耗時大戶:編解碼,反射,同步等待服務端返回

編解碼這個只能找性能比較好的組件來解決

反射可以通過字節碼來實現,性能會再提升一個檔次,但是難度也會提升不少。

同步等待服務返回,可以通過完全異步化實現來解決,那么剛剛展示的

a5b71565-ff20-4f5f-bafb-d4271f48a0c0

調用方式,會被改變成:

71bc7cea-5640-411b-a8a9-a75ac57ce63f

雖然這樣速度會快很多,但是用戶能否接受這種調用方式,則是另一個頭疼的問題。性能和易用,本身就具有相悖性,所以只能在進退之間做平衡了。

寫到這里,整體介紹差不多了,但是還有很多東西沒有接入,譬如說kafka,mq,redis等。如果能把這些東西接入,則會讓其整體顯得更加豐滿,同時功能也更豐富,應用場景也會更廣闊一些。

6.總結

寫到這里,利用netty打造分布式服務框架的要點就基本上完結了。通篇看來,知識點很多,但是都是我們耳熟能詳的東西,能把它們串在一起,組成一個可以用的框架,則需要一定的思考。

文中所有內容基本上為原創,如需轉載,請標明 轉載自博客園程序詩人 字樣,算是對本家付出的辛苦的一點尊重吧。

posted on 2019-06-06 11:07 程序詩人 閱讀(...) 評論(...) 編輯 收藏