| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 | package modbusimport (	"context"	"errors"	"fmt"	"io"	"math"	"net"	"strings"	"sync"	"time"	"golib/v4/gio"	"golib/v4/gnet"	"golib/v4/log")type PLC interface {	gnet.ConnStat	gnet.DataAccess	io.Closer}var (	ErrReadError    = errors.New("modbus: read error")	ErrWriteError   = errors.New("modbus: write error")	ErrReadTimeout  = errors.New("modbus: read timeout")	ErrWriteTimeout = errors.New("modbus: write timeout")	ErrConnError    = errors.New("modbus: connection error")	ErrParamError   = errors.New("modbus: parameter error"))const (	MaxReadBuffSize = 1024)// 一次连续读取寄存器的最大数量const maxReadRegister = 30type modbusConn struct {	conn   net.Conn	buf    gnet.Bytes	logger log.Logger	mu     sync.Mutex}func (w *modbusConn) IsConnected() bool {	if w.conn == nil {		return false	}	if conn, ok := w.conn.(gnet.ConnStat); ok {		return conn.IsConnected()	}	return true}func (w *modbusConn) IsClosed() bool {	if w.conn == nil {		return true	}	if conn, ok := w.conn.(gnet.ConnStat); ok {		return conn.IsClosed()	}	return false}// ReadData 读取原始数据, 当 count 过大时会自动分段读取// 规则:////	blockId == Code3, 表示读取保持寄存器, 每个寄存器大小为 2 个字节, count 为寄存器数量, 返回数据大小为 count*2func (w *modbusConn) ReadData(ctx context.Context, blockId, address, count int) ([]byte, error) {	if !w.IsConnected() || w.IsClosed() {		return nil, gnet.ErrUnconnected	}	w.mu.Lock()	defer w.mu.Unlock()	switch blockId {	case Code3:		if !w.checkCode3(address, count) {			return nil, ErrParamError		}	default:		// TODO 目前仅支持 4x(Code03) 地址		return nil, fmt.Errorf("modbus: ReadData: unsupported funCode: %d", blockId)	}	pduGroup := gnet.SplitNumber(count, maxReadRegister)	aduList := make([]ADU, len(pduGroup))	for i, length := range pduGroup { //		curAddr := address + i*maxReadRegister		pdu := NewPDUReadRegisters(byte(blockId), uint16(curAddr), uint16(length))		aduList[i] = NewADU(uint16(i), Protocol, 0, pdu)	}	buf := make([]byte, count*2)	for i, adu := range aduList {		deadline, ok := ctx.Deadline()		if !ok {			deadline = time.Now().Add(gnet.ClientReadTimout)		}		b, err := w.call(deadline, adu.Serialize())		if err != nil {			return nil, fmt.Errorf("modbus: ReadData: %s", err)		}		resp, err := ParseADU(b)		if err != nil {			return nil, fmt.Errorf("modbus: ReadData: ParseADU: %s", err)		}		if err = CheckADU(adu, resp); err != nil {			return nil, fmt.Errorf("modbus: ReadData: CheckADU: %s", err)		}		copy(buf[maxReadRegister*2*i:], resp.PDU.Data)	}	return buf, nil}func (w *modbusConn) WriteData(ctx context.Context, blockId, address, count int, buf []byte) error {	if !w.IsConnected() || w.IsClosed() {		return gnet.ErrUnconnected	}	w.mu.Lock()	defer w.mu.Unlock()	switch blockId {	case Code6, Code16:		if !w.checkCode6(address, count, buf) {			return ErrParamError		}	default:		return fmt.Errorf("modbus: WriteData: unsupported funCode: %d", blockId)	}	var (		pdu PDU		err error	)	if count == 1 {		pdu, err = NewPDUWriterSingleRegisterFromBuff(uint16(address), buf)	} else {		pdu, err = NewPDUWriterMultipleRegistersFromBuff(uint16(address), uint16(count), buf)	}	if err != nil {		return errors.Join(ErrParamError, err)	}	adu := NewADU(uint16(address), Protocol, 0, pdu)	deadline, ok := ctx.Deadline()	if !ok {		deadline = time.Now().Add(gnet.ClientReadTimout)	}	b, err := w.call(deadline, adu.Serialize())	if err != nil {		return fmt.Errorf("modbus: WriteData: : %s", err)	}	resp, err := ParseADU(b)	if err != nil {		return fmt.Errorf("modbus: WriteData: ParseADU: %s", err)	}	if resp.TransactionID != adu.TransactionID {		return fmt.Errorf("modbus: WriteData: transactionID mismatch: want %d, got %d", adu.TransactionID, resp.TransactionID)	}	return nil}func (w *modbusConn) GetProtocolName() string {	return ProtocolName}func (w *modbusConn) checkCode3(address, count int) bool {	return (address >= 0 && address <= math.MaxUint16) && (count > 0 && count <= math.MaxUint16)}func (w *modbusConn) checkCode6(address, count int, buf []byte) bool {	return (address >= 0 && address <= math.MaxUint16) && (count > 0 && count <= math.MaxUint16) && (len(buf)/2 == count)}func (w *modbusConn) call(deadline time.Time, b gnet.Bytes) ([]byte, error) {	if err := w.conn.SetDeadline(deadline); err != nil {		w.logger.Error("modbus: call: failed to set deadline: %s", err)		return nil, errors.Join(ErrConnError, err)	}	if _, err := w.conn.Write(b); err != nil {		w.logger.Error("modbus: call: failed to write response: %s", err)		if isNetTimeout(err) {			return nil, errors.Join(ErrWriteTimeout, err)		}		return nil, errors.Join(ErrWriteError, err)	}	w.logger.Debug("modbus: Write: %s", b.HexTo())	clear(w.buf)	n, err := w.conn.Read(w.buf)	if err != nil {		w.logger.Error("modbus: call: failed to read response: %s", err)		if isNetTimeout(err) {			return nil, errors.Join(ErrReadTimeout, err)		}		return nil, errors.Join(ErrReadError, err)	}	data := w.buf[:n]	w.logger.Debug("modbus: Read: %s", data.HexTo())	return data, nil}func (w *modbusConn) Close() error {	if w.conn == nil {		return nil	}	return w.conn.Close()}func New(conn net.Conn, logger log.Logger) PLC {	c := &modbusConn{		conn:   conn,		buf:    make([]byte, MaxReadBuffSize),		logger: logger,	}	return c}// Conn PLC 主控连接// Deprecated, 请使用 Newtype Conn interface {	// ConnStat 连接状态	gnet.ConnStat	// IsLocked 表示当前有其他线程正在与 PLC 交互	IsLocked() bool	// WriteResponse 向 PLC 发送数据并等待 PLC 响应	WriteResponse(b []byte) ([]byte, error)	// Closer 关闭与 PLC 主控的连接	io.Closer}// Dialer// Deprecated, 请使用 Newtype Dialer struct {	conn   net.Conn	buf    []byte	logger log.Logger	mu     sync.Mutex	lock   bool}func (w *Dialer) IsConnected() bool {	if w.conn == nil {		return false	}	return w.conn.(gnet.ConnStat).IsConnected()}func (w *Dialer) IsClosed() bool {	if w.conn == nil {		return true	}	return w.conn.(gnet.ConnStat).IsClosed()}func (w *Dialer) IsLocked() bool {	return w.lock}// WriteResponse 写入并读取下一次的数据func (w *Dialer) WriteResponse(b []byte) ([]byte, error) {	if w.conn == nil {		return nil, gnet.ErrConnNotFound	}	w.mu.Lock()	defer w.mu.Unlock()	w.lock = true	defer func() {		w.lock = false	}()	w.logger.Debug("Write: %s", gnet.Bytes(b).HexTo())	if i, err := w.conn.Write(b); err != nil {		w.logger.Error("Write err: %d->%d %s", len(b), i, err)		if isNetTimeout(err) {			return nil, errors.Join(ErrWriteTimeout, err)		}		return nil, err	}	clear(w.buf)	n, err := w.conn.Read(w.buf)	if err != nil {		w.logger.Error("Read err: %s", err)		if isNetTimeout(err) {			return nil, errors.Join(ErrReadTimeout, err)		}		return nil, err	}	w.logger.Debug("Read: %s", gnet.Bytes(w.buf[:n]).HexTo())	return w.buf[:n], nil}func (w *Dialer) Close() error {	if w.conn == nil {		return nil	}	return w.conn.Close()}func (w *Dialer) CloseWith(ctx context.Context) {	<-ctx.Done()	w.logger.Warn("DialContext: %s", ctx.Err())	_ = w.Close()}func (w *Dialer) DialContext(ctx context.Context, address string, logger log.Logger) (Conn, error) {	config := &gnet.Config{ // 由于现场网络环境比较差, 因此加大超时时间以防止频繁掉线重连		Timeout:     60 * time.Second,		DialTimeout: 10 * time.Second,		Reconnect:   true,	}	return w.DialConfig(ctx, address, config, logger)}func (w *Dialer) DialConfig(ctx context.Context, address string, config *gnet.Config, logger log.Logger) (Conn, error) {	deadline := time.Now().Add(config.DialTimeout)	if conn, err := gnet.DialTCPConfig(address, config); err == nil {		w.conn = conn	} else {		if timeout := deadline.Sub(time.Now()); timeout > 0 {			gio.RandSleep(0, timeout)		}		logger.Debug("DialContext: %s", err)		return nil, err	}	go func() {		w.CloseWith(ctx)	}()	w.buf = make([]byte, MaxReadBuffSize)	w.logger = log.Part(logger, "conn", strings.ReplaceAll(address, ":", "_"))	return w, nil}func isNetTimeout(err error) bool {	var ne net.Error	if errors.As(err, &ne) && ne.Timeout() {		return true	}	return false}// DialContext// Deprecated, 请使用 Newfunc DialContext(ctx context.Context, address string, logger log.Logger) (Conn, error) {	var dialer Dialer	return dialer.DialContext(ctx, address, logger)}// DialConfig// Deprecated, 请使用 Newfunc DialConfig(ctx context.Context, address string, config *gnet.Config, logger log.Logger) (Conn, error) {	var dialer Dialer	return dialer.DialConfig(ctx, address, config, logger)}// Dial// Deprecated, 请使用 Newfunc Dial(address string, logger log.Logger) (Conn, error) {	return DialContext(context.Background(), address, logger)}
 |