| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 | package networkimport (	"fmt"	"net"	"net/netip"	"sync"	"time")// TCPClient 用于所有使用 TCP 协议的客户端, 可以通过 Dial 创建此连接, 但通常应该是用 Client 接口而不是只用 TCPClient 结构体指针type TCPClient struct {	// reconnect 自动重连, 可多次开启或关闭, 开启后 Read / Write 遇到错误时会自动关闭连接然后使用 reconnecting 重连, 重连期间调用	// Read / Write 时会返回 ErrReconnect 错误, 因此可以通过此错误翻盘	reconnect bool	// connected 值为 false 时表示此连接由于超时或者服务器异常而被动关闭. 断开后调用 Read / Write 时会返回原始 socket 错误.	// 若 reconnect 值为 true 时则断开后会通过 reconnecting 重连, 重连期间调用 Read / Write 时返回 ErrReconnect 错误	connected bool	// closeManually 值为 true 时:	// 表示主动调用 Close 关闭连接, 此连接不可再重用	// 会使 reconnecting 失效	// 调用 Read / Write 时会返回 ErrClosed 错误	closeManually bool	// rDeadline 用于 Read 等待超时时间, 优先级高于 deadline	rDeadline time.Time	// wDeadline 用于 Write 等待超时时间, 优先级高于 deadline	wDeadline time.Time	// deadline 超时时间, 适用于 Read 和 Write, 当 rDeadline 和 wDeadline 不存在时生效	deadline time.Time	conn net.Conn	mu sync.Mutex}// SetReadDeadline 设置 Read 超时时间, 优先级高于 SetDeadlinefunc (c *TCPClient) SetReadDeadline(t time.Time) error {	c.rDeadline = t	return nil}// SetWriteDeadline 设置 Write 超时时间, 优先级高于 SetDeadlinefunc (c *TCPClient) SetWriteDeadline(t time.Time) error {	c.wDeadline = t	return nil}// SetDeadline 设置 Read / Write 超时时间func (c *TCPClient) SetDeadline(t time.Time) error {	c.deadline = t	return nil}// Read 读取数据到 p 中, 使用 setReadDeadline 超时规则// 读取错误时:////	reconnect == true:  主动关闭连接并返回 ErrReconnect 错误, 重连期间调用 Read 时继续返回 ErrReconnect 错误//	reconnect == false: 返回原始错误//// 连接关闭时(connected == false):////	主动关闭(closeManually == true):  返回 ErrClosed//	开启自动重连时(reconnect == true): 返回 ErrReconnect//// 调用示例:// p := defaultPool.Get().([]byte)// defaultPool.Put(p)// b, err := Read(p)////	if err == ErrReconnect {//	   continue//	}////	if err != nil {//	   return//	}func (c *TCPClient) Read(p []byte) (n int, err error) {	if !c.connected {		if c.closeManually {			return 0, ErrClosed		}		if c.reconnect {			return 0, ErrReconnect		}	}	c.mu.Lock()	if err = c.setReadDeadline(); err != nil {		c.mu.Unlock()		return	}	n, err = c.conn.Read(p)	if err != nil {		if c.reconnect {			err = ErrReconnect		}		c.passiveClose()	}	c.mu.Unlock()	return}// Write 写入 p 至 conn, 使用 setWriteDeadline 超时规则// 写入错误时:////	reconnect == true:  主动关闭连接并返回 ErrReconnect 错误, 重连期间调用 Write 时继续返回 ErrReconnect 错误//	reconnect == false: 返回原始错误//// 连接关闭时(connected == false):////	主动关闭(closeManually == true):  返回 ErrClosed//	开启自动重连时(reconnect == true): 返回 ErrReconnect//// 调用示例:// n, err := Write(p)////	if err == ErrReconnect {//	   continue//	}////	if err != nil || len(p) != n {//	   return//	}func (c *TCPClient) Write(p []byte) (n int, err error) {	if !c.connected {		if c.closeManually {			return 0, ErrClosed		}		if c.reconnect {			return 0, ErrReconnect		}	}	c.mu.Lock()	defer c.mu.Unlock()	if err = c.setWriteDeadline(); err != nil {		return	}	n, err = c.conn.Write(p)	if err != nil {		if c.reconnect {			err = ErrReconnect		}		c.passiveClose()		return	}	if len(p) != n {		err = ErrNotFullyWrite	}	return}// Close 主动关闭连接// 调用后会关闭 reconnecting 线程, 关闭与服务器的连接, 并设置// closeManually = true// connected = falsefunc (c *TCPClient) Close() error {	if !c.connected {		return nil	}	c.mu.Lock()	_ = c.conn.Close()	c.closeManually = true	c.connected = false	c.mu.Unlock()	return nil}func (c *TCPClient) LocalAddr() net.Addr {	return c.conn.LocalAddr()}func (c *TCPClient) RemoteAddr() net.Addr {	return c.conn.RemoteAddr()}// setReadDeadline 设置 Read 读取超时, 必须在 Read 前调用. 优先级高于 deadline,// 当 rDeadline <= 0 时使用 deadline, 当两者都 <= 0 时则使用 DefaultReadTimoutfunc (c *TCPClient) setReadDeadline() error {	if !c.rDeadline.IsZero() && time.Now().After(c.rDeadline) {		return c.conn.SetReadDeadline(c.rDeadline)	} else if !c.deadline.IsZero() && time.Now().After(c.deadline) {		return c.conn.SetReadDeadline(c.deadline)	}	return c.conn.SetReadDeadline(time.Now().Add(DefaultReadTimout))}// setWriteDeadline 设置 Write 读取超时, 必须在 Write 前调用. 优先级高于 deadline// 当 wDeadline <= 0 时使用 deadline, 当两者都 <= 0 时则使用 DefaultWriteTimoutfunc (c *TCPClient) setWriteDeadline() error {	if !c.wDeadline.IsZero() && time.Now().After(c.wDeadline) {		return c.conn.SetWriteDeadline(c.wDeadline)	} else if !c.deadline.IsZero() && time.Now().After(c.wDeadline) {		return c.conn.SetWriteDeadline(c.deadline)	}	return c.conn.SetWriteDeadline(time.Now().Add(DefaultWriteTimout))}// passiveClose 被动关闭连接, 在 Read 和 Write 返回错误时在 mu 中调用.func (c *TCPClient) passiveClose() {	if c.connected && c.reconnect {		_ = c.conn.Close()		c.connected = false	}}// getAddr 获取服务器的 IP 和 Port, 用于 reconnecting// 注: 远程服务器断开连接后 RemoteAddr 内也会留存服务器地址func (c *TCPClient) getAddr() netip.AddrPort {	return c.conn.RemoteAddr().(*net.TCPAddr).AddrPort()}// reconnecting 每 1 秒检查一次连接, 当 closeManually == false 且 connected 和 reconnect == true 时使用 DefaultDialTimout 进行重连.// 主动调用 Close 会使 closeManually == true// Read 或 Write 遇到错误时满足 connected 和 reconnect == true (重连的条件)// 无限次重试, 直至连接成功func (c *TCPClient) reconnecting() {	t := time.NewTicker(1 * time.Second)	for range t.C {		if c.closeManually {			break		}		if c.connected || !c.reconnect {			continue		}		addr := c.getAddr()		conn, err := net.DialTimeout(NetTCP, addr.String(), DefaultDialTimout)		if err == nil {			c.mu.Lock()			c.conn = (net.Conn)(nil)			c.conn = conn			c.connected = true			c.mu.Unlock()		}	}	t.Stop()}// modbusClient 实现 ModbusClient 接口, 用于客户端需要异步获取服务器状态的场景, 详情见 asynctype modbusClient struct {	connected bool	e error	b []byte	p chan []byte	data ModbusCreator	conn net.Conn}// Get 数据来自 conn 服务器返回的数据. 仅保留最后一次服务器返回的数据// 当遇到非 ErrReconnect 的错误时应调用 Close 关闭此连接, 否则 async 可能会一直返回 socket 错误func (ms *modbusClient) Get() ([]byte, error) {	if !ms.connected {		return nil, ErrClosed	}	t := time.Now().Add(DefaultWriteTimout + DefaultModbusWriteInterval)	for cap(ms.b) == 0 {		n := time.Now().Add(100 * time.Millisecond)		if t.Equal(n) || t.Before(n) {			return nil, ErrTimout		}		time.Sleep(100 * time.Millisecond)	}	return ms.b, ms.e}func (ms *modbusClient) Write(p []byte) error {	if !ms.connected {		return ErrClosed	}	ms.p <- p	return nil}// Close 断开与服务器的连接, 关闭 async 线程func (ms *modbusClient) Close() error {	if !ms.connected {		return nil	}	ms.connected = false	ms.b = make([]byte, 0)	return ms.conn.Close()}func (ms *modbusClient) writeRead(p []byte) ([]byte, error) {	if _, err := ms.conn.Write(p); err != nil {		return nil, err	}	b := defaultPool.Get().([]byte)	defaultPool.Put(b)	n, err := ms.conn.Read(b)	if err != nil {		return nil, err	}	return Remake(b[:n]), nil}// async 每 1 秒调用 ModbusCreator 接口创建数据并发送至 conn, 然后将返回的数据保存至 b// 如果期间遇到任何错误将会继续重试, 除非主动调用 Close 关闭func (ms *modbusClient) async() {	t := time.NewTicker(DefaultModbusWriteInterval)	defer func() {		t.Stop()		_ = ms.Close()	}()	for ms.connected {		select {		case p, ok := <-ms.p:			if ok {				ms.b, ms.e = ms.writeRead(p)			}		case <-t.C:			// 如果创建数据失败则关闭连接			b, err := ms.data.Create()			if err != nil {				ms.e = fmt.Errorf("called ModbusStatusWrite.Create: %s", err)				return			}			ms.b, ms.e = ms.writeRead(b)		}	}}
 |