| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 | 
							- package network
 
- import (
 
- 	"io"
 
- 	"net"
 
- 	"sync"
 
- 	"sync/atomic"
 
- 	"time"
 
- )
 
- // TCPClient 用于所有使用 TCP 协议的客户端, 可以通过 Dial 创建此连接, 但通常应该是用 Client 接口而不是只用 TCPClient 结构体指针
 
- type TCPClient struct {
 
- 	// Reconnect 自动重连, 默认为 true, 当 Read / Write 遇到错误时主动断开连接并会通过 reconnecting 重连. 重连期间调用 Read / Write
 
- 	// 时会返回 ErrReconnect 错误. 当调用 Close 时 Reconnect 会被更改为 false
 
- 	Reconnect bool
 
- 	// Connected 已连接, 默认为 true.
 
- 	// 调用 Close 后 Connected 会被更改为 false
 
- 	// 值为 false 时表示已与服务器断开连接, 之后调用 Read / Write 时会返回原始 socket 错误.
 
- 	// 若 Reconnect 值为 true 时则断开后会通过 reconnecting 重连, 重连期间调用 Read / Write 时会返回 ErrReconnect 错误.
 
- 	Connected bool
 
- 	// RDeadline 用于 Read 等待超时时间, 优先级高于 Deadline
 
- 	RDeadline time.Time
 
- 	// WDeadline 用于 Write 等待超时时间, 优先级高于 Deadline
 
- 	WDeadline time.Time
 
- 	// Deadline 超时时间, 适用于 Read 和 Write, 当 RDeadline 和 WDeadline 不存在时生效
 
- 	Deadline time.Time
 
- 	// Conn 服务器连接
 
- 	Conn *ConnSafe
 
- 	mu sync.Mutex
 
- 	Log Logger
 
- }
 
- // SetReadDeadline 设置 Read 超时时间, 优先级高于 SetDeadline
 
- func (c *TCPClient) SetReadDeadline(t time.Time) error {
 
- 	c.RDeadline = t
 
- 	c.Log.Println("[TCPClient] SetReadDeadline: %s", t.String())
 
- 	return nil
 
- }
 
- // SetWriteDeadline 设置 Write 超时时间, 优先级高于 SetDeadline
 
- func (c *TCPClient) SetWriteDeadline(t time.Time) error {
 
- 	c.WDeadline = t
 
- 	c.Log.Println("[TCPClient] SetWriteDeadline: %s", t.String())
 
- 	return nil
 
- }
 
- // SetDeadline 设置 Read / Write 超时时间
 
- func (c *TCPClient) SetDeadline(t time.Time) error {
 
- 	c.Deadline = t
 
- 	c.Log.Println("[TCPClient] SetDeadline: %s", t.String())
 
- 	return nil
 
- }
 
- // Read 读取数据到 p 中, 使用 setReadDeadline 超时规则
 
- func (c *TCPClient) Read(p []byte) (n int, err error) {
 
- 	c.mu.Lock()
 
- 	defer c.mu.Unlock()
 
- 	if !c.Connected {
 
- 		c.Log.Println("[TCPClient] Read: Connected == false")
 
- 		if c.Reconnect {
 
- 			c.Log.Println("[TCPClient] Read: %s returned", ErrReconnect)
 
- 			return 0, ErrReconnect
 
- 		}
 
- 		c.Log.Println("[TCPClient] Read: %s returned", ErrClosed)
 
- 		return 0, ErrClosed
 
- 	}
 
- 	if err = setReadDeadline(c.Conn, c.RDeadline, c.Deadline); err != nil {
 
- 		err = c.handleErr(err)
 
- 		return
 
- 	}
 
- 	n, err = c.Conn.Read(p)
 
- 	if err != nil {
 
- 		c.Log.Println("[TCPClient] Conn.Read: %s -> %s", Bytes(p).HexTo(), err)
 
- 		err = c.handleErr(err)
 
- 	}
 
- 	return
 
- }
 
- // Write 写入 p 至 Conn, 使用 setWriteDeadline 超时规则
 
- func (c *TCPClient) Write(p []byte) (n int, err error) {
 
- 	c.mu.Lock()
 
- 	defer c.mu.Unlock()
 
- 	if !c.Connected {
 
- 		c.Log.Println("[TCPClient] Write: Connected == false")
 
- 		if c.Reconnect {
 
- 			c.Log.Println("[TCPClient] Write: %s returned", ErrReconnect)
 
- 			return 0, ErrReconnect
 
- 		}
 
- 		c.Log.Println("[TCPClient] Write: %s returned", ErrClosed)
 
- 		return 0, ErrClosed
 
- 	}
 
- 	if err = setWriteDeadline(c.Conn, c.WDeadline, c.Deadline); err != nil {
 
- 		err = c.handleErr(err)
 
- 		return
 
- 	}
 
- 	n, err = c.Conn.Write(p)
 
- 	if err != nil {
 
- 		c.Log.Println("[TCPClient] Conn.Write: %s -> %s", Bytes(p).HexTo(), err)
 
- 		err = c.handleErr(err)
 
- 	}
 
- 	return
 
- }
 
- // Close 主动关闭连接
 
- func (c *TCPClient) Close() error {
 
- 	c.mu.Lock()
 
- 	defer c.mu.Unlock()
 
- 	if !c.Connected {
 
- 		c.Log.Println("[TCPClient] Close: Connected == false")
 
- 		return nil
 
- 	}
 
- 	_ = c.Conn.Close()
 
- 	c.Reconnect = false
 
- 	c.Connected = false
 
- 	c.Log.Println("[TCPClient] Close: closed")
 
- 	return nil
 
- }
 
- func (c *TCPClient) LocalAddr() net.Addr {
 
- 	return c.Conn.LocalAddr()
 
- }
 
- func (c *TCPClient) RemoteAddr() net.Addr {
 
- 	return c.Conn.RemoteAddr()
 
- }
 
- // handleErr 当 err != nil 时, 若 Connected == true && Reconnect == true 则关闭连接并将 Connected 更改为 ErrReconnect
 
- func (c *TCPClient) handleErr(err error) error {
 
- 	if err == nil {
 
- 		return nil
 
- 	}
 
- 	if c.Connected && c.Reconnect {
 
- 		c.Log.Println("[TCPClient] handleErr: %s -> %s returned", err, ErrReconnect)
 
- 		_ = c.Conn.Close()
 
- 		c.Connected = false
 
- 		return ErrReconnect
 
- 	}
 
- 	c.Log.Println("[TCPClient] handleErr: %s", err)
 
- 	return err
 
- }
 
- // reconnecting 每 2 秒检查一次连接, 当 Reconnect == true 且 Connected == false 时使用 DefaultDialTimout 进行重连.
 
- // 主动调用 Close 会使 Reconnect == false
 
- // 无限次重试, 直至连接成功
 
- func (c *TCPClient) reconnecting() {
 
- 	addr := c.RemoteAddr().(*net.TCPAddr).AddrPort()
 
- 	c.Log.Println("[TCPClient] Connected to %s", addr)
 
- 	t := time.NewTicker(2 * time.Second)
 
- 	c.Log.Println("[TCPClient] reconnecting: Started Ticker")
 
- 	for range t.C {
 
- 		if !c.Reconnect {
 
- 			c.Log.Println("[TCPClient] reconnecting: Reconnect == false")
 
- 			break
 
- 		}
 
- 		if c.Connected {
 
- 			continue
 
- 		}
 
- 		conn, err := net.DialTimeout(NetTCP, addr.String(), DefaultDialTimout)
 
- 		if err == nil {
 
- 			c.mu.Lock()
 
- 			c.Conn.Set(conn)
 
- 			c.Connected = true
 
- 			c.Log.Println("[TCPClient] reconnecting: reconnected -> %s", addr)
 
- 			c.mu.Unlock()
 
- 		} else {
 
- 			c.Log.Println("[TCPClient] reconnecting: %s", err)
 
- 		}
 
- 	}
 
- 	t.Stop()
 
- 	c.Log.Println("[TCPClient] reconnecting: Stopped Ticker")
 
- }
 
- func NewTCPClient(conn net.Conn, logger Logger) net.Conn {
 
- 	tc := new(TCPClient)
 
- 	tc.Log = logger
 
- 	tc.Conn = new(ConnSafe)
 
- 	tc.Conn.Set(conn)
 
- 	tc.Reconnect = true
 
- 	tc.Connected = true
 
- 	go tc.reconnecting()
 
- 	return tc
 
- }
 
- // ModbusClient 实现 ModbusClient 接口, 用于客户端需要异步获取服务器状态的场景, 详情见 async
 
- // 关系: 前端 <- ModbusClient -> TCPClient
 
- type ModbusClient struct {
 
- 	Connected bool // 当前连接控制
 
- 	Transmit atomic.Value // 来自下游客户端的数据, 返回给前端
 
- 	Recv     chan []byte  // 来自上游前端的数据, 需要发送至 Conn
 
- 	Handler ModbusCreator // 当 Recv 中没有数据时默认调用此接口发送数据
 
- 	Conn    net.Conn      // 通常为 TCPClient
 
- 	Log Logger
 
- }
 
- // Get 数据来自 Conn 服务器返回的数据. 仅保留最后一次服务器返回的数据
 
- // 当遇到非 ErrReconnect 的错误时应调用 Close 关闭此连接, 否则 async 可能会一直返回 socket 错误
 
- func (ms *ModbusClient) Read(b []byte) (n int, err error) {
 
- 	if !ms.Connected {
 
- 		ms.Log.Println("[ModbusClient] Read: Connected == false; %s returned", ErrClosed)
 
- 		return 0, ErrClosed
 
- 	}
 
- 	t := time.Now().Add(DefaultWriteTimout + DefaultModbusWriteInterval)
 
- 	for ms.Transmit.Load() == nil {
 
- 		timout := time.Now().Add(100 * time.Millisecond)
 
- 		if t.Equal(timout) || t.Before(timout) {
 
- 			ms.Log.Println("[ModbusClient] Read: %s -> %s returned", t.String(), ErrTimout)
 
- 			return 0, ErrTimout
 
- 		}
 
- 		time.Sleep(100 * time.Millisecond)
 
- 	}
 
- 	p := ms.Transmit.Load().([]byte)
 
- 	copy(b, p)
 
- 	return len(p), nil
 
- }
 
- func (ms *ModbusClient) Write(p []byte) (n int, err error) {
 
- 	if !ms.Connected {
 
- 		ms.Log.Println("[ModbusClient] Write: Connected == false; %s returned", ErrClosed)
 
- 		return 0, ErrClosed
 
- 	}
 
- 	ms.Recv <- p
 
- 	ms.Log.Println("[ModbusClient] Write: Added to Recv channel")
 
- 	return len(p), nil
 
- }
 
- // Close 断开与服务器的连接, 关闭 async 线程
 
- func (ms *ModbusClient) Close() error {
 
- 	if !ms.Connected {
 
- 		ms.Log.Println("[ModbusClient] Close: Connected == false")
 
- 		return nil
 
- 	}
 
- 	ms.Transmit.Store([]byte{})
 
- 	_ = ms.Conn.Close() // 先关闭下游连接. 可能存在共用同一个日志接口的情况, 否则会导致下游连接写入日志失败
 
- 	ms.Connected = false
 
- 	ms.Log.Println("[ModbusClient] Close: closed")
 
- 	return nil
 
- }
 
- func (ms *ModbusClient) writeRead(p []byte) {
 
- 	if _, err := ms.Conn.Write(p); err != nil {
 
- 		ms.Log.Println("[ModbusClient] writeRead: Conn.Write: %s", err)
 
- 		return
 
- 	}
 
- 	b := make(Bytes, DefaultBufferSize)
 
- 	n, err := ms.Conn.Read(b)
 
- 	if err != nil {
 
- 		ms.Log.Println("[ModbusClient] writeRead: Conn.Read: %s", err)
 
- 		return
 
- 	}
 
- 	ms.Transmit.Store(b[:n].Remake().Bytes())
 
- }
 
- // async 每 1 秒调用 ModbusCreator 接口创建数据并发送至 Conn, 然后将返回的数据保存至 Transmit
 
- // 如果期间遇到任何错误将会继续重试, 除非主动调用 Close 关闭
 
- func (ms *ModbusClient) async() {
 
- 	t := time.NewTicker(DefaultModbusWriteInterval)
 
- 	defer func() {
 
- 		t.Stop()
 
- 		_ = ms.Close()
 
- 	}()
 
- 	for ms.Connected {
 
- 		select {
 
- 		case p, ok := <-ms.Recv:
 
- 			if ok {
 
- 				ms.writeRead(p)
 
- 			}
 
- 		case <-t.C:
 
- 			// 如果创建数据失败则关闭连接
 
- 			if ms.Handler != nil {
 
- 				b, err := ms.Handler.Create()
 
- 				if err != nil {
 
- 					ms.Log.Println("[ModbusClient] async: Handler.Create: %s", err)
 
- 					return
 
- 				}
 
- 				ms.writeRead(b)
 
- 			}
 
- 		}
 
- 	}
 
- }
 
- func createModbusClient(conn net.Conn, data ModbusCreator, logger Logger) io.ReadWriteCloser {
 
- 	ms := new(ModbusClient)
 
- 	ms.Log = logger
 
- 	ms.Recv = make(chan []byte, 1)
 
- 	ms.Conn = conn
 
- 	ms.Handler = data
 
- 	ms.Connected = true
 
- 	go ms.async()
 
- 	return ms
 
- }
 
 
  |