statusMgr.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. package statusMgr
  2. import (
  3. "wb/om"
  4. "sync"
  5. "time"
  6. "wb/ut"
  7. "wb/ii"
  8. "wb/lg"
  9. "github.com/astaxie/beego"
  10. "fmt"
  11. "eps/models/iot"
  12. "wb/cs"
  13. "eps/models/etc"
  14. "wb/modbus"
  15. "strings"
  16. )
  17. type StatusMgr struct {
  18. BlockId string
  19. DefaultValue cs.MObject
  20. Block *StatusCacheBlock
  21. }
  22. func GetItemBySid(sid string)string{
  23. if len(sid) >= 24{
  24. return iot.TypeGenset
  25. }
  26. item := ""
  27. for _, modelInfo := range modbus.ModelInfoMap{
  28. if strings.HasPrefix(sid, modelInfo.StartId){
  29. if len(item) > len(modelInfo.StartId){
  30. continue
  31. }
  32. item = modelInfo.Item
  33. }
  34. }
  35. if item == ""{
  36. lg.Error(" modelInfo error: no model match id:", sid)
  37. return iot.TypeGenset
  38. }else{
  39. return item
  40. }
  41. }
  42. func GetModelInfoBySid(sid string)modbus.ModelInfo{
  43. item := ""
  44. for _, modelInfo := range modbus.ModelInfoMap{
  45. if strings.HasPrefix(sid, modelInfo.StartId){
  46. if len(item) > len(modelInfo.StartId){
  47. continue
  48. }
  49. item = modelInfo.Item
  50. }
  51. }
  52. if item == ""{
  53. lg.Error(" modelInfo error: no model match id:", sid)
  54. return iot.TypeGenset
  55. }else{
  56. return item
  57. }
  58. }
  59. func GetMgrByModelItem(item string)*StatusMgr{
  60. if mgr, ok := IotMgrMap[item];ok{
  61. return mgr
  62. }
  63. return IotMgrMap[iot.TypeGenset]
  64. }
  65. func GetMgrBySid(sid string)*StatusMgr{
  66. if mgr, ok := IotMgrMap[GetItemBySid(sid)];ok{
  67. return mgr
  68. }
  69. return IotMgrMap[iot.TypeGenset]
  70. }
  71. func RefreshStatus(sid string) {
  72. lg.Debug("statusMgr.RefreshStatus sid:", sid)
  73. GetMgrBySid(sid).RefreshStatus(sid)
  74. }
  75. //func AddStatus(sid string, statusMap map[string]interface{}) {
  76. // lg.Debug("statusMgr.AddStatus sid:", sid)
  77. // GetMgrBySid(sid).AddStatus(statusMap)
  78. //}
  79. func GetStatus(sid string) cs.MObject {
  80. lg.Debug("statusMgr.GetStatus sid:", sid)
  81. return GetMgrBySid(sid).GetStatus(sid)
  82. }
  83. //func GetPosition(sid string)(float64, float64, bool){
  84. // lg.Debug("statusMgr.GetPosition sid:", sid)
  85. // return GetMgrBySid(sid).GetPosition(sid)
  86. //}
  87. //func AddPosition(sid string, x, y float64) {
  88. // lg.Debug("statusMgr.UpdatePosition sid:", sid, " x:", x, " y;", y)
  89. // GetMgrBySid(sid).AddPosition(sid, x, y)
  90. //}
  91. func (this *StatusMgr) GetDefaultStatusMap(sid string) map[string]interface{} {
  92. statusMap := map[string]interface{}(this.DefaultValue.Clone())
  93. statusMap["sid"] = sid
  94. return statusMap
  95. }
  96. func (this *StatusMgr) AddStatus(statusMap map[string]interface{}) {
  97. chanObj := statusChanObj{}
  98. chanObj.ValueObject = this.DefaultValue.Clone()
  99. for k := range chanObj.ValueObject {
  100. if newV, ok := statusMap[k];ok{
  101. chanObj.ValueObject[k] = newV
  102. }
  103. }
  104. chanObj.ValueObject["sn"] = ut.TUId()
  105. chanObj.ValueObject["createtime"] = ut.GetCurDbTime()
  106. chanObj.Status = chanObj.ValueObject.GetString("status")
  107. //lg.Debug("status", chanObj.Status)
  108. chanObj.BlockId = this.BlockId
  109. chanObj.ForceRefreshStatus = false
  110. statusChan <- chanObj
  111. }
  112. func (this *StatusMgr) GetStatus(sid string)cs.MObject {
  113. return this.Block.GetStatus(sid)
  114. }
  115. func (this *StatusMgr) RefreshStatus(sid string) {
  116. chanObj := statusChanObj{}
  117. chanObj.ValueObject = cs.MObject{}
  118. chanObj.ValueObject["sid"] = sid
  119. chanObj.ForceRefreshStatus = true
  120. chanObj.BlockId = this.BlockId
  121. statusChan <- chanObj
  122. }
  123. func (this *StatusMgr) AddPosition(sid string, x, y float64) {
  124. if x == 0 || y == 0{
  125. return
  126. }
  127. if x == 113.344779 || y == 23.122803{
  128. return
  129. }
  130. chanObj := posChanObj{}
  131. chanObj.BlockId = this.BlockId
  132. chanObj.Sid = sid
  133. chanObj.X = x
  134. chanObj.Y = y
  135. chanObj.T = ut.GetCurDbTime()
  136. posChan <- chanObj
  137. }
  138. //func (this *StatusMgr) GetPosition(sid string)(float64, float64, bool){
  139. // return this.Block.GetPosition(sid)
  140. //}
  141. func (this *StatusMgr)GetPositions()([]map[string]interface{}) {
  142. return this.Block.GetPositions()
  143. }
  144. type dbBuff struct {
  145. dbName string
  146. Sql string
  147. buffList [][]interface{}
  148. }
  149. func newBuff(dbName, sql string, buffLen int) *dbBuff {
  150. o := dbBuff{}
  151. o.Sql = sql
  152. o.dbName = dbName
  153. o.buffList = make([][]interface{}, 0, buffLen)
  154. return &o
  155. }
  156. func (this *dbBuff) Add(one []interface{}) {
  157. this.buffList = append(this.buffList, one)
  158. //lg.Debug("buflist.Add:", this.buffList)
  159. }
  160. func (this *dbBuff) Save() {
  161. om.MultiExe(this.Sql, this.buffList)
  162. this.buffList = this.buffList[:0]
  163. }
  164. func (this *dbBuff) SaveToDb(){
  165. if len(this.dbName) == 0{
  166. om.MultiExe(this.Sql, this.buffList)
  167. }else{
  168. om.DbMultiExe(this.dbName, this.Sql, this.buffList)
  169. }
  170. this.buffList = this.buffList[:0]
  171. }
  172. type StatusCacheBlock struct {
  173. // 状态项
  174. StatusFields []string
  175. // 锁
  176. StatusLock sync.Mutex
  177. PosLock sync.Mutex
  178. // 状态字典
  179. StatusCacheMap map[string]*statusCache
  180. PosCacheMap map[string]*posCache
  181. // buff
  182. StatusAddBuff *dbBuff
  183. StatusUpdateBuff *dbBuff
  184. PosUpdateBuff *dbBuff
  185. }
  186. func NewStatusMgr(table, statusTable string, defaultValue cs.MObject) *StatusMgr {
  187. //lg.Info("NewStatusMgr talbe:", table, " statusTable:", statusTable, " defaultValue:", defaultValue)
  188. lg.Info(fmt.Sprintf("Init table %s status 2 offline", table))
  189. om.DbUpdate(fmt.Sprintf("UPDATE '%s' SET 'status' = ?", table), "offline")
  190. block := StatusCacheBlock{}
  191. block.StatusFields = defaultValue.Keys()
  192. block.StatusLock = sync.Mutex{}
  193. block.PosLock = sync.Mutex{}
  194. block.StatusCacheMap = map[string]*statusCache{}
  195. block.PosCacheMap = map[string]*posCache{}
  196. block.StatusAddBuff = newBuff(statusTable, om.CreateInsertSql(statusTable, block.StatusFields), 1024)
  197. block.StatusUpdateBuff = newBuff("", om.CreateUpdateSql(table, []string{"status"}, "sid"), 1024)
  198. block.PosUpdateBuff = newBuff("", om.CreateUpdateSql(table, []string{"x", "y"}, "sid"), 1024)
  199. blockId := table
  200. gStatusCacheBlockMap[blockId] = &block
  201. o := StatusMgr{}
  202. o.BlockId = blockId
  203. o.DefaultValue = defaultValue.Clone()
  204. o.Block = &block
  205. return &o
  206. }
  207. type statusChanObj struct {
  208. BlockId string
  209. Status string
  210. ValueObject cs.MObject
  211. ForceRefreshStatus bool
  212. }
  213. type posChanObj struct {
  214. BlockId string
  215. Sid string
  216. X float64
  217. Y float64
  218. T string
  219. }
  220. func (this *StatusCacheBlock)GetStatus(sid string)cs.MObject{
  221. ret := cs.MObject{}
  222. this.StatusLock.Lock()
  223. //fmt.Println("sid:", sid, "map", this.StatusCacheMap)
  224. if sCache, ok := this.StatusCacheMap[sid];ok{
  225. ret = sCache.ValueObject.Clone()
  226. //fmt.Println("ret:", ret)
  227. }else{
  228. ret = cs.MObject{}
  229. }
  230. this.StatusLock.Unlock()
  231. return ret
  232. }
  233. func (this *StatusCacheBlock)GetPosition(sid string)(float64, float64, bool){
  234. x, y := float64(0), float64(0)
  235. retOk := false
  236. this.PosLock.Lock()
  237. pCache, retOk := this.PosCacheMap[sid]
  238. if retOk{
  239. x = pCache.X
  240. y = pCache.Y
  241. }
  242. this.PosLock.Unlock()
  243. return x, y, retOk
  244. }
  245. func (this *StatusCacheBlock)GetPositions()([]map[string]interface{}){
  246. this.PosLock.Lock()
  247. lstPos := make([]map[string]interface{},0)
  248. for sid, pos := range this.PosCacheMap{
  249. pt := make(map[string]interface{})
  250. pt["x"] = pos.X
  251. pt["y"] = pos.Y
  252. pt["t"] = pos.T
  253. pt["sid"] = sid
  254. lstPos = append(lstPos, pt)
  255. }
  256. this.PosLock.Unlock()
  257. return lstPos
  258. }
  259. func (this *StatusCacheBlock)recvStatus(chanObj statusChanObj) {
  260. //lg.Debug("StatusCacheBlock.recvStatus:", chanObj)
  261. if chanObj.ForceRefreshStatus {
  262. this.recvRefreshStatus(chanObj)
  263. } else {
  264. this.recvDateStatus(chanObj)
  265. }
  266. }
  267. func (this *StatusCacheBlock)recvRefreshStatus(chanObj statusChanObj) {
  268. this.StatusLock.Lock()
  269. sid := chanObj.ValueObject.GetString("sid")
  270. if sCache, ok := this.StatusCacheMap[sid]; ok {
  271. sCache.NeedRefresh = true
  272. sCache.NoDateCount = 0
  273. }
  274. this.StatusLock.Unlock()
  275. }
  276. func (this *StatusCacheBlock)recvDateStatus(chanObj statusChanObj) {
  277. lstStatus := make([]interface{}, len(this.StatusFields))
  278. for idx, field := range this.StatusFields {
  279. if v, ok := chanObj.ValueObject[field]; ok {
  280. lstStatus[idx] = v
  281. } else {
  282. lg.Error("StatusCacheBlock field error!")
  283. return
  284. }
  285. }
  286. this.StatusAddBuff.Add(lstStatus)
  287. // 状态缓存
  288. this.StatusLock.Lock()
  289. sid := chanObj.ValueObject.GetString("sid")
  290. if _, ok := this.StatusCacheMap[sid]; !ok {
  291. sCache := statusCache{}
  292. sCache.DBStatus = "offline"
  293. this.StatusCacheMap[sid] = &sCache
  294. }
  295. sCache, _ := this.StatusCacheMap[sid]
  296. sCache.Status = chanObj.Status
  297. //lg.Debug("status", sCache.Status)
  298. sCache.ValueObject = chanObj.ValueObject
  299. sCache.NoDateCount = 0
  300. this.StatusLock.Unlock()
  301. }
  302. func (this *StatusCacheBlock)recvPos(chanObj posChanObj) {
  303. //lg.Debug("StatusCacheBlock.recvPos", chanObj)
  304. // 位置缓存
  305. this.PosLock.Lock()
  306. if _, ok := this.PosCacheMap[chanObj.Sid]; !ok {
  307. pCache := &posCache{}
  308. this.PosCacheMap[chanObj.Sid] = pCache
  309. }
  310. pCache, _ := this.PosCacheMap[chanObj.Sid]
  311. if pCache.X != chanObj.X || pCache.Y != chanObj.Y{
  312. pCache.NeedRefresh = true
  313. }
  314. pCache.X = chanObj.X
  315. pCache.Y = chanObj.Y
  316. pCache.T = chanObj.T
  317. this.PosLock.Unlock()
  318. // 添加pos记录
  319. gPosAddBuff.Add([]interface{}{chanObj.X, chanObj.Y, chanObj.T, chanObj.Sid})
  320. }
  321. // 必须在锁中调用
  322. func (this *StatusCacheBlock)doSave() {
  323. this.StatusAddBuff.SaveToDb()
  324. lstDelete := make([]string, 0)
  325. for sid, sCache := range this.StatusCacheMap {
  326. sCache.NoDateCount = sCache.NoDateCount + 1
  327. // 新数据且状态改变,更新数据库
  328. if sCache.DBStatus != sCache.Status {
  329. sCache.NeedRefresh = true
  330. }else {
  331. // 如果15个周期没有收到过数据,认为离线
  332. if sCache.NoDateCount > 15 {
  333. sCache.NeedRefresh = true
  334. sCache.DBStatus = "offline"
  335. sCache.Status = "offline"
  336. lstDelete = append(lstDelete, sid)
  337. }
  338. }
  339. // 强制更新
  340. if sCache.NeedRefresh {
  341. sCache.NeedRefresh = false
  342. this.StatusUpdateBuff.Add([]interface{}{sCache.Status, sid})
  343. sCache.DBStatus = sCache.Status
  344. }
  345. }
  346. for _, sid := range lstDelete {
  347. delete(this.StatusCacheMap, sid)
  348. }
  349. this.StatusUpdateBuff.Save()
  350. for sid, pCache := range this.PosCacheMap {
  351. // 有新数据
  352. if pCache.NeedRefresh{
  353. this.PosUpdateBuff.Add([]interface{}{pCache.X, pCache.Y, sid})
  354. }
  355. }
  356. this.PosUpdateBuff.Save()
  357. }
  358. type statusCache struct {
  359. BlockId string
  360. DBStatus string
  361. Status string
  362. ValueObject cs.MObject
  363. NoDateCount int
  364. NeedRefresh bool
  365. }
  366. type posCache struct {
  367. BlockId string
  368. X float64
  369. Y float64
  370. T string
  371. NeedRefresh bool
  372. }
  373. // 保存
  374. func saveLoop() {
  375. timer := time.NewTicker(gSavePeriods * time.Second)
  376. for {
  377. select {
  378. case status := <-statusChan:
  379. if block, ok := gStatusCacheBlockMap[status.BlockId]; ok {
  380. block.recvStatus(status)
  381. } else {
  382. lg.Error("statusMgr.saveLoop recv status: blockID not exit")
  383. }
  384. case pos := <-posChan:
  385. if block, ok := gStatusCacheBlockMap[pos.BlockId]; ok {
  386. block.recvPos(pos)
  387. } else {
  388. lg.Error("statusMgr.saveLoop recv pos: blockID not exit")
  389. }
  390. case <-timer.C:
  391. for _, block := range gStatusCacheBlockMap {
  392. lg.Debug("block.doSave")
  393. block.doSave()
  394. }
  395. // 存储位置信息
  396. lg.Debug("gPosAddBuff.SaveToDb")
  397. gPosAddBuff.SaveToDb()
  398. }
  399. }
  400. }
  401. var statusChan chan statusChanObj
  402. var posChan chan posChanObj
  403. var GsStatusMgr *StatusMgr
  404. var WPStatusMgr *StatusMgr
  405. var gStatusCacheBlockMap map[string]*StatusCacheBlock
  406. var gPosAddBuff *dbBuff
  407. var IotMgrMap = map[string]*StatusMgr{}
  408. func InitStatusMgr() {
  409. statusChan = make(chan statusChanObj, 1024)
  410. posChan = make(chan posChanObj, 1024)
  411. addPosSql := om.CreateInsertSql("position", []string{"x", "y", "createtime", "sid"})
  412. gPosAddBuff = newBuff(etc.DbNamePosition, addPosSql, 1024)
  413. gStatusCacheBlockMap = map[string]*StatusCacheBlock{}
  414. for _, iotInfo := range iot.ThingsMap {
  415. stInfo, ok := ii.ItemInfoMap[iotInfo.Status]
  416. if !ok{
  417. lg.Error("InitStatusMgr.ItemInfoMap error:no such item status:", iotInfo.Status)
  418. panic("InitStatusMgr.ItemInfoMap error")
  419. }
  420. itemInfo, ok := ii.ItemInfoMap[iotInfo.Name]
  421. if !ok{
  422. lg.Error("InitStatusMgr.ItemInfoMap error:no such item:", itemInfo.Name)
  423. panic("InitStatusMgr.ItemInfoMap error")
  424. }
  425. defaultValue := cs.MObject{}
  426. for _, field := range stInfo.Fields{
  427. defaultValue[field.Name] = field.GetDefaultValue()
  428. }
  429. IotMgrMap[iotInfo.Name] = NewStatusMgr(iotInfo.Name, iotInfo.Status, defaultValue)
  430. }
  431. for _, v := range modbus.ModelInfoMap{
  432. if _, ok := IotMgrMap[v.Item];!ok{
  433. lg.Error("InitStatusMgr.ModelInfoMap error:no such item:", v.Item)
  434. panic("InitStatusMgr.ModelInfoMap error:no such item")
  435. }
  436. }
  437. if defaultMgr, ok :=IotMgrMap[iot.TypeGenset];!ok{
  438. GsStatusMgr = defaultMgr
  439. }else{
  440. lg.Error("InitStatusMgr.GsStatusMgr error: no genset define")
  441. panic("InitStatusMgr.GsStatusMgr error: no genset define")
  442. }
  443. if mgr, ok :=IotMgrMap[iot.TypeWpVehicle];!ok{
  444. WPStatusMgr = mgr
  445. }else{
  446. lg.Error("InitStatusMgr.GsStatusMgr error: no wpvehicle define")
  447. panic("InitStatusMgr.GsStatusMgr error: no wpvehicle define")
  448. }
  449. //gsItemInfo, _ := ii.ItemInfoMap["gsstatus"]
  450. //gsDefaultValue := cs.MObject{}
  451. //for _, field := range gsItemInfo.Fields{
  452. // gsDefaultValue[field.Name] = field.GetDefaultValue()
  453. //}
  454. //GsStatusMgr = NewStatusMgr("genset", "gsstatus", gsDefaultValue)
  455. //wpItemInfo, _ := ii.ItemInfoMap["wpstatus"]
  456. //wpDefaultValue := cs.MObject{}
  457. //for _, field := range wpItemInfo.Fields{
  458. // wpDefaultValue[field.Name] = field.GetDefaultValue()
  459. //}
  460. //WPStatusMgr = NewStatusMgr("wpvehicle", "wpstatus", wpDefaultValue)
  461. go saveLoop()
  462. }
  463. // 初始化
  464. var gSavePeriods time.Duration
  465. func init() {
  466. gSavePeriods = time.Duration(beego.AppConfig.DefaultInt64("savePeriods", 20))
  467. lg.Info("statusMgr save period:", gSavePeriods)
  468. }