A pooled implementation of Twitch PubSub
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

101 lines
1.9 KiB

package twpubsub
import (
"container/list"
"errors"
"sync"
)
// ErrTooManyTopics is returned when the caller violates
// twitchMaxTopics*twitchMaxCons
var ErrTooManyTopics = errors.New("Too many topics")
const (
twitchMaxTopics = 50
twitchMaxCons = 10
)
// Pooled is a PubSub client with multiple connections to work around Twitch
// topic limits.
type Pooled struct {
Dispatcher Dispatcher
mu sync.Mutex
cons *list.List
topics map[string]*list.Element
auth string
}
// NewPool creates a new pool.
func NewPool(d Dispatcher, opts ...PoolOpts) *Pooled {
p := &Pooled{
Dispatcher: d,
cons: list.New(),
topics: make(map[string]*list.Element),
}
for _, opt := range opts {
switch v := opt.(type) {
case PoolOptAuth:
p.auth = v.Token
}
}
return p
}
// AddTopics adds new topics to listen to, automatically creating connections
// if necessary.
func (p *Pooled) AddTopics(topics []string) error {
for _, t := range topics {
e := p.topics[t]
if e != nil {
continue
}
var ws *WS
var elem *list.Element
for e := p.cons.Front(); e != nil; e = e.Next() {
ws = e.Value.(*WS)
if ws.topicCount() < twitchMaxTopics {
elem = e
break
}
}
if elem == nil {
if p.cons.Len() >= twitchMaxCons {
return ErrTooManyTopics
}
ws = New(p.auth, p.Dispatcher)
elem = p.cons.PushFront(ws)
}
ws.listenTopic(t)
p.topics[t] = elem
}
return nil
}
// RemoveTopics removes and unlistens topics, automatically removing
// connections if possible. Topics are NOT rebalanced to minimize the number of
// connections.
func (p *Pooled) RemoveTopics(topics []string) {
for _, t := range topics {
e := p.topics[t]
if e == nil {
continue
}
ws := e.Value.(*WS)
ws.unlistenTopic(t)
delete(p.topics, t)
if ws.topicCount() == 0 {
ws.Close()
p.cons.Remove(e)
}
}
}
//
//
//
type PoolOpts interface{}
type PoolOptAuth struct {
Token string
}