| | |
| | | |
// ElemInfo is the value carried by one queue slot: three integer identifiers.
//
// QueueElements treats a slot as occupied only when both PicId and InfoId are
// non-zero. The field order is kept exactly as declared.
// NOTE(review): the queue appears to be mapped from shared memory, so the
// layout is presumably shared with other processes — confirm before reordering.
type ElemInfo struct {
	ShmId, PicId, InfoId int
}
| | | |
| | | //queue info |
| | |
| | | shmdata := bytes2shmEsQueue(data) |
| | | fmt.Println("shmdata:", shmdata) |
| | | eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity)) |
| | | //fmt.Println("EsCaches:", eqi.EsCaches) |
| | | fmt.Println("EsCaches:", eqi.EsCaches) |
| | | |
| | | eqi.Queue = shmdata |
| | | eqi.ShmKey = key |
| | |
| | | |
| | | // ReleaseQueue detach an exist shm queue |
| | | func (eqi *EsQueueInfo) ReleaseQueue() { |
| | | fmt.Println("ReleaseQueue: key", eqi.ShmKey) |
| | | fmt.Printf("ReleaseQueue: key=%x\n", eqi.ShmKey) |
| | | DestroyShm(eqi.ShmData) |
| | | } |
| | | |
| | | // RemoveShmId remove an exist shm queue (ipcrm -m shmid) |
| | | func (eqi *EsQueueInfo) RemoveShmId() error { |
| | | fmt.Printf("RemoveShmId: key=%x\n", eqi.ShmKey) |
| | | return shm.Rm(eqi.ShmID) |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | return int(quantity) |
| | | } |
| | | |
| | | // Status status of shm queue |
| | | func (eqi *EsQueueInfo) QueueElements() []ElemInfo { |
| | | var elems []ElemInfo |
| | | |
| | | for i := 0; i < len(eqi.EsCaches); i++ { |
| | | if eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 { |
| | | elems = append(elems, eqi.EsCaches[i].value) |
| | | } |
| | | } |
| | | |
| | | return elems |
| | | } |
| | | |
| | | // Put makes a single write attempt, which may fail. |
| | |
| | | putNo := atomic.LoadUint32(&cache.putNo) |
| | | if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity { |
| | | val := cache.value |
| | | cache.value = ElemInfo{ShmId:0} |
| | | cache.value = ElemInfo{PicId: 0} |
| | | atomic.AddUint32(&cache.getNo, eqi.Queue.capacity) |
| | | return val, true, int(posCnt - 1) |
| | | } else { |
| | |
| | | v++ |
| | | return v |
| | | } |
| | | |