| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 | 
							- package modbus
 
import (
	"context"
	"io"
	"net"
	"sync/atomic"
	"time"

	"golib/v2/gnet"
)
 
// Creator produces the bytes that should be written to the connection.
// Buffer calls Create on every polling tick when a Creator is configured.
type Creator interface {
	// Create returns the next payload to write, or an error when no payload
	// can be produced.
	Create() ([]byte, error)
}
 
// ReadAfter is the hook invoked after data has been read from the connection.
type ReadAfter interface {
	// ReadAfterHandle receives the bytes that were just read.
	ReadAfterHandle(b []byte) error
}
 
// ReadAfterFunc adapts a plain function to the ReadAfter interface.
type ReadAfterFunc func(b []byte) error

// ReadAfterHandle calls f with b and returns whatever f returns.
func (f ReadAfterFunc) ReadAfterHandle(b []byte) error {
	err := f(b)
	return err
}
 
// ErrHandler is the hook invoked when an error is encountered.
type ErrHandler interface {
	// ErrHandle receives the error that occurred.
	ErrHandle(err error)
}
 
// ErrHandlerFunc adapts a plain function to the ErrHandler interface.
type ErrHandlerFunc func(err error)

// ErrHandle forwards err to the wrapped function f.
func (f ErrHandlerFunc) ErrHandle(err error) { f(err) }
 
- type Buffer struct {
 
- 	Conn       net.Conn
 
- 	ReadAfter  ReadAfter  // 读取数据后执行
 
- 	ErrHandler ErrHandler // 读写失败时执行
 
- 	Cache      atomic.Value
 
- 	Creator    Creator       // 当 Wait 无数据且到达轮询时间时执行
 
- 	Interval   time.Duration // 轮询频率
 
- 	Wait       chan []byte
 
- 	Logger     gnet.Logger
 
- 	Ctx context.Context
 
- }
 
- func (rw *Buffer) Get() ([]byte, bool) {
 
- 	b, ok := rw.Cache.Load().([]byte)
 
- 	if !ok {
 
- 		return nil, false
 
- 	}
 
- 	return b, true
 
- }
 
- func (rw *Buffer) Send(b []byte) {
 
- 	rw.Wait <- b
 
- }
 
- func (rw *Buffer) handleData(b []byte) {
 
- 	if len(b) > 0 {
 
- 		rw.Logger.Println("Write: %s", gnet.Bytes(b).HexTo())
 
- 		n, err := rw.Conn.Write(b)
 
- 		if err != nil {
 
- 			rw.ErrHandler.ErrHandle(err)
 
- 			rw.Logger.Println("Write err: %s", err)
 
- 			return
 
- 		}
 
- 		if n != len(b) {
 
- 			rw.ErrHandler.ErrHandle(err)
 
- 			rw.Logger.Println("Write err: not fully write: data length: %d write length: %d", len(b), n)
 
- 			return
 
- 		}
 
- 	}
 
- 	body := make([]byte, 4096)
 
- 	n, err := rw.Conn.Read(body)
 
- 	if err != nil {
 
- 		rw.ErrHandler.ErrHandle(err)
 
- 		rw.Logger.Println("Read err: %s", err)
 
- 		return
 
- 	}
 
- 	rw.Cache.Store(body[:n])
 
- 	rw.Logger.Println("Read: %s", gnet.Bytes(body[:n]).HexTo())
 
- 	if err = rw.ReadAfter.ReadAfterHandle(body[:n]); err != nil {
 
- 		rw.Logger.Println("Handle err: %s", err)
 
- 	}
 
- }
 
- func (rw *Buffer) callCreate() {
 
- 	if rw.Creator != nil {
 
- 		b, err := rw.Creator.Create()
 
- 		if err != nil {
 
- 			rw.Logger.Println("Handle Create err: %s", err)
 
- 		} else {
 
- 			rw.handleData(b)
 
- 		}
 
- 	} else {
 
- 		rw.handleData(nil)
 
- 	}
 
- }
 
- func (rw *Buffer) Start() {
 
- 	rw.callCreate() // call once
 
- 	if rw.Interval <= 0 {
 
- 		rw.Interval = gnet.WriteInterval
 
- 	}
 
- 	t := time.NewTimer(rw.Interval)
 
- 	defer t.Stop()
 
- 	for {
 
- 		select {
 
- 		case <-rw.Ctx.Done():
 
- 			_ = rw.Conn.Close()
 
- 			rw.ErrHandler.ErrHandle(rw.Ctx.Err())
 
- 			return
 
- 		case <-t.C:
 
- 			rw.callCreate()
 
- 			t.Reset(rw.Interval)
 
- 		case b := <-rw.Wait:
 
- 			rw.handleData(b)
 
- 		}
 
- 	}
 
- }
 
- func NewBuffer(ctx context.Context, conn net.Conn, creator Creator) *Buffer {
 
- 	b := new(Buffer)
 
- 	b.Conn = conn
 
- 	b.ReadAfter = ReadAfterFunc(func(_ []byte) error { return nil })
 
- 	b.ErrHandler = ErrHandlerFunc(func(_ error) {})
 
- 	b.Wait = make(chan []byte, 3)
 
- 	b.Creator = creator
 
- 	b.Logger = gnet.DefaultLogger("[Buffer] ")
 
- 	b.Ctx = ctx
 
- 	return b
 
- }
 
 
  |