// Package statusMgr keeps an in-memory cache of the latest status and
// position reported for each device (identified by its sid) and flushes the
// buffered records to the database from a single background goroutine.
package statusMgr

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"eps/models/etc"
	"eps/models/iot"
	"github.com/astaxie/beego"
	"wb/cs"
	"wb/ii"
	"wb/lg"
	"wb/modbus"
	"wb/om"
	"wb/ut"
)

// StatusMgr is the public entry point for one kind of device: it knows the
// cache block that stores the device data and the default status record.
type StatusMgr struct {
	BlockId      string
	DefaultValue cs.MObject
	Block        *StatusCacheBlock
}

// GetItemBySid returns the item (device type) name for sid.
//
// A sid of 24 or more bytes is always reported as iot.TypeGenset. Otherwise
// the entry of modbus.ModelInfoMap whose StartId is the longest prefix of sid
// wins. When nothing matches, an error is logged and iot.TypeGenset is
// returned.
func GetItemBySid(sid string) string {
	if len(sid) >= 24 {
		return iot.TypeGenset
	}

	item := ""
	bestLen := -1 // length of the StartId that produced item
	for _, modelInfo := range modbus.ModelInfoMap {
		if !strings.HasPrefix(sid, modelInfo.StartId) {
			continue
		}
		// Keep the most specific (longest) prefix seen so far.
		if bestLen > len(modelInfo.StartId) {
			continue
		}
		item = modelInfo.Item
		bestLen = len(modelInfo.StartId)
	}

	if item == "" {
		lg.Error(" modelInfo error: no model match id:", sid)
		return iot.TypeGenset
	}
	return item
}

// GetModelInfoBySid returns the entry of modbus.ModelInfoMap whose StartId is
// the longest prefix of sid.
//
// When nothing matches, an error is logged and a ModelInfo carrying only
// Item = iot.TypeGenset is returned.
//
// NOTE(review): assumes modbus.ModelInfoMap stores ModelInfo values (not
// pointers) - confirm against the modbus package.
func GetModelInfoBySid(sid string) modbus.ModelInfo {
	var best modbus.ModelInfo
	matched := false
	for _, modelInfo := range modbus.ModelInfoMap {
		if !strings.HasPrefix(sid, modelInfo.StartId) {
			continue
		}
		// Keep the most specific (longest) prefix seen so far.
		if matched && len(best.StartId) > len(modelInfo.StartId) {
			continue
		}
		best = modelInfo
		matched = true
	}

	if !matched || best.Item == "" {
		lg.Error(" modelInfo error: no model match id:", sid)
		return modbus.ModelInfo{Item: iot.TypeGenset}
	}
	return best
}

// GetMgrByModelItem returns the manager registered for item, falling back to
// the iot.TypeGenset manager when item is not registered.
func GetMgrByModelItem(item string) *StatusMgr {
	if mgr, ok := IotMgrMap[item]; ok {
		return mgr
	}
	return IotMgrMap[iot.TypeGenset]
}

// GetMgrBySid returns the manager responsible for sid, falling back to the
// iot.TypeGenset manager.
func GetMgrBySid(sid string) *StatusMgr {
	return GetMgrByModelItem(GetItemBySid(sid))
}

// RefreshStatus asks the manager owning sid to force a status refresh.
func RefreshStatus(sid string) {
	lg.Debug("statusMgr.RefreshStatus sid:", sid)
	GetMgrBySid(sid).RefreshStatus(sid)
}

//func AddStatus(sid string, statusMap map[string]interface{}) {
//	lg.Debug("statusMgr.AddStatus sid:", sid)
//	GetMgrBySid(sid).AddStatus(statusMap)
//}

// GetStatus returns a copy of the cached status of sid, or an empty object
// when nothing is cached.
func GetStatus(sid string) cs.MObject {
	lg.Debug("statusMgr.GetStatus sid:", sid)
	return GetMgrBySid(sid).GetStatus(sid)
}

//func GetPosition(sid string)(float64, float64, bool){
//	lg.Debug("statusMgr.GetPosition sid:", sid)
//	return GetMgrBySid(sid).GetPosition(sid)
//}
//func AddPosition(sid string, x, y float64) {
//	lg.Debug("statusMgr.UpdatePosition sid:", sid, " x:", x, " y;", y)
//	GetMgrBySid(sid).AddPosition(sid, x, y)
//}

// GetDefaultStatusMap returns a copy of the default status record with its
// "sid" entry set to sid.
func (m *StatusMgr) GetDefaultStatusMap(sid string) map[string]interface{} {
	statusMap := map[string]interface{}(m.DefaultValue.Clone())
	statusMap["sid"] = sid
	return statusMap
}

// AddStatus queues a status record for caching and persistence.
//
// Only keys present in the default record are taken from statusMap; "sn" and
// "createtime" are always overwritten. The send blocks while statusChan is
// full.
func (m *StatusMgr) AddStatus(statusMap map[string]interface{}) {
	chanObj := statusChanObj{}
	chanObj.ValueObject = m.DefaultValue.Clone()
	for k := range chanObj.ValueObject {
		if newV, ok := statusMap[k]; ok {
			chanObj.ValueObject[k] = newV
		}
	}
	chanObj.ValueObject["sn"] = ut.TUId()
	chanObj.ValueObject["createtime"] = ut.GetCurDbTime()
	chanObj.Status = chanObj.ValueObject.GetString("status")
	//lg.Debug("status", chanObj.Status)
	chanObj.BlockId = m.BlockId
	chanObj.ForceRefreshStatus = false
	statusChan <- chanObj
}

// GetStatus returns a copy of the cached status of sid.
func (m *StatusMgr) GetStatus(sid string) cs.MObject {
	return m.Block.GetStatus(sid)
}

// RefreshStatus queues a request that marks the cached status of sid as
// needing a database update on the next save cycle.
func (m *StatusMgr) RefreshStatus(sid string) {
	chanObj := statusChanObj{}
	chanObj.ValueObject = cs.MObject{}
	chanObj.ValueObject["sid"] = sid
	chanObj.ForceRefreshStatus = true
	chanObj.BlockId = m.BlockId
	statusChan <- chanObj
}

// AddPosition queues a position sample for sid. Samples with a zero
// coordinate are dropped.
func (m *StatusMgr) AddPosition(sid string, x, y float64) {
	if x == 0 || y == 0 {
		return
	}
	// NOTE(review): presumably a placeholder coordinate reported by devices
	// without a fix - confirm the meaning of these constants.
	if x == 113.344779 || y == 23.122803 {
		return
	}

	chanObj := posChanObj{}
	chanObj.BlockId = m.BlockId
	chanObj.Sid = sid
	chanObj.X = x
	chanObj.Y = y
	chanObj.T = ut.GetCurDbTime()
	posChan <- chanObj
}

//func (this *StatusMgr) GetPosition(sid string)(float64, float64, bool){
//	return this.Block.GetPosition(sid)
//}

// GetPositions returns a snapshot of every cached position of this manager.
func (m *StatusMgr) GetPositions() []map[string]interface{} {
	return m.Block.GetPositions()
}

// dbBuff accumulates argument rows for one SQL statement so they can be
// executed as a batch.
type dbBuff struct {
	dbName   string
	Sql      string
	buffList [][]interface{}
}

// newBuff creates a buffer for sql with room for buffLen rows. An empty
// dbName makes SaveToDb use om.MultiExe instead of om.DbMultiExe.
func newBuff(dbName, sql string, buffLen int) *dbBuff {
	return &dbBuff{
		dbName:   dbName,
		Sql:      sql,
		buffList: make([][]interface{}, 0, buffLen),
	}
}

// Add appends one row of statement arguments.
func (b *dbBuff) Add(one []interface{}) {
	b.buffList = append(b.buffList, one)
	//lg.Debug("buflist.Add:", this.buffList)
}

// Save executes the buffered rows through om.MultiExe (dbName is ignored)
// and empties the buffer while keeping its capacity.
func (b *dbBuff) Save() {
	om.MultiExe(b.Sql, b.buffList)
	b.buffList = b.buffList[:0]
}

// SaveToDb executes the buffered rows, through om.DbMultiExe when dbName is
// set and om.MultiExe otherwise, then empties the buffer while keeping its
// capacity.
func (b *dbBuff) SaveToDb() {
	if len(b.dbName) == 0 {
		om.MultiExe(b.Sql, b.buffList)
	} else {
		om.DbMultiExe(b.dbName, b.Sql, b.buffList)
	}
	b.buffList = b.buffList[:0]
}

// StatusCacheBlock holds the cached status and position of every sid that
// belongs to one table, plus the write buffers for that table.
type StatusCacheBlock struct {
	// Names of the status fields, in insert order.
	StatusFields []string
	// Locks guarding StatusCacheMap and PosCacheMap respectively.
	StatusLock sync.Mutex
	PosLock    sync.Mutex
	// Caches keyed by sid.
	StatusCacheMap map[string]*statusCache
	PosCacheMap    map[string]*posCache
	// Write buffers; only touched from the saveLoop goroutine.
	StatusAddBuff    *dbBuff
	StatusUpdateBuff *dbBuff
	PosUpdateBuff    *dbBuff
}

// NewStatusMgr resets the status column of table to "offline", creates the
// cache block for table, registers it under the block id table and returns
// the manager wrapping it.
func NewStatusMgr(table, statusTable string, defaultValue cs.MObject) *StatusMgr {
	//lg.Info("NewStatusMgr talbe:", table, " statusTable:", statusTable, " defaultValue:", defaultValue)
	lg.Info(fmt.Sprintf("Init table %s status 2 offline", table))
	// NOTE(review): identifiers are wrapped in single quotes here; some SQL
	// dialects read those as string literals - confirm against the database.
	om.DbUpdate(fmt.Sprintf("UPDATE '%s' SET 'status' = ?", table), "offline")

	block := &StatusCacheBlock{}
	block.StatusFields = defaultValue.Keys()
	block.StatusCacheMap = map[string]*statusCache{}
	block.PosCacheMap = map[string]*posCache{}
	block.StatusAddBuff = newBuff(statusTable, om.CreateInsertSql(statusTable, block.StatusFields), 1024)
	block.StatusUpdateBuff = newBuff("", om.CreateUpdateSql(table, []string{"status"}, "sid"), 1024)
	block.PosUpdateBuff = newBuff("", om.CreateUpdateSql(table, []string{"x", "y"}, "sid"), 1024)

	blockId := table
	gStatusCacheBlockMap[blockId] = block

	return &StatusMgr{
		BlockId:      blockId,
		DefaultValue: defaultValue.Clone(),
		Block:        block,
	}
}

// statusChanObj is the message sent over statusChan.
type statusChanObj struct {
	BlockId            string
	Status             string
	ValueObject        cs.MObject
	ForceRefreshStatus bool
}

// posChanObj is the message sent over posChan.
type posChanObj struct {
	BlockId string
	Sid     string
	X       float64
	Y       float64
	T       string
}

// GetStatus returns a copy of the cached status of sid, or an empty object
// when sid is not cached.
func (c *StatusCacheBlock) GetStatus(sid string) cs.MObject {
	c.StatusLock.Lock()
	defer c.StatusLock.Unlock()

	//fmt.Println("sid:", sid, "map", this.StatusCacheMap)
	if sc, ok := c.StatusCacheMap[sid]; ok {
		return sc.ValueObject.Clone()
	}
	return cs.MObject{}
}

// GetPosition returns the cached coordinates of sid; the boolean is false
// (and the coordinates zero) when sid is not cached.
func (c *StatusCacheBlock) GetPosition(sid string) (float64, float64, bool) {
	c.PosLock.Lock()
	defer c.PosLock.Unlock()

	pc, ok := c.PosCacheMap[sid]
	if !ok {
		return 0, 0, false
	}
	return pc.X, pc.Y, true
}

// GetPositions returns one map per cached position with the keys "x", "y",
// "t" and "sid".
func (c *StatusCacheBlock) GetPositions() []map[string]interface{} {
	c.PosLock.Lock()
	defer c.PosLock.Unlock()

	lstPos := make([]map[string]interface{}, 0, len(c.PosCacheMap))
	for sid, pos := range c.PosCacheMap {
		lstPos = append(lstPos, map[string]interface{}{
			"x":   pos.X,
			"y":   pos.Y,
			"t":   pos.T,
			"sid": sid,
		})
	}
	return lstPos
}

// recvStatus dispatches a status message to the refresh or data handler.
func (c *StatusCacheBlock) recvStatus(chanObj statusChanObj) {
	//lg.Debug("StatusCacheBlock.recvStatus:", chanObj)
	if chanObj.ForceRefreshStatus {
		c.recvRefreshStatus(chanObj)
	} else {
		c.recvDateStatus(chanObj)
	}
}

// recvRefreshStatus marks an already cached sid as needing a database update
// and resets its no-data counter. Unknown sids are ignored.
func (c *StatusCacheBlock) recvRefreshStatus(chanObj statusChanObj) {
	c.StatusLock.Lock()
	defer c.StatusLock.Unlock()

	sid := chanObj.ValueObject.GetString("sid")
	if sc, ok := c.StatusCacheMap[sid]; ok {
		sc.NeedRefresh = true
		sc.NoDateCount = 0
	}
}

// recvDateStatus buffers a status record for insertion and updates the cache
// entry of its sid. The record is dropped (with an error log) when any field
// of StatusFields is missing.
func (c *StatusCacheBlock) recvDateStatus(chanObj statusChanObj) {
	lstStatus := make([]interface{}, len(c.StatusFields))
	for idx, field := range c.StatusFields {
		v, ok := chanObj.ValueObject[field]
		if !ok {
			lg.Error("StatusCacheBlock field error!")
			return
		}
		lstStatus[idx] = v
	}
	c.StatusAddBuff.Add(lstStatus)

	// Update the status cache.
	c.StatusLock.Lock()
	defer c.StatusLock.Unlock()

	sid := chanObj.ValueObject.GetString("sid")
	sc, ok := c.StatusCacheMap[sid]
	if !ok {
		// First record for this sid: the database row is still "offline".
		sc = &statusCache{DBStatus: "offline"}
		c.StatusCacheMap[sid] = sc
	}
	sc.Status = chanObj.Status
	//lg.Debug("status", sCache.Status)
	sc.ValueObject = chanObj.ValueObject
	sc.NoDateCount = 0
}

// recvPos updates the cached position of a sid and buffers the sample for
// insertion into the position history.
func (c *StatusCacheBlock) recvPos(chanObj posChanObj) {
	//lg.Debug("StatusCacheBlock.recvPos", chanObj)
	// Update the position cache.
	c.PosLock.Lock()
	pc, ok := c.PosCacheMap[chanObj.Sid]
	if !ok {
		pc = &posCache{}
		c.PosCacheMap[chanObj.Sid] = pc
	}
	if pc.X != chanObj.X || pc.Y != chanObj.Y {
		pc.NeedRefresh = true
	}
	pc.X = chanObj.X
	pc.Y = chanObj.Y
	pc.T = chanObj.T
	c.PosLock.Unlock()

	// Append the history record.
	gPosAddBuff.Add([]interface{}{chanObj.X, chanObj.Y, chanObj.T, chanObj.Sid})
}

// doSave flushes the buffered status inserts, then writes status and
// position changes of the cached entries back to the database.
//
// doSave acquires StatusLock and PosLock itself while it walks the caches, so
// it must NOT be called with either lock held. Database writes happen
// outside the locks so readers are not blocked on I/O.
func (c *StatusCacheBlock) doSave() {
	c.StatusAddBuff.SaveToDb()

	c.StatusLock.Lock()
	var lstDelete []string
	for sid, sc := range c.StatusCacheMap {
		sc.NoDateCount++
		if sc.DBStatus != sc.Status {
			// Status changed since the last write: update the database.
			sc.NeedRefresh = true
		} else if sc.NoDateCount > 15 {
			// No data for more than 15 save cycles: treat as offline.
			sc.NeedRefresh = true
			sc.DBStatus = "offline"
			sc.Status = "offline"
			lstDelete = append(lstDelete, sid)
		}
		// Forced or detected update.
		if sc.NeedRefresh {
			sc.NeedRefresh = false
			c.StatusUpdateBuff.Add([]interface{}{sc.Status, sid})
			sc.DBStatus = sc.Status
		}
	}
	for _, sid := range lstDelete {
		delete(c.StatusCacheMap, sid)
	}
	c.StatusLock.Unlock()
	c.StatusUpdateBuff.Save()

	c.PosLock.Lock()
	for sid, pc := range c.PosCacheMap {
		if pc.NeedRefresh {
			// Clear the flag so an unchanged position is not rewritten on
			// every following cycle.
			pc.NeedRefresh = false
			c.PosUpdateBuff.Add([]interface{}{pc.X, pc.Y, sid})
		}
	}
	c.PosLock.Unlock()
	c.PosUpdateBuff.Save()
}

// statusCache is the cached status of one sid.
type statusCache struct {
	BlockId     string
	DBStatus    string // status last written to the database
	Status      string // status from the latest record
	ValueObject cs.MObject
	NoDateCount int  // save cycles since the last record
	NeedRefresh bool // database status must be rewritten
}

// posCache is the cached position of one sid.
type posCache struct {
	BlockId     string
	X           float64
	Y           float64
	T           string
	NeedRefresh bool // position changed since the last database write
}

// saveLoop receives status and position messages and, every gSavePeriods
// seconds, flushes all cache blocks and the position history. It never
// returns.
func saveLoop() {
	timer := time.NewTicker(gSavePeriods * time.Second)
	for {
		select {
		case status := <-statusChan:
			if block, ok := gStatusCacheBlockMap[status.BlockId]; ok {
				block.recvStatus(status)
			} else {
				lg.Error("statusMgr.saveLoop recv status: blockID not exit")
			}
		case pos := <-posChan:
			if block, ok := gStatusCacheBlockMap[pos.BlockId]; ok {
				block.recvPos(pos)
			} else {
				lg.Error("statusMgr.saveLoop recv pos: blockID not exit")
			}
		case <-timer.C:
			for _, block := range gStatusCacheBlockMap {
				lg.Debug("block.doSave")
				block.doSave()
			}
			// Persist the position history.
			lg.Debug("gPosAddBuff.SaveToDb")
			gPosAddBuff.SaveToDb()
		}
	}
}

var (
	statusChan chan statusChanObj
	posChan    chan posChanObj

	// GsStatusMgr and WPStatusMgr are the managers registered for
	// iot.TypeGenset and iot.TypeWpVehicle; set by InitStatusMgr.
	GsStatusMgr *StatusMgr
	WPStatusMgr *StatusMgr

	gStatusCacheBlockMap map[string]*StatusCacheBlock
	gPosAddBuff          *dbBuff

	// IotMgrMap maps an item name to its manager; filled by InitStatusMgr.
	IotMgrMap = map[string]*StatusMgr{}
)

// InitStatusMgr creates the channels, buffers and one StatusMgr per entry of
// iot.ThingsMap, validates that every modbus model and the genset and
// wpvehicle items have a manager, and starts the save goroutine.
//
// It panics when a required item definition is missing.
func InitStatusMgr() {
	statusChan = make(chan statusChanObj, 1024)
	posChan = make(chan posChanObj, 1024)

	addPosSql := om.CreateInsertSql("position", []string{"x", "y", "createtime", "sid"})
	gPosAddBuff = newBuff(etc.DbNamePosition, addPosSql, 1024)
	gStatusCacheBlockMap = map[string]*StatusCacheBlock{}

	for _, iotInfo := range iot.ThingsMap {
		stInfo, ok := ii.ItemInfoMap[iotInfo.Status]
		if !ok {
			lg.Error("InitStatusMgr.ItemInfoMap error:no such item status:", iotInfo.Status)
			panic("InitStatusMgr.ItemInfoMap error")
		}
		if _, ok := ii.ItemInfoMap[iotInfo.Name]; !ok {
			// Log the name that was looked up; the lookup result is empty.
			lg.Error("InitStatusMgr.ItemInfoMap error:no such item:", iotInfo.Name)
			panic("InitStatusMgr.ItemInfoMap error")
		}

		defaultValue := cs.MObject{}
		for _, field := range stInfo.Fields {
			defaultValue[field.Name] = field.GetDefaultValue()
		}
		IotMgrMap[iotInfo.Name] = NewStatusMgr(iotInfo.Name, iotInfo.Status, defaultValue)
	}

	for _, v := range modbus.ModelInfoMap {
		if _, ok := IotMgrMap[v.Item]; !ok {
			lg.Error("InitStatusMgr.ModelInfoMap error:no such item:", v.Item)
			panic("InitStatusMgr.ModelInfoMap error:no such item")
		}
	}

	gsMgr, ok := IotMgrMap[iot.TypeGenset]
	if !ok {
		lg.Error("InitStatusMgr.GsStatusMgr error: no genset define")
		panic("InitStatusMgr.GsStatusMgr error: no genset define")
	}
	GsStatusMgr = gsMgr

	wpMgr, ok := IotMgrMap[iot.TypeWpVehicle]
	if !ok {
		lg.Error("InitStatusMgr.WPStatusMgr error: no wpvehicle define")
		panic("InitStatusMgr.WPStatusMgr error: no wpvehicle define")
	}
	WPStatusMgr = wpMgr

	//gsItemInfo, _ := ii.ItemInfoMap["gsstatus"]
	//gsDefaultValue := cs.MObject{}
	//for _, field := range gsItemInfo.Fields{
	//	gsDefaultValue[field.Name] = field.GetDefaultValue()
	//}
	//GsStatusMgr = NewStatusMgr("genset", "gsstatus", gsDefaultValue)
	//wpItemInfo, _ := ii.ItemInfoMap["wpstatus"]
	//wpDefaultValue := cs.MObject{}
	//for _, field := range wpItemInfo.Fields{
	//	wpDefaultValue[field.Name] = field.GetDefaultValue()
	//}
	//WPStatusMgr = NewStatusMgr("wpvehicle", "wpstatus", wpDefaultValue)

	go saveLoop()
}

// gSavePeriods is the save interval expressed as a number of seconds (it is
// multiplied by time.Second where it is used).
var gSavePeriods time.Duration

// init reads the "savePeriods" setting, defaulting to 20.
func init() {
	const defaultSavePeriods = 20

	periods := beego.AppConfig.DefaultInt64("savePeriods", defaultSavePeriods)
	if periods <= 0 {
		// time.NewTicker panics on a non-positive interval.
		lg.Error("statusMgr invalid savePeriods:", periods, " use default:", defaultSavePeriods)
		periods = defaultSavePeriods
	}
	gSavePeriods = time.Duration(periods)
	lg.Info("statusMgr save period:", gSavePeriods)
}