go如何实现并发安全的优先级队列

描述

go的如何实现优先级队列?

优先级别队列是否并发安全?

我的需求场景

实现了一个通信协议,收发消息。发送通道会阻塞。如果拥堵时,要控制拥堵消息的优先顺序。

优先级用Priority加上时间, Priority小优先, 早到优先。

原理

  1. 通过实现官方接口heap, 得到优先级队列

  2. 使用用chan进行控制并发控制,达到并发安全的效果

优先队列实现

package queue

import (
    "xxxx/packets"
    "container/heap"
    "time"
)

//---------------------------------------
//      优先队列(提供并发安全接口)
//---------------------------------------

func NewPriorityQueue() *PriorityQueue{
    q := &PriorityQueue{
        isPushing: false,
        ch: make(chan *QueueItem, 1),
    }
    return q
}


type PriorityQueue struct{
    //队列(最小堆)
    items []*QueueItem  //堆
    ch chan *QueueItem  //同步控制
    isPushing bool  //推送状态
}

// 数据项
type QueueItem struct {
    Value       *packets.Packet // value
    Priority    int                 // 优先级(越小越靠前)
    JoinTime    time.Time           // 加入时间
    index       int                 // 索引
}

//---------------------------------------
//      heap接口
//---------------------------------------
func (pq PriorityQueue) Len() int { return len(pq.items) }

func (pq PriorityQueue) Less(i, j int) bool {
    // priority越小越靠前
    if pq.items[i].Priority != pq.items[j].Priority{
        return pq.items[i].Priority < pq.items[j].Priority
    }else{
        return pq.items[i].JoinTime.Sub(pq.items[j].JoinTime) <0
    }
}

func (pq PriorityQueue) Swap(i, j int) {
    pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
    pq.items[i].index = i
    pq.items[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
    n := len(pq.items)
    item := x.(*QueueItem)
    item.index = n
    pq.items = append(pq.items, item)
}

func (pq *PriorityQueue) Pop() interface{} {
    old := *pq
    n := len(old.items)
    if n==0{
        return nil
    }
    item := old.items[n-1]
    old.items[n-1] = nil  // avoid memory leak
    item.index = -1 // for safety
    pq.items = old.items[0 : n-1]
    return item
}

// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.Packet, priority int) {
    item.Value = value
    item.Priority = priority
    heap.Fix(pq, item.index)
}

//---------------------------------------
//      同步接口
//---------------------------------------
func (pq *PriorityQueue) Get(timeout time.Duration) *QueueItem{
    timeout |= 30

    ticker := time.NewTicker(time.Second*timeout)
    defer ticker.Stop()

    select {
    case result := <- pq.ch:
        return result
    case <- ticker.C:
    }
    return nil
}

func (pq *PriorityQueue) PutNoWait(item *QueueItem){
    heap.Push(pq, item)
    //启用推送
    pq.startPushing()
}

//负责推送数据
func (pq *PriorityQueue) startPushing(){
    if pq.isPushing{
        return
    }
    go pq._startPushing()
}

func (pq *PriorityQueue) _startPushing(){
    for pq.Len()>0{
        pq.isPushing = true

        x := heap.Pop(pq)
        if x==nil{
            continue
        }
        item := x.(*QueueItem)
        pq.ch <- item
    }
    pq.isPushing = false
}

用法示例

//并发安全用法
func TestSendQueue(t *testing.T){
    q := NewPriorityQueue()
    q.PutNoWait(&QueueItem{
        Value:  &packets.Packet{
                        Header: &packets.SecsHeader{
                        System: 1,
                    },
                    Data: []byte{},
                },
        Priority: 100,
        JoinTime: time.Now(),
    })
    item := q.Get(30)
    fmt.Println(item)
}

总结

上面已经满足我的需求场景了。 队列的删除、撤销,并不是我的应用场景,不在这里考虑,也不是队列该做的事。

佛說大乘無量壽莊嚴清淨平等覺經pdf
净土大经科注2014-doc

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注