| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 | package modbusimport (	"context"	"net"	"sync/atomic"	"time"	"golib/v3/gnet")// Creator 创建需要写入的数据type Creator interface {	Create() ([]byte, error)}// ReadAfter 读取数据之后会调用此接口type ReadAfter interface {	ReadAfterHandle(b []byte) error}// ReadAfterFunc 为 ReadAfter 的快捷方式type ReadAfterFunc func(b []byte) errorfunc (f ReadAfterFunc) ReadAfterHandle(b []byte) error {	return f(b)}// ErrHandler 遇到错误时会调用此接口type ErrHandler interface {	ErrHandle(err error)}// ErrHandlerFunc 为 ErrHandler 的快捷方式type ErrHandlerFunc func(err error)func (f ErrHandlerFunc) ErrHandle(err error) {	f(err)}type Buffer struct {	Conn       net.Conn	ReadAfter  ReadAfter  // 读取数据后执行	ErrHandler ErrHandler // 读写失败时执行	Cache      atomic.Value	Creator    Creator       // 当 Wait 无数据且到达轮询时间时执行	Interval   time.Duration // 轮询频率	Wait       chan []byte	Logger     gnet.Logger	Ctx context.Context}func (rw *Buffer) Get() ([]byte, bool) {	b, ok := rw.Cache.Load().([]byte)	if !ok {		return nil, false	}	return b, true}func (rw *Buffer) Send(b []byte) {	rw.Wait <- b}func (rw *Buffer) handleData(b []byte) {	if len(b) > 0 {		rw.Logger.Debug("Write: %s", gnet.Bytes(b).HexTo())		n, err := rw.Conn.Write(b)		if err != nil {			rw.ErrHandler.ErrHandle(err)			rw.Logger.Error("Write err: %s", err)			return		}		if n != len(b) {			rw.ErrHandler.ErrHandle(err)			rw.Logger.Error("Write err: not fully write: data length: %d write length: %d", len(b), n)			return		}	}	body := make([]byte, 4096)	n, err := rw.Conn.Read(body)	if err != nil {		rw.ErrHandler.ErrHandle(err)		rw.Logger.Error("Read err: %s", err)		return	}	rw.Cache.Store(body[:n])	rw.Logger.Debug("Read: %s", gnet.Bytes(body[:n]).HexTo())	if err = rw.ReadAfter.ReadAfterHandle(body[:n]); err != nil {		rw.Logger.Error("Handle err: %s", err)	}}func (rw *Buffer) callCreate() {	if rw.Creator != nil {		b, err := rw.Creator.Create()		if err != nil {			rw.Logger.Error("Handle Create err: %s", err)		} else {			rw.handleData(b)		}	} else {		rw.handleData(nil)	}}func (rw *Buffer) Start() {	rw.callCreate() // call once	if rw.Interval <= 0 {		rw.Interval = gnet.IdleTime	}	t := time.NewTimer(rw.Interval)	defer t.Stop()	for {		select {		case <-rw.Ctx.Done():			_ = rw.Conn.Close()			rw.ErrHandler.ErrHandle(rw.Ctx.Err())			return		case <-t.C:			rw.callCreate()			t.Reset(rw.Interval)		case b := <-rw.Wait:			rw.handleData(b)		}	}}func NewBuffer(ctx context.Context, conn net.Conn, creator Creator) *Buffer {	b := new(Buffer)	b.Conn = conn	b.ReadAfter = ReadAfterFunc(func(_ []byte) error { return nil })	b.ErrHandler = ErrHandlerFunc(func(_ error) {})	b.Wait = make(chan []byte, 3)	b.Creator = creator	b.Logger = gnet.DefaultLogger("[Buffer] ")	b.Ctx = ctx	return b}
 |