| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 | package gioimport (	"log"	"sync"	"sync/atomic"	"time")type MutexID struct {	value map[any]struct{}	mu    sync.Mutex}func (mux *MutexID) TryLock(id any) bool {	if mux.value == nil {		mux.value = make(map[any]struct{})	}	mux.mu.Lock()	defer mux.mu.Unlock()	if _, ok := mux.value[id]; ok {		return false	}	mux.value[id] = struct{}{}	return true}func (mux *MutexID) Unlock(id any) {	mux.mu.Lock()	if _, ok := mux.value[id]; ok {		delete(mux.value, id)	} else {		panic("id not exist")	}	mux.mu.Unlock()}func (mux *MutexID) Close() error {	mux.mu.Lock()	clear(mux.value)	mux.mu.Unlock()	return nil}type Event struct {	cond         *sync.Cond	waiters      int32         // 当前等待者计数	totalWaiters int32         // 注册的等待者总数	ready        chan struct{} // 就绪信号	timeout      time.Duration // 超时用于唤醒其他 goroutine}// NewEvent 创建一个新的 Event,timeout 用于防止卡死func NewEvent(timeout time.Duration) *Event {	return &Event{		cond:    sync.NewCond(&sync.Mutex{}),		ready:   make(chan struct{}, 128), // 缓冲通道		timeout: timeout,	}}// Register 注册一个等待者func (e *Event) Register() {	atomic.AddInt32(&e.totalWaiters, 1)}// Unregister 注销一个等待者func (e *Event) Unregister() {	atomic.AddInt32(&e.totalWaiters, -1)}// Wait 等待事件触发,超时仅唤醒不退出func (e *Event) Wait() {	e.cond.L.Lock()	defer e.cond.L.Unlock()	// 增加等待者计数	atomic.AddInt32(&e.waiters, 1)	// 发送就绪信号	select {	case e.ready <- struct{}{}:	default:	}	// 等待事件,超时仅唤醒其他 goroutine	if e.timeout > 0 {		for {			timer := time.NewTimer(e.timeout)			go func() {				<-timer.C				e.cond.Broadcast() // 超时唤醒其他 goroutine			}()			e.cond.Wait()			if timer.Stop() {				break // 正常唤醒,退出循环			}			// 超时,继续等待下一次通知		}	} else {		e.cond.Wait()	}	// 正常唤醒,减少计数	atomic.AddInt32(&e.waiters, -1)}// Notify 广播通知所有等待的 goroutinefunc (e *Event) Notify() {	// 等待当前活跃的 goroutine 就绪	currentWaiters := atomic.LoadInt32(&e.totalWaiters)	for atomic.LoadInt32(&e.waiters) < currentWaiters {		select {		case <-e.ready:		case <-time.After(time.Millisecond * 100):			log.Printf("Notify: Timeout waiting for %d waiters, proceeding\n", currentWaiters)			break		}	}	e.cond.Broadcast()}
 |