| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 | package statusMgrimport (    "wb/om"    "sync"    "time"    "wb/ut"    "wb/ii"    "wb/lg"    "github.com/astaxie/beego"    "fmt"    "testbench/models/iot"    "wb/cs"    "testbench/models/etc")type StatusMgr struct {    BlockId      string    DefaultValue cs.MObject    Block        *StatusCacheBlock}func GetMgrBySid(sid string)*StatusMgr{    // zz的24位    t := iot.GetThingTypeById(sid)    if t == iot.TypeWpvehicle {        return WPStatusMgr    }    return GSStatusMgr}func RefreshStatus(sid string) {    lg.Debug("statusMgr.RefreshStatus sid:", sid)    GetMgrBySid(sid).RefreshStatus(sid)}//func AddStatus(sid string, statusMap map[string]interface{}) {//    lg.Debug("statusMgr.AddStatus sid:", sid)//    GetMgrBySid(sid).AddStatus(statusMap)//}func GetStatus(sid string) cs.MObject {    lg.Debug("statusMgr.GetStatus sid:", sid)    return GetMgrBySid(sid).GetStatus(sid)}//func GetPosition(sid string)(float64, float64, bool){//    lg.Debug("statusMgr.GetPosition sid:", sid)//    return GetMgrBySid(sid).GetPosition(sid)//}//func AddPosition(sid string, x, y float64) {//    lg.Debug("statusMgr.UpdatePosition sid:", sid, " x:", x, " y;", y)//    GetMgrBySid(sid).AddPosition(sid, x, y)//}func (this *StatusMgr) GetDefaultStatusMap(sid string) map[string]interface{} {    statusMap := map[string]interface{}(this.DefaultValue.Clone())    statusMap["sid"] = sid    return statusMap}func (this *StatusMgr) AddStatus(statusMap map[string]interface{}) {    chanObj := statusChanObj{}    chanObj.ValueObject = this.DefaultValue.Clone()    for k := range chanObj.ValueObject {        if newV, ok := statusMap[k];ok{            chanObj.ValueObject[k] = newV        }    }    chanObj.ValueObject["sn"] = ut.TUId()    chanObj.ValueObject["createtime"] = ut.GetCurDbTime()    chanObj.Status = chanObj.ValueObject.GetString("status")    //lg.Debug("status", chanObj.Status)    chanObj.BlockId = this.BlockId    chanObj.ForceRefreshStatus = false    statusChan <- chanObj}func (this *StatusMgr) GetStatus(sid string)cs.MObject {    return this.Block.GetStatus(sid)}func (this *StatusMgr) RefreshStatus(sid string) {    chanObj := statusChanObj{}    chanObj.ValueObject = cs.MObject{}    chanObj.ValueObject["sid"] = sid    chanObj.ForceRefreshStatus = true    chanObj.BlockId = this.BlockId    statusChan <- chanObj}func (this *StatusMgr) AddPosition(sid string, x, y float64) {    if x == 0 || y == 0{        return    }    if x == 113.344779 || y == 23.122803{        return    }    chanObj := posChanObj{}    chanObj.BlockId = this.BlockId    chanObj.Sid = sid    chanObj.X = x    chanObj.Y = y    chanObj.T = ut.GetCurDbTime()    posChan <- chanObj}//func (this *StatusMgr) GetPosition(sid string)(float64, float64, bool){//    return this.Block.GetPosition(sid)//}func (this *StatusMgr)GetPositions()([]map[string]interface{})  {    return this.Block.GetPositions()}type dbBuff struct {    dbName   string    Sql      string    buffList [][]interface{}}func newBuff(dbName, sql string, buffLen int) *dbBuff {    o := dbBuff{}    o.Sql = sql    o.dbName = dbName    o.buffList = make([][]interface{}, 0, buffLen)    return &o}func (this *dbBuff) Add(one []interface{}) {    this.buffList = append(this.buffList, one)    //lg.Debug("buflist.Add:", this.buffList)}func (this *dbBuff) Save() {    om.MultiExe(this.Sql, this.buffList)    this.buffList = this.buffList[:0]}func (this *dbBuff) SaveToDb(){    if len(this.dbName) == 0{        om.MultiExe(this.Sql, this.buffList)    }else{        om.DbMultiExe(this.dbName, this.Sql, this.buffList)    }    this.buffList = this.buffList[:0]}type StatusCacheBlock struct {    // 状态项    StatusFields     []string    // 锁    StatusLock       sync.Mutex    PosLock          sync.Mutex    // 状态字典    StatusCacheMap   map[string]*statusCache    PosCacheMap      map[string]*posCache    // buff    StatusAddBuff    *dbBuff    StatusUpdateBuff *dbBuff    PosUpdateBuff    *dbBuff}func NewStatusMgr(table, statusTable string, defaultValue cs.MObject) *StatusMgr {    //lg.Info("NewStatusMgr talbe:", table, " statusTable:", statusTable, " defaultValue:", defaultValue)    lg.Info(fmt.Sprintf("Init table %s status 2 offline", table))    om.DbUpdate(fmt.Sprintf("UPDATE '%s' SET 'status' = ?", table), "offline")    block := StatusCacheBlock{}    block.StatusFields = defaultValue.Keys()    block.StatusLock = sync.Mutex{}    block.PosLock = sync.Mutex{}    block.StatusCacheMap = map[string]*statusCache{}    block.PosCacheMap = map[string]*posCache{}    block.StatusAddBuff = newBuff(statusTable, om.CreateInsertSql(statusTable, block.StatusFields), 1024)    block.StatusUpdateBuff = newBuff("", om.CreateUpdateSql(table, []string{"status"}, "sid"), 1024)    block.PosUpdateBuff = newBuff("", om.CreateUpdateSql(table, []string{"x", "y"}, "sid"), 1024)    blockId := table    gStatusCacheBlockMap[blockId] = &block    o := StatusMgr{}    o.BlockId = blockId    o.DefaultValue = defaultValue.Clone()    o.Block = &block    return &o}type statusChanObj struct {    BlockId            string    Status             string    ValueObject        cs.MObject    ForceRefreshStatus bool}type posChanObj struct {    BlockId string    Sid     string    X       float64    Y       float64    T       string}func (this *StatusCacheBlock)GetStatus(sid string)cs.MObject{    ret := cs.MObject{}    this.StatusLock.Lock()    //fmt.Println("sid:", sid, "map", this.StatusCacheMap)    if sCache, ok := this.StatusCacheMap[sid];ok{        ret = sCache.ValueObject.Clone()        //fmt.Println("ret:", ret)    }else{        ret = cs.MObject{}    }    this.StatusLock.Unlock()    return ret}//func (this *StatusCacheBlock)GetPosition(sid string)(float64, float64, bool){//    x, y := float64(0), float64(0)//    retOk := false//    this.PosLock.Lock()//    pCache, retOk := this.PosCacheMap[sid]//    if retOk{//        x = pCache.X//        y = pCache.Y//    }//    this.PosLock.Unlock()//    return x, y, retOk//}func (this *StatusCacheBlock)GetPositions()([]map[string]interface{}){    this.PosLock.Lock()    lstPos := make([]map[string]interface{},0)    for sid, pos := range this.PosCacheMap{        pt := make(map[string]interface{})        pt["x"] = pos.X        pt["y"] = pos.Y        pt["t"] = pos.T        pt["sid"] = sid        lstPos = append(lstPos, pt)    }    this.PosLock.Unlock()    return lstPos}func (this *StatusCacheBlock)recvStatus(chanObj statusChanObj) {    //lg.Debug("StatusCacheBlock.recvStatus:", chanObj)    if chanObj.ForceRefreshStatus {        this.recvRefreshStatus(chanObj)    } else {        this.recvDateStatus(chanObj)    }}func (this *StatusCacheBlock)recvRefreshStatus(chanObj statusChanObj) {    this.StatusLock.Lock()    sid := chanObj.ValueObject.GetString("sid")    if sCache, ok := this.StatusCacheMap[sid]; ok {        sCache.NeedRefresh = true        sCache.NoDateCount = 0    }    this.StatusLock.Unlock()}func (this *StatusCacheBlock)recvDateStatus(chanObj statusChanObj) {    lstStatus := make([]interface{}, len(this.StatusFields))    for idx, field := range this.StatusFields {        if v, ok := chanObj.ValueObject[field]; ok {            lstStatus[idx] = v        } else {            lg.Error("StatusCacheBlock field error!")            return        }    }    this.StatusAddBuff.Add(lstStatus)    // 状态缓存    this.StatusLock.Lock()    sid := chanObj.ValueObject.GetString("sid")    if _, ok := this.StatusCacheMap[sid]; !ok {        sCache := statusCache{}        sCache.DBStatus = "offline"        this.StatusCacheMap[sid] = &sCache    }    sCache, _ := this.StatusCacheMap[sid]    sCache.Status = chanObj.Status    //lg.Debug("status", sCache.Status)    sCache.ValueObject = chanObj.ValueObject    sCache.NoDateCount = 0    this.StatusLock.Unlock()}func (this *StatusCacheBlock)recvPos(chanObj posChanObj) {    //lg.Debug("StatusCacheBlock.recvPos", chanObj)    // 位置缓存    this.PosLock.Lock()    if _, ok := this.PosCacheMap[chanObj.Sid]; !ok {        pCache := &posCache{}        this.PosCacheMap[chanObj.Sid] = pCache    }    pCache, _ := this.PosCacheMap[chanObj.Sid]    if pCache.X != chanObj.X || pCache.Y != chanObj.Y{        pCache.NeedRefresh = true    }    pCache.X = chanObj.X    pCache.Y = chanObj.Y    pCache.T = chanObj.T    this.PosLock.Unlock()    // 添加pos记录    gPosAddBuff.Add([]interface{}{chanObj.X, chanObj.Y, chanObj.T, chanObj.Sid})}// 必须在锁中调用func (this *StatusCacheBlock)doSave() {    this.StatusAddBuff.SaveToDb()    lstDelete := make([]string, 0)    for sid, sCache := range this.StatusCacheMap {        sCache.NoDateCount = sCache.NoDateCount + 1        // 新数据且状态改变,更新数据库        if sCache.DBStatus != sCache.Status {            sCache.NeedRefresh = true        }else {            // 如果15个周期没有收到过数据,认为离线            if sCache.NoDateCount > 15 {                sCache.NeedRefresh = true                sCache.DBStatus = "offline"                sCache.Status = "offline"                lstDelete = append(lstDelete, sid)            }        }        // 强制更新        if sCache.NeedRefresh {            sCache.NeedRefresh = false            this.StatusUpdateBuff.Add([]interface{}{sCache.Status, sid})            sCache.DBStatus = sCache.Status        }    }    for _, sid := range lstDelete {        delete(this.StatusCacheMap, sid)    }    this.StatusUpdateBuff.Save()    for sid, pCache := range this.PosCacheMap {        // 有新数据        if pCache.NeedRefresh{            this.PosUpdateBuff.Add([]interface{}{pCache.X, pCache.Y, sid})        }    }    this.PosUpdateBuff.Save()}type statusCache struct {    BlockId     string    DBStatus    string    Status      string    ValueObject cs.MObject    NoDateCount int    NeedRefresh bool}type posCache struct {    BlockId            string    X     float64    Y     float64    T     string    NeedRefresh bool}// 保存func saveLoop() {    timer := time.NewTicker(gSavePeriods * time.Second)    for {        select {        case status := <-statusChan:            if block, ok := gStatusCacheBlockMap[status.BlockId]; ok {                block.recvStatus(status)            } else {                lg.Error("statusMgr.saveLoop recv status: blockID not exit")            }        case pos := <-posChan:            if block, ok := gStatusCacheBlockMap[pos.BlockId]; ok {                block.recvPos(pos)            } else {                lg.Error("statusMgr.saveLoop recv pos: blockID not exit")            }        case <-timer.C:            for _, block := range gStatusCacheBlockMap {                lg.Debug("block.doSave")                block.doSave()            }        // 存储位置信息            lg.Debug("gPosAddBuff.SaveToDb")            gPosAddBuff.SaveToDb()        }    }}var statusChan chan statusChanObjvar posChan chan posChanObjvar GSStatusMgr *StatusMgrvar WPStatusMgr *StatusMgrvar gStatusCacheBlockMap  map[string]*StatusCacheBlockvar gPosAddBuff *dbBufffunc InitStatusMgr() {    statusChan = make(chan statusChanObj, 1024)    posChan = make(chan posChanObj, 1024)    addPosSql := om.CreateInsertSql("position", []string{"x", "y", "createtime", "sid"})    gPosAddBuff = newBuff(etc.DbNamePosition, addPosSql, 1024)    gStatusCacheBlockMap = map[string]*StatusCacheBlock{}    gsItemInfo, _ := ii.ItemInfoMap["gsstatus"]    gsDefaultValue := cs.MObject{}    for _, field := range gsItemInfo.Fields{        gsDefaultValue[field.Name] = field.GetDefaultValue()    }    GSStatusMgr = NewStatusMgr("genset", "gsstatus", gsDefaultValue)    wpItemInfo, _ := ii.ItemInfoMap["wpstatus"]    wpDefaultValue := cs.MObject{}    for _, field := range wpItemInfo.Fields{        wpDefaultValue[field.Name] = field.GetDefaultValue()    }    WPStatusMgr = NewStatusMgr("wpvehicle", "wpstatus", wpDefaultValue)    go saveLoop()}// 初始化var gSavePeriods time.Durationfunc init() {    gSavePeriods = time.Duration(beego.AppConfig.DefaultInt64("savePeriods", 20))    lg.Info("statusMgr save period:", gSavePeriods)}
 |