package svc

import (
	"sync"
	"time"

	"golib/v2/features/mo"
	"golib/v2/infra/ii"
)

// Cache is an in-memory database cache.
// It is designed to preload tables that are written rarely but read frequently into memory,
// so that lookup (join) queries can be matched against the cached data.
type Cache struct {
	items    ii.Items                   // item (table) definitions
	itemIdx  int                        // next free slot in nameList
	nameList []ii.Name                  // registered table names, indexed by slot
	dataIdx  []map[string]map[any][]int // per slot: field name -> field value -> row indexes into data
	data     [][]mo.M                   // per slot: cached rows

	mutex sync.Mutex
}

// Include checks whether name (an ii.Lookup.From) is expected to be cached and returns its slot index.
func (c *Cache) Include(name ii.Name) (int, bool) {
	for i, oldName := range c.nameList {
		if oldName == name {
			_, ok := c.items.Has(name)
			return i, ok
		}
	}
	return 0, false
}

// AddItem registers name as a cached table.
func (c *Cache) AddItem(name ii.Name) {
	for _, oldName := range c.nameList {
		if oldName == name {
			return
		}
	}
	if _, ok := c.items.Has(name); !ok {
		return
	}
	if c.itemIdx >= len(c.nameList) {
		return // cache is full (maxCacheTblSize slots); ignore additional tables
	}
	c.nameList[c.itemIdx] = name
	c.itemIdx++
}

// SetData stores data as the cached rows for name and builds the value indexes for it.
func (c *Cache) SetData(name ii.Name, data []mo.M) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	for i, oldName := range c.nameList {
		if oldName != name {
			continue // data can only be set for names registered via AddItem
		}
		itemInfo, ok := c.items.Has(name)
		if !ok {
			panic(name) // unreachable once AddItem has verified the item exists
		}
		idxMap := make(map[string]map[any][]int, len(data))
		// _id may not be declared in the XML, so its index is initialized separately here
		oidIdx := make(map[any][]int)
		for n, row := range data {
			if oid, o := row[mo.ID.Key()]; o {
				oidIdx[oid] = []int{n}
			}
		}
		if len(oidIdx) > 0 {
			idxMap[mo.ID.Key()] = oidIdx
		}
		// indexes for fields declared in the XML
		for _, field := range itemInfo.Fields {
			if field.Name == mo.ID.Key() {
				continue // _id has already been indexed above; skip it if the XML also declares it to avoid setting it twice
			}
			if field.Type == mo.TypeArray || field.Type == mo.TypeObject {
				continue
			}
			idx := make(map[any][]int)
			for j, row := range data {
				if fieldValue, o := row[field.Name]; o {
					idx[fieldValue] = append(idx[fieldValue], j)
				}
			}
			idxMap[field.Name] = idx
		}
		c.dataIdx[i] = idxMap
		c.data[i] = data
	}
}

// getData fetches the index and rows for name from the cache; the returned map must be treated as read-only.
func (c *Cache) getData(name ii.Name) (map[string]map[any][]int, []mo.M) {
	for i, oldName := range c.nameList {
		if oldName == name {
			return c.dataIdx[i], c.data[i]
		}
	}
	return nil, nil
}

// SpitPipe splits pipe into the stages that still need to run against the database
// and the lookups that can be served from the cache.
func (c *Cache) SpitPipe(itemInfo *ii.ItemInfo, pipe mo.Pipeline) (stage mo.Pipeline, lookup []ii.Lookup) {
	for _, p := range pipe {
		if _, lookVal, ok := mo.HasOperator(mo.Pipeline{p}, mo.PsLookup); ok {
			if look, has := c.hasCacheFromLookup(itemInfo, lookVal); has {
				lookup = append(lookup, look)
				continue
			}
		}
		stage = append(stage, p)
	}
	return
}

// Format fills the cached lookup results into rows concurrently and returns the elapsed time.
func (c *Cache) Format(itemInfo *ii.ItemInfo, lookup []ii.Lookup, rows *[]mo.M) time.Duration {
	t := time.Now()
	var group sync.WaitGroup
	group.Add(len(*rows))
	for i := 0; i < len(*rows); i++ {
		go func(group *sync.WaitGroup, i int) {
			for _, look := range lookup {
				lookInfo, ok := c.items.Has(itemInfo.ForkName(look.From))
				if !ok {
					continue
				}
				lField, ok := itemInfo.Field(look.LocalField)
				if !ok {
					continue
				}
				c.handleLookup(i, rows, &look, lookInfo, &lField)
			}
			group.Done()
		}(&group, i)
	}
	group.Wait()
	return time.Since(t)
}

// deepCopy copies the sub-fields declared on lField from cacheRow, deep-copying nested
// objects and object arrays so that callers cannot mutate the cached rows.
func (c *Cache) deepCopy(lField *ii.FieldInfo, lookInfo *ii.ItemInfo, cacheRow mo.M) mo.M {
	m := make(mo.M)
	for _, sub := range lField.Fields {
		field, ok := lookInfo.Field(sub.Name)
		if !ok {
			continue
		}
		sv, ok := cacheRow[field.Name]
		if !ok {
			continue
		}
		switch field.Type {
		case mo.TypeObject:
			svv, ok := sv.(mo.M)
			if !ok {
				m[field.Name] = sv
			} else {
				dm, err := mo.DeepMapCopy(svv)
				if err == nil {
					m[field.Name] = dm
				} else {
					m[field.Name] = sv
				}
			}
		case mo.TypeArray:
			if field.Items == ii.FieldItemsObject {
				svv, o := sv.(mo.A)
				if !o {
					m[field.Name] = sv
				} else {
					svList := make(mo.A, len(svv))
					for i, row := range svv {
						sr, ok := row.(mo.M)
						if !ok {
							svList[i] = row
						} else {
							r, err := mo.DeepMapCopy(sr)
							if err == nil {
								svList[i] = r
							} else {
								svList[i] = row
							}
						}
					}
					m[field.Name] = svList
				}
				continue
			}
			fallthrough
		default:
			m[field.Name] = sv
		}
	}
	return m
}

// handleList returns deep copies of all cached rows whose foreign value matches lv.
func (c *Cache) handleList(lField *ii.FieldInfo, lookInfo *ii.ItemInfo, idxMap map[any][]int, cacheList []mo.M, lv any) mo.A {
	// look up the row indexes first
	idxList := make([]int, 0)
	idx, ok := idxMap[lv]
	if ok {
		idxList = append(idxList, idx...)
	}
	// allocate based on the number of matched indexes
	list := make(mo.A, len(idxList))
	for i := 0; i < len(idxList); i++ {
		list[i] = c.deepCopy(lField, lookInfo, cacheList[idxList[i]])
	}
	return list
}

// handleSUM sums the look.SUM field over all cached rows whose foreign value matches lv.
func (c *Cache) handleSUM(idxMap map[any][]int, cacheList []mo.M, lv any, look *ii.Lookup) mo.A {
	idxList := make([]int, 0)
	idx, ok := idxMap[lv]
	if ok {
		idxList = append(idxList, idx...)
	}
	var sum float64 // the accumulated type is always float64
	for _, i := range idxList {
		switch n := cacheList[i][look.SUM].(type) { // accumulate the values of the SUM field
		case float64:
			sum += n
		case int64:
			sum += float64(n)
		}
	}
	return mo.A{mo.M{look.SUM: sum}}
}

// handleLookup resolves a single cached lookup for row i and writes the result into look.AS.
func (c *Cache) handleLookup(i int, rows *[]mo.M, look *ii.Lookup, lookInfo *ii.ItemInfo, lField *ii.FieldInfo) {
	cacheIdx, cacheList := c.getData(lookInfo.Name)
	lv, ok := (*rows)[i][look.LocalField]
	if !ok {
		return // some documents may not contain this field
	}
	idxMap := cacheIdx[look.ForeignField]
	if look.List {
		(*rows)[i][look.AS] = c.handleList(lField, lookInfo, idxMap, cacheList, lv)
		return
	}
	if look.SUM != "" { // a non-empty SUM means the lookup aggregates a total
		// only aggregate when the ItemInfo of look.From contains the look.SUM field
		if _, o := lookInfo.FieldMap[look.SUM]; o {
			(*rows)[i][look.AS] = c.handleSUM(idxMap, cacheList, lv, look)
		}
	} else {
		// the type is required to be ObjectID when the cache is set, so the value can be matched directly here
		idx, o := idxMap[lv]
		if !o {
			return // skip when the local value cannot be found in the index
		}
		// when List is false, check whether the join uses a unique value;
		// when joining on a non-unique value (e.g. name instead of _id), only the first matched row is used
		(*rows)[i][look.AS] = mo.A{c.deepCopy(lField, lookInfo, cacheList[idx[0]])}
	}
}

// cacheLookup mirrors the fields of a $lookup stage for decoding.
type cacheLookup struct {
	From         string `bson:"from"`
	LocalField   string `bson:"localField"`
	ForeignField string `bson:"foreignField"`
	AS           string `bson:"as"`
}

// hasCacheFromLookup parses the Lookup passed in by upper-level code from val, retrieves the original
// ii.Lookup configuration from itemInfo by localField, and checks whether the original ii.Lookup's
// From table is cached.
func (c *Cache) hasCacheFromLookup(itemInfo *ii.ItemInfo, val any) (ii.Lookup, bool) {
	b, err := mo.Marshal(val)
	if err != nil {
		return ii.Lookup{}, false
	}
	var cl cacheLookup
	if err = mo.Unmarshal(b, &cl); err != nil {
		return ii.Lookup{}, false
	}
	field, ok := itemInfo.Field(cl.LocalField)
	if !ok {
		return ii.Lookup{}, false
	}
	lookup, ok := field.HasLookup(cl.AS)
	if !ok {
		return ii.Lookup{}, false
	}
	lookup.LocalField = field.Name
	// check whether lookup.From is cached
	if _, ok = c.Include(itemInfo.ForkName(lookup.From)); !ok {
		return ii.Lookup{}, false
	}
	return *lookup, true
}

const (
	// maxCacheTblSize is the maximum number of tables the cache can hold.
	maxCacheTblSize = 128
)

// NewCache creates a Cache backed by items with room for maxCacheTblSize tables.
func NewCache(items ii.Items) *Cache {
	c := new(Cache)
	c.nameList = make([]ii.Name, maxCacheTblSize)
	c.dataIdx = make([]map[string]map[any][]int, maxCacheTblSize)
	c.data = make([][]mo.M, maxCacheTblSize)
	c.items = items
	return c
}
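
// The sketch below is an editor-added usage example, not part of the original file.
// It shows the intended call sequence under the stated assumptions: AddItem registers
// a table, SetData loads its rows and builds the value indexes, SpitPipe strips the
// $lookup stages that the cache can serve, and Format joins the cached rows back into
// the query results. itemInfo, pipe, cached and runQuery are hypothetical inputs
// supplied by the caller.
func exampleCacheUsage(items ii.Items, itemInfo *ii.ItemInfo, name ii.Name,
	cached []mo.M, pipe mo.Pipeline, runQuery func(mo.Pipeline) []mo.M) []mo.M {
	cache := NewCache(items)
	cache.AddItem(name)         // pre-register the table so SetData can find its slot
	cache.SetData(name, cached) // build field value -> row index maps

	stage, lookups := cache.SpitPipe(itemInfo, pipe) // stages left for the database, lookups for the cache
	rows := runQuery(stage)                          // hypothetical: execute the reduced pipeline
	_ = cache.Format(itemInfo, lookups, &rows)       // fill look.AS fields from the cached rows
	return rows
}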