A pooled implementation of Twitch PubSub
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

298 lines
5.7 KiB

package twpubsub
import (
"context"
"encoding/json"
"log"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"raccatta.cc/twpubsub/internal/utils"
)
// isTesting, when true, stops New from starting the background connection
// loop. It defaults to false.
var isTesting bool
// Dispatcher receives the PubSub messages read from the websocket connection.
type Dispatcher interface {
// PubSubMessage is invoked for every received frame whose type is "MESSAGE".
// topic and chanid are the two halves of the frame's topic string, split at
// its last '.'; msg is the frame's raw, still-encoded "data" payload.
PubSubMessage(topic, chanid string, msg []byte)
}
// TwitchListenTopics is the "data" member of a TwitchListen request: the
// topics being acted upon and, optionally, the token authorizing the request.
type TwitchListenTopics struct {
	Topics    []string `json:"topics"`
	AuthToken string   `json:"auth_token,omitempty"`
}

// TwitchListen is the JSON envelope written to the websocket to subscribe to
// or unsubscribe from topics.
type TwitchListen struct {
	Type  string             `json:"type"` // "LISTEN" or "UNLISTEN"
	Nonce string             `json:"nonce,omitempty"`
	Data  TwitchListenTopics `json:"data"`
}
// TwitchResponse is the envelope of every frame read from the websocket.
// Type names the frame kind (for example "PONG" or "MESSAGE") and Data holds
// the kind-specific payload, left undecoded.
type TwitchResponse struct {
	Type  string          `json:"type"`
	Error string          `json:"error"`
	Nonce string          `json:"nonce"`
	Data  json.RawMessage `json:"data"`
}

// TwitchMessage pairs a topic with the message published on it.
type TwitchMessage struct {
	Topic   string `json:"topic"`
	Message string `json:"message"` // JSON document carried as a string; see DecodeAs
}

// TwitchBasicUser is the id / display name / login triple describing a user.
// NOTE(review): not referenced in this part of the file; presumably shared by
// decoded message payloads.
type TwitchBasicUser struct {
	ID          string `json:"id"`
	DisplayName string `json:"display_name"`
	Login       string `json:"login"`
}

// TwitchBasicImage holds the URLs of one image at its 1x, 2x and 4x variants.
// NOTE(review): not referenced in this part of the file; presumably shared by
// decoded message payloads.
type TwitchBasicImage struct {
	URL1x string `json:"url_1x"`
	URL2x string `json:"url_2x"`
	URL4x string `json:"url_4x"`
}
// DecodeAs unmarshals the response's raw Data payload into v, which must be a
// pointer accepted by json.Unmarshal. It returns the decoder's error, if any.
func (tr TwitchResponse) DecodeAs(v interface{}) error {
	payload := []byte(tr.Data)
	return json.Unmarshal(payload, v)
}
// DecodeAs unmarshals the JSON document stored in the Message string into v,
// which must be a pointer accepted by json.Unmarshal. It returns the decoder's
// error, if any.
func (tm TwitchMessage) DecodeAs(v interface{}) error {
	encoded := []byte(tm.Message)
	return json.Unmarshal(encoded, v)
}
// WS is a single PubSub websocket client. It remembers the set of topics it
// should be listening to and re-sends them each time a connection is
// established.
type WS struct {
// Locked by the methods around access to cli, topics, halt and cancel, and
// around writes to the connection.
sync.Mutex
// auth is sent as the auth_token of every LISTEN request.
auth string
// dispatcher receives every "MESSAGE" frame read from the connection.
dispatcher Dispatcher
// cli is the live connection; nil whenever no connection is established.
cli *websocket.Conn
// topics is the set of topics this client should be subscribed to.
topics map[string]bool
// halt is set by Close and makes the connection loop stop.
halt bool
// cancel tears down the current connection; nil whenever cli is nil.
cancel context.CancelFunc
}
// New builds a WS that authenticates its LISTEN requests with auth and hands
// received messages to d. Unless isTesting is set, it also starts the
// background goroutine that connects and keeps reconnecting.
func New(auth string, d Dispatcher) *WS {
	ws := new(WS)
	ws.auth = auth
	ws.dispatcher = d
	ws.topics = make(map[string]bool)

	if isTesting {
		return ws
	}
	go ws.loop()
	return ws
}
// Close marks the client as halted so the connection loop stops, and cancels
// the current connection if there is one. It always returns nil.
func (ws *WS) Close() error {
	ws.Lock()
	defer ws.Unlock()

	ws.halt = true
	if stop := ws.cancel; stop != nil {
		stop()
	}
	return nil
}
// Reconnect cancels the current connection, if any. It does not set halt, so
// the connection loop goes on to establish a new one.
func (ws *WS) Reconnect() {
	ws.Lock()
	defer ws.Unlock()

	if ws.cancel == nil {
		return
	}
	ws.cancel()
}
// topicCount reports how many topics are currently recorded on this client.
func (ws *WS) topicCount() int {
	ws.Lock()
	n := len(ws.topics)
	ws.Unlock()
	return n
}
// listenTopic takes the lock and subscribes to a single topic via
// listenTopics, returning its error.
func (ws *WS) listenTopic(topic string) error {
	ws.Lock()
	defer ws.Unlock()

	single := []string{topic}
	return ws.listenTopics(single)
}
// unlistenTopic takes the lock and unsubscribes from a single topic via
// unlistenTopics, returning its error.
func (ws *WS) unlistenTopic(topic string) error {
	ws.Lock()
	defer ws.Unlock()

	single := []string{topic}
	return ws.unlistenTopics(single)
}
////
// listenTopics records every not-yet-known topic in ws.topics and forwards
// just those new topics to listenTopicsEx. Topics already recorded are
// skipped. It does no locking itself; the caller is expected to hold the
// lock, as listenTopic does.
func (ws *WS) listenTopics(topics []string) error {
	var fresh []string
	for i := range topics {
		name := topics[i]
		if ws.topics[name] {
			continue
		}
		ws.topics[name] = true
		fresh = append(fresh, name)
	}
	return ws.listenTopicsEx(fresh)
}
// listenTopicsEx writes one LISTEN request for topics, authenticated with
// ws.auth, to the current connection. It does not touch ws.topics and does no
// locking itself; callers hold the lock.
//
// It is a no-op returning nil when there is no connection or topics is empty
// (nil or zero-length), so an empty LISTEN is never sent. The write is given a
// 30 second deadline.
//
// On a write failure the current connection is cancelled, mirroring
// unlistenTopics: the topics are normally already recorded in ws.topics, so
// forcing a reconnect is what gets them re-sent instead of leaving them
// silently unsubscribed. The marshal or write error is returned.
func (ws *WS) listenTopicsEx(topics []string) error {
	if ws.cli == nil || len(topics) == 0 {
		return nil
	}
	lst, err := json.Marshal(TwitchListen{
		Type:  "LISTEN",
		Nonce: utils.Nonce(),
		Data: TwitchListenTopics{
			Topics:    topics,
			AuthToken: ws.auth,
		},
	})
	if err != nil {
		return err
	}
	if MetricsCounter != nil {
		MetricsCounter.MessageTX("LISTEN")
	}
	ws.cli.SetWriteDeadline(time.Now().Add(time.Second * 30))
	err = ws.cli.WriteMessage(websocket.TextMessage, lst)
	if err != nil && ws.cancel != nil {
		// Tear the connection down so the reconnect re-sends every topic.
		ws.cancel()
	}
	return err
}
// unlistenTopics drops every currently recorded topic in topics from
// ws.topics and, when a connection exists, sends one UNLISTEN request for the
// dropped topics with a 30 second write deadline. Unknown topics are ignored.
// If the write fails, the current connection is cancelled and the write error
// is returned. It does no locking itself; the caller is expected to hold the
// lock, as unlistenTopic does.
func (ws *WS) unlistenTopics(topics []string) error {
	var gone []string
	for _, name := range topics {
		if !ws.topics[name] {
			continue
		}
		delete(ws.topics, name)
		gone = append(gone, name)
	}
	if len(gone) == 0 || ws.cli == nil {
		return nil
	}

	payload, _ := json.Marshal(TwitchListen{
		Type:  "UNLISTEN",
		Nonce: utils.Nonce(),
		Data:  TwitchListenTopics{Topics: gone},
	})
	if MetricsCounter != nil {
		MetricsCounter.MessageTX("UNLISTEN")
	}

	deadline := time.Now().Add(30 * time.Second)
	ws.cli.SetWriteDeadline(deadline)
	err := ws.cli.WriteMessage(websocket.TextMessage, payload)
	if err == nil || ws.cancel == nil {
		return err
	}
	ws.cancel()
	return err
}
// loop runs connections back to back until the client is halted. Each round
// checks halt, runs once (logging any error it returns), and then waits five
// seconds before the next round.
func (ws *WS) loop() {
	halted := func() bool {
		ws.Lock()
		defer ws.Unlock()
		return ws.halt
	}

	for !halted() {
		err := ws.once()
		if err != nil {
			log.Println("pubsub error:", err)
		}
		time.Sleep(5 * time.Second)
	}
}
// once runs a single connection from dial to teardown.
//
// It dials the PubSub endpoint, publishes the connection and its cancel
// function on ws, sends a LISTEN for every recorded topic, and then serves
// the connection until one of the following happens:
//   - the connection is cancelled (Close, Reconnect, or a failed write), in
//     which case the context's error is returned;
//   - reading from the socket fails, in which case the read error is returned;
//   - writing the two-minute keep-alive PING fails, in which case the write
//     error is returned.
//
// A dial error or a failure of the initial LISTEN is returned as well. If the
// client was closed while the dial was in progress, the fresh connection is
// dropped immediately and nil is returned. On every exit the socket is closed
// and ws.cli / ws.cancel are reset to nil.
func (ws *WS) once() error {
	server := "wss://pubsub-edge.twitch.tv"
	c, _, err := websocket.DefaultDialer.Dial(server, nil)
	if err != nil {
		return err
	}
	defer c.Close()
	log.Println("Establishing connection to", server)
	defer log.Println("Connection to", server, "closed")

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, not only on explicit cancel.
	defer cancel()

	ws.Lock()
	if ws.halt {
		// Close() ran while we were dialing: ws.cancel was still nil then, so
		// nothing else would ever tear this connection down.
		ws.Unlock()
		return nil
	}
	ws.cli = c
	ws.cancel = cancel
	ws.Unlock()
	defer func() {
		ws.Lock()
		ws.cli = nil
		ws.cancel = nil
		ws.Unlock()
	}()

	ticker := time.NewTicker(time.Second * 60 * 2)
	defer ticker.Stop()

	// Re-subscribe to everything recorded so far. The slice is left nil when
	// there are no topics so that no LISTEN is sent.
	ws.Lock()
	var topics []string
	for topic := range ws.topics {
		topics = append(topics, topic)
	}
	err = ws.listenTopicsEx(topics)
	ws.Unlock()
	if err != nil {
		return err
	}

	// Reader goroutine. It ends on the first read error, which the deferred
	// c.Close() above provokes when this function returns first. errch is
	// buffered so that final send never blocks.
	errch := make(chan error, 1)
	go func() {
		defer close(errch)
		for {
			_, message, err := c.ReadMessage()
			if err != nil {
				errch <- err
				return
			}
			var m TwitchResponse
			if err := json.Unmarshal(message, &m); err != nil || (m.Type != "PONG" && m.Type != "MESSAGE") {
				log.Printf("ws << %q", message)
			}
			topic, chanid := topicFromData(m.Data)
			if MetricsCounter != nil {
				MetricsCounter.MessageRX(m.Type, topic)
			}
			if m.Type == "MESSAGE" {
				ws.dispatcher.PubSubMessage(topic, chanid, m.Data)
			}
			// RECONNECT
			// TODO: some errors are really bad, like too many topics
			//if resp.Error != "" {
			//	c.Close()
			//}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errch:
			return err
		case <-ticker.C:
			// Writes share the lock with the LISTEN/UNLISTEN writers.
			ws.Lock()
			c.SetWriteDeadline(time.Now().Add(time.Second * 30))
			if MetricsCounter != nil {
				MetricsCounter.MessageTX("PING")
			}
			err := c.WriteMessage(websocket.TextMessage, []byte(`{"type":"PING"}`))
			ws.Unlock()
			if err != nil {
				return err
			}
		}
	}
}
// topicFromData reads the "topic" member out of a raw JSON payload and splits
// it with extractTopic. It returns two empty strings when raw cannot be
// decoded.
func topicFromData(raw []byte) (string, string) {
	envelope := struct {
		Topic string `json:"topic"`
	}{}
	err := json.Unmarshal(raw, &envelope)
	if err != nil {
		return "", ""
	}
	return extractTopic(envelope.Topic)
}
// extractTopic splits s at its last '.' and returns the text before it and
// the text after it. When s contains no '.', it returns s and an empty string.
func extractTopic(s string) (string, string) {
	dot := strings.LastIndexByte(s, '.')
	if dot < 0 {
		return s, ""
	}
	head, tail := s[:dot], s[dot+1:]
	return head, tail
}