net.go 8.3 KB


  1. package gnet
  2. import (
  3. "context"
  4. "errors"
  5. "math/rand/v2"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. ClientReadTimeout = 10 * time.Second
  12. ClientWriteTimeout = 5 * time.Second
  13. )
  14. const (
  15. ServerReadTimeout = 60 * time.Second
  16. ServerWriteTimeout = 5 * time.Second
  17. )
  18. const (
  19. IdleTime = 1 * time.Second
  20. )
  21. const (
  22. DialTimeout = 10 * time.Second
  23. )
  24. const (
  25. MaxBuffSize = 4096
  26. )
  27. var (
  28. // ErrConnNotFound 连接不存在
  29. ErrConnNotFound = errors.New("gnet: connection not found")
  30. // ErrWaitingResponse 等待远程主机响应
  31. ErrWaitingResponse = errors.New("gnet: waiting for response from remote host")
  32. // ErrUnconnected 未能连接到远程主机; 用于开启 Config.Reconnect 后且需要告知上层逻辑真实的连接情况时使用
  33. ErrUnconnected = errors.New("gnet: connection not connected")
  34. )
  35. type Timeout struct {
  36. Msg string
  37. }
  38. func (t *Timeout) Timeout() bool { return true }
  39. func (t *Timeout) Error() string {
  40. if t.Msg == "" {
  41. return "network: timeout"
  42. }
  43. return t.Msg
  44. }
  45. // ReadMultiplexer 读取复用
  46. type ReadMultiplexer interface {
  47. // ReadMux 将读取的数据存储至内部切片中, b 则是内部切片的指针引用. ReadMux 被调用时, 总是会清除上一次保存的数据. 即你需要将 b 使用完毕
  48. // 以后再调用, 否则数据将会被覆盖.
  49. ReadMux() (b []byte, err error)
  50. }
  51. // Config 连接配置
  52. // 当任意Timeout未设定时则表示无超时
  53. type Config struct {
  54. ReadTimeout time.Duration
  55. WriteTimeout time.Duration
  56. Timeout time.Duration // Read and Write
  57. DialTimeout time.Duration
  58. Reconnect bool // Reconnect 自动重连. 仅用于客户端
  59. IgnoreError bool // IgnoreError 忽略首次连接时失败的错误, 用于 Reconnect 启用时. 仅用于客户端
  60. MuxBuff int // ReadMultiplexer.ReadMux Only
  61. }
  62. func (c *Config) Client() *Config {
  63. c.ReadTimeout = ClientReadTimeout
  64. c.WriteTimeout = ClientWriteTimeout
  65. c.DialTimeout = DialTimeout
  66. return c
  67. }
  68. func (c *Config) Server() *Config {
  69. c.ReadTimeout = ServerReadTimeout
  70. c.WriteTimeout = ServerWriteTimeout
  71. return c
  72. }
  73. type ConnStat interface {
  74. IsConnected() bool
  75. IsClosed() bool
  76. }
  77. func optimizationConn(conn net.Conn) net.Conn {
  78. if tcp, ok := conn.(*net.TCPConn); ok {
  79. _ = tcp.SetNoDelay(true)
  80. _ = tcp.SetKeepAlive(true)
  81. _ = tcp.SetKeepAlivePeriod(5 * time.Second)
  82. }
  83. return conn
  84. }
  85. type tcpAliveConn struct {
  86. address string
  87. net.Conn
  88. Config *Config
  89. buf []byte
  90. mu sync.Mutex
  91. handing bool
  92. closed bool
  93. }
  94. func (t *tcpAliveConn) IsConnected() bool {
  95. if t.Conn == nil {
  96. return false
  97. }
  98. if t.handing || t.closed {
  99. return false
  100. }
  101. return true
  102. }
  103. func (t *tcpAliveConn) IsClosed() bool {
  104. return t.closed
  105. }
  106. // hasAvailableNetFace
  107. // 检查当前操作系统中是否存在可用的网卡, 无可用的网卡时挂起重连操作
  108. // 修复部分操作系统(Windows)休眠后网卡状态异常导致 net.DialTimeout 锥栈溢出(然后panic)的问题
  109. func (t *tcpAliveConn) hasAvailableNetFace() bool {
  110. ift, err := net.Interfaces()
  111. if err != nil {
  112. return false
  113. }
  114. i := 0
  115. for _, ifi := range ift {
  116. // FlagUp 网线插入, FlagLoopback 本机循环网卡 FlagRunning 活动的网卡
  117. if ifi.Flags&net.FlagUp != 0 && ifi.Flags&net.FlagLoopback == 0 && ifi.Flags&net.FlagRunning != 0 {
  118. i++
  119. }
  120. }
  121. return i > 0
  122. }
  123. func (t *tcpAliveConn) Dial(address string, timeout time.Duration) (net.Conn, error) {
  124. tcpConn, err := net.DialTimeout("tcp", address, timeout)
  125. if err != nil {
  126. return nil, err
  127. }
  128. return optimizationConn(tcpConn), nil
  129. }
  130. func (t *tcpAliveConn) handleAlive() {
  131. if t.closed || t.handing {
  132. return
  133. }
  134. if !t.Config.Reconnect {
  135. _ = t.Close() // 如果未开启重连, 出现任何错误时都会主动关闭连接
  136. return
  137. }
  138. t.handing = true
  139. if t.Conn != nil {
  140. _ = t.Conn.Close() // 关掉旧的连接
  141. }
  142. for !t.closed {
  143. if !t.hasAvailableNetFace() {
  144. time.Sleep(DialTimeout)
  145. continue
  146. }
  147. conn, err := t.Dial(t.address, t.Config.DialTimeout)
  148. if err != nil {
  149. time.Sleep(DialTimeout)
  150. continue
  151. }
  152. t.mu.Lock()
  153. t.Conn = conn
  154. t.mu.Unlock()
  155. break
  156. }
  157. if t.closed { // 当连接被主动关闭时
  158. if t.Conn != nil {
  159. _ = t.Conn.Close() // 即使重连上也关闭
  160. }
  161. }
  162. t.handing = false
  163. }
  164. func (t *tcpAliveConn) handleErr(err error) error {
  165. if err == nil {
  166. return nil
  167. }
  168. if !t.Config.Reconnect || t.closed {
  169. return err
  170. }
  171. // 延迟后返回. 通常上层代码在 for 循环中调用 Read/Write. 如果重连期间的调用响应过快, 则会导致上层日志写入频繁
  172. // 如果已主动调用 Close 则保持不变
  173. t.randSleep()
  174. return &Timeout{Msg: err.Error()}
  175. }
  176. func (t *tcpAliveConn) randSleep() {
  177. minSleep := 900
  178. maxSleep := 3100
  179. randSleep := rand.IntN(maxSleep-minSleep) + minSleep
  180. time.Sleep(time.Duration(randSleep) * time.Millisecond)
  181. }
  182. func (t *tcpAliveConn) setReadTimeout() (err error) {
  183. if t.Config == nil {
  184. return
  185. }
  186. if t.Config.ReadTimeout > 0 {
  187. return t.Conn.SetReadDeadline(time.Now().Add(t.Config.ReadTimeout))
  188. }
  189. if t.Config.Timeout > 0 {
  190. return t.Conn.SetReadDeadline(time.Now().Add(t.Config.Timeout))
  191. }
  192. return
  193. }
  194. func (t *tcpAliveConn) setWriteTimeout() (err error) {
  195. if t.Config == nil {
  196. return
  197. }
  198. if t.Config.WriteTimeout > 0 {
  199. return t.Conn.SetWriteDeadline(time.Now().Add(t.Config.WriteTimeout))
  200. }
  201. if t.Config.Timeout > 0 {
  202. return t.Conn.SetWriteDeadline(time.Now().Add(t.Config.Timeout))
  203. }
  204. return
  205. }
  206. func (t *tcpAliveConn) Read(b []byte) (n int, err error) {
  207. t.mu.Lock()
  208. defer t.mu.Unlock()
  209. if err = t.setReadTimeout(); err != nil {
  210. return
  211. }
  212. if t.Conn == nil {
  213. return 0, t.handleErr(ErrWaitingResponse)
  214. }
  215. n, err = t.Conn.Read(b)
  216. if err != nil {
  217. go t.handleAlive()
  218. }
  219. return n, t.handleErr(err)
  220. }
  221. func (t *tcpAliveConn) Write(b []byte) (n int, err error) {
  222. t.mu.Lock()
  223. defer t.mu.Unlock()
  224. if err = t.setWriteTimeout(); err != nil {
  225. return
  226. }
  227. if t.Conn == nil {
  228. return 0, t.handleErr(ErrWaitingResponse)
  229. }
  230. n, err = t.Conn.Write(b)
  231. if err != nil {
  232. go t.handleAlive()
  233. }
  234. return n, t.handleErr(err)
  235. }
  236. func (t *tcpAliveConn) Close() error {
  237. if t.closed {
  238. return nil
  239. }
  240. t.closed = true
  241. var err error
  242. if t.Conn != nil {
  243. err = t.Conn.Close()
  244. }
  245. t.buf = nil
  246. t.Conn = nil
  247. return err
  248. }
  249. func (t *tcpAliveConn) ReadMux() (b []byte, err error) {
  250. if len(t.buf) == 0 {
  251. bufSize := t.Config.MuxBuff
  252. if bufSize <= 0 {
  253. bufSize = MaxBuffSize
  254. }
  255. t.buf = make([]byte, bufSize)
  256. }
  257. clear(t.buf)
  258. n, err := t.Read(t.buf)
  259. if err != nil {
  260. return nil, err
  261. }
  262. return t.buf[:n], nil
  263. }
  264. func DialTCP(address string) (net.Conn, error) {
  265. return DialTCPConfig(address, nil)
  266. }
  267. func DialTCPConfig(address string, config *Config) (net.Conn, error) {
  268. if _, err := net.ResolveTCPAddr("tcp", address); err != nil {
  269. return nil, err
  270. }
  271. if config == nil {
  272. config = (&Config{}).Client()
  273. }
  274. if config.DialTimeout <= 0 {
  275. config.DialTimeout = DialTimeout
  276. }
  277. if config.Reconnect && config.IgnoreError {
  278. conn := &tcpAliveConn{
  279. address: address,
  280. Conn: nil,
  281. Config: config,
  282. }
  283. go conn.handleAlive()
  284. return conn, nil
  285. }
  286. tcpConn, err := net.DialTimeout("tcp", address, config.DialTimeout)
  287. if err != nil {
  288. return nil, err
  289. }
  290. conn := &tcpAliveConn{
  291. address: address,
  292. Conn: optimizationConn(tcpConn),
  293. Config: config,
  294. }
  295. return conn, nil
  296. }
  297. func ReadWithContext(ctx context.Context, conn net.Conn, b []byte) (n int, err error) {
  298. done := make(chan struct{})
  299. stop := context.AfterFunc(ctx, func() {
  300. _ = conn.SetReadDeadline(time.Now())
  301. close(done)
  302. })
  303. n, err = conn.Read(b)
  304. if !stop() {
  305. <-done
  306. _ = conn.SetReadDeadline(time.Time{})
  307. if err == nil {
  308. err = ctx.Err()
  309. }
  310. return n, err
  311. }
  312. return n, err
  313. }
  314. func WriteWithContext(ctx context.Context, conn net.Conn, b []byte) (n int, err error) {
  315. done := make(chan struct{})
  316. stop := context.AfterFunc(ctx, func() {
  317. _ = conn.SetWriteDeadline(time.Now())
  318. close(done)
  319. })
  320. n, err = conn.Write(b)
  321. if !stop() {
  322. <-done
  323. _ = conn.SetWriteDeadline(time.Time{})
  324. if err == nil {
  325. err = ctx.Err()
  326. }
  327. return n, err
  328. }
  329. return n, err
  330. }
  331. type connWithContext struct {
  332. ctx context.Context
  333. net.Conn
  334. }
  335. func (c *connWithContext) Read(b []byte) (n int, err error) {
  336. return ReadWithContext(c.ctx, c.Conn, b)
  337. }
  338. func (c *connWithContext) Write(b []byte) (n int, err error) {
  339. return WriteWithContext(c.ctx, c.Conn, b)
  340. }
  341. func NewConnWithContext(ctx context.Context, conn net.Conn) net.Conn {
  342. return &connWithContext{ctx: ctx, Conn: conn}
  343. }