Motivation
Recently, more and more websites, servers and tools have been adopting http2. What started as SPDY evolved into http2, which was standardized around 2015.
After reading about some of the benefits of http2, I decided I had to give it a try.
Basically, I wanted to build a very simple “event driven” app that would use http2’s server push.
This was my general idea:
- Users get an SPA ui that allows them to read “live” messages
- The ui is served using server push over http2
- Messages are read from some kind of queue
- The user gets the messages as soon as they come
- Messages come from some source (doesn’t matter)
- Use as little polling as possible
The final code is at https://github.com/asafg6/MessagesAppPOC.
Want to know how I got there? Keep reading.
Client Updates
The first thing I thought about was how the client would receive updates from the server. My line of thought automatically went towards websockets, which provide full duplex communication. Websockets have been around for a while now and are supported in the popular browsers, so they make a nice choice. However, I wanted to use http2, and websockets only use it to open the initial connection; the actual streaming just uses tcp. After some research, I came across this article by Martin Chaov. The article compares long polling, websockets and server sent events. I had never used server sent events, and had never gotten the reassuring “We use it in production” statement from anyone. In any case, this sounded like the best solution for what I wanted.
Here’s what Martin wrote about server sent events:
It works as expected and looks like magic at first. We get everything we want for our application to work in an efficient way. As with all things that look too good to be true, we sometimes face some issues that need to be addressed. However, they are not complicated to implement or go around
The most important things I got out of reading this are:
- Connections will be multiplexed over http2
- Clients that disconnect will automatically try to reconnect
Sounds perfect. Since I chose Go for the server, I just had to find an implementation of SSE.
Server Sent Events In Go
Go gives you a low level api that you can use to implement server sent events: you can type-assert the http.ResponseWriter to an http.Flusher. All you need to do is write to the ResponseWriter in the right format and, after you are done, call flusher.Flush().
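As a minimal sketch of the mechanics (my own illustration, not code from any of the packages mentioned below), a bare-bones SSE handler might look like this:

import (
    "fmt"
    "net/http"
    "time"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }
    // These headers tell the browser to keep the connection open
    // and treat the body as an event stream.
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    for i := 0; i < 5; i++ {
        // The SSE wire format: an optional "event:" line, a "data:" line, then a blank line.
        fmt.Fprintf(w, "event: message\ndata: hello %d\n\n", i)
        flusher.Flush() // send the event immediately instead of buffering
        time.Sleep(time.Second)
    }
}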
I wanted to avoid reinventing the wheel, so I started looking for Go packages that implement an SSE server. A quick Google search revealed some implementations, for example https://github.com/julienschmidt/sse and https://github.com/alexandrevicenzi/go-sse (there are more). I went through the sources and found a lot of stuff I didn’t expect to find: channels, maps, configurations, goroutines and more. Certainly a lot more than I needed - or, to be more accurate, a lot of stuff I just didn’t need. I was really tempted to use one of the packages; their code was neatly written and looked pretty decent. However, considering the low number of GitHub stars on the packages I found and the high number of bells and whistles, I decided to roll my own.
You can find my implementation at https://github.com/asafg6/sse_handler, it’s around 70 lines of code and it has no sophistication whatsoever.
Basically, it provides a function that wraps your “for loop” function that you’ll use to send events to the client, and returns a func(http.ResponseWriter, *http.Request) that you can then pass to http.HandleFunc.
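To give an idea of the shape of such a wrapper (this is a hypothetical sketch with made-up names, not the actual sse_handler api), it might look something like this:

// WrapSSE is a hypothetical wrapper: it sets the SSE headers, then hands
// control to loop, which sends events until the client disconnects.
// Assumes the fmt and net/http imports.
func WrapSSE(loop func(send func(event, data string), closed <-chan bool)) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        flusher, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "streaming unsupported", http.StatusInternalServerError)
            return
        }
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        send := func(event, data string) {
            fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, data)
            flusher.Flush()
        }
        loop(send, w.(http.CloseNotifier).CloseNotify())
    }
}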
So, after some coding and debugging, I was satisfied with my solution.
Server Push
Multiplexing is great, but server push should save precious milliseconds. The problem is: how do you know in advance which paths the client is going to ask for? A very simple solution is to hold a list of paths in a configuration file and let the server read it when it starts - don’t take my word for it, this is what Nginx’s http2_push directive (added in 1.13.9) does.
The thing is, for this POC I built a small React app using Create-React-App. Every time you build a React app for production, it generates js files with hashes in their names (via a webpack plugin), so a static configuration file would break on every build. As a side note, maybe regular expression support would offer a better solution? I decided to keep it as simple as possible: just put all the files that the React build script generates in a list and push them whenever index.html is requested, assuming that the client will request them at some point.
The Go interface for server push looks like this:
From https://blog.golang.org/h2push
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    if pusher, ok := w.(http.Pusher); ok {
        // Push is supported.
        if err := pusher.Push("/app.js", nil); err != nil {
            log.Printf("Failed to push: %v", err)
        }
    }
    // ...
})
Here is my implementation:
func serveIndex(w http.ResponseWriter, r *http.Request) {
    log.Println("serving index.html")
    pusher, ok := w.(http.Pusher)
    if ok {
        log.Println("Push is supported")
        // Push every generated asset before serving index.html itself.
        for _, path := range pushPaths {
            if err := pusher.Push(path, nil); err != nil {
                log.Printf("Failed to push: %v", err)
            } else {
                log.Printf("Pushed %v", path)
            }
        }
    }
    http.ServeFile(w, r, client_dir+"/index.html")
}

func visit(path string, f os.FileInfo, err error) error {
    if err != nil {
        return err // propagate walk errors instead of dereferencing a nil FileInfo
    }
    // Collect every file the React build generated, except index.html itself.
    if !f.IsDir() && !strings.Contains(path, "index.html") {
        pushPaths = append(pushPaths, strings.TrimPrefix(path, client_dir))
    }
    return nil
}

func fillPushPaths() {
    pushPaths = make([]string, 0)
    err := filepath.Walk(client_dir, visit)
    if err != nil {
        log.Printf("filepath.Walk() returned %v\n", err)
    }
    log.Println(pushPaths)
}
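For completeness, here is a sketch of how this could be wired up (the port and certificate file names are placeholders). Go's net/http negotiates http2 automatically when serving over TLS, which browsers require for http2 anyway:

func main() {
    fillPushPaths()
    http.HandleFunc("/", serveIndex)
    // ListenAndServeTLS enables http2 via ALPN automatically.
    log.Fatal(http.ListenAndServeTLS(":8443", "cert.pem", "key.pem", nil))
}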
Broadcasting Messages
After deciding on the infrastructure, I had to find a way to broadcast messages efficiently throughout my app. The standard Go way to communicate between goroutines is channels, and after some research I found that the concurrency pattern I was looking for is “fan out”. You can see an example of the “fan out” pattern in this StackOverflow question.
This pattern is quite nice, but it seemed to me that opening a channel for each consumer is a waste, especially when all consumers need to get the same value. After some thinking, I decided to use shared memory instead. My idea was to use a linked list that always returns the current item, where each item has a next function that blocks until the next item is available. Go has tools for such cases: sync.Cond lets you block until something happens with cond.Wait(), and when something does happen you can use Broadcast to wake up all the waiting goroutines.
To make the comparison concrete, here is a minimal sketch of the channel-per-consumer “fan out” approach I decided against (my own illustration, not the code from the StackOverflow question):
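// fanOut copies every value from in to each of the per-consumer channels.
// Each consumer gets its own channel - this is the per-consumer cost I wanted to avoid.
func fanOut(in <-chan string, outs []chan string) {
    for msg := range in {
        for _, out := range outs {
            out <- msg
        }
    }
    for _, out := range outs {
        close(out)
    }
}

Here’s what I came up with at first: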
package messages

import (
    "sync"
)

// BroadcastChannel points at the most recent item of a linked list of messages.
type BroadcastChannel struct {
    channelItem *ChannelItem
    lock        sync.Mutex
}

// Listen returns the current item, from which a consumer can walk forward.
func (channel *BroadcastChannel) Listen() *ChannelItem {
    channel.lock.Lock()
    defer channel.lock.Unlock()
    return channel.channelItem
}

// Publish appends a new item to the list and wakes up all waiting consumers.
func (channel *BroadcastChannel) Publish(data interface{}) {
    channel.lock.Lock()
    defer channel.lock.Unlock()
    newItem := MakeNewChannelItem(data)
    channel.channelItem.setNext(newItem)
    channel.channelItem = newItem
}

func MakeNewBroadcastChannel() *BroadcastChannel {
    m := sync.Mutex{}
    nilItem := MakeNewChannelItem(nil)
    return &BroadcastChannel{channelItem: nilItem, lock: m}
}

// ChannelItem is a node in the linked list; next is nil until the next message arrives.
type ChannelItem struct {
    next *ChannelItem
    cond *sync.Cond
    data interface{}
}

// GetNextMessageOrWait blocks until the next item becomes available.
func (channelItem *ChannelItem) GetNextMessageOrWait() *ChannelItem {
    channelItem.cond.L.Lock()
    defer channelItem.cond.L.Unlock()
    for channelItem.next == nil {
        channelItem.cond.Wait()
    }
    return channelItem.next
}

func (channelItem *ChannelItem) setNext(next *ChannelItem) {
    channelItem.cond.L.Lock()
    defer channelItem.cond.L.Unlock()
    channelItem.next = next
    channelItem.cond.Broadcast()
}

func (channelItem *ChannelItem) GetData() interface{} {
    return channelItem.data
}

func MakeNewChannelItem(data interface{}) *ChannelItem {
    m := sync.Mutex{}
    cond := sync.NewCond(&m)
    return &ChannelItem{data: data, cond: cond}
}
This implementation kind of behaves like a channel, blocking until there’s a value.
After some more coding, I found out that this implementation has a serious downside - it can’t be canceled. If a client disconnects, the code keeps blocking and the goroutine becomes a memory leak. There’s no way to cancel a goroutine from the outside; it has to “die” on its own. Go’s http package gives you a channel that tells you when the client disconnects (via http.CloseNotifier), so naturally I wanted a way to use that.
Here’s my solution:
...
func (channelItem *ChannelItem) GetNextMessageOrWaitWithClose(closeChan <-chan bool) *ChannelItem {
    // Buffered, so the waiting goroutine can always send and exit,
    // even if we have already returned because the client disconnected.
    waitChan := make(chan struct{}, 1)
    go func() {
        channelItem.cond.L.Lock()
        defer channelItem.cond.L.Unlock()
        // Loop to guard against spurious wakeups and against the case
        // where next was already set before we started waiting.
        for channelItem.next == nil {
            channelItem.cond.Wait()
        }
        waitChan <- struct{}{}
    }()
    select {
    case <-waitChan:
        return channelItem.next
    case <-closeChan:
        // The client disconnected - stop waiting.
        return nil
    }
}
...
As you can see, in the end I had to use what I was trying to avoid… a channel for each goroutine. However, this channel is an empty one, so it hardly takes any memory - the lesser evil. I could have avoided waitChan if Go’s select block supported waiting on a condition variable, but anyway, it gets the job done.
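Tying it together, an SSE handler built on top of this might look roughly like the following. This is a sketch under my own naming assumptions (broadcast as a package-level *BroadcastChannel, "news" as the event name, and the fmt and net/http imports), not the exact handler from the repo:

var broadcast = MakeNewBroadcastChannel()

func messagesHandler(w http.ResponseWriter, r *http.Request) {
    flusher := w.(http.Flusher) // assumes the server supports flushing
    closeChan := w.(http.CloseNotifier).CloseNotify()
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    item := broadcast.Listen()
    for {
        item = item.GetNextMessageOrWaitWithClose(closeChan)
        if item == nil {
            return // client disconnected
        }
        // "news" is a made-up event name; the client registers for it by name.
        fmt.Fprintf(w, "event: news\ndata: %v\n\n", item.GetData())
        flusher.Flush()
    }
}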
Someone Has To Poll Eventually
PubSub systems are the obvious tool for broadcasting messages. What I needed was a PubSub that lets you publish from one producer to many consumers and can be clustered. After some searching, I found that Redis PUBSUB fits perfectly: it is scalable and supports publishing messages from multiple producers to multiple consumers. The client uses long polling, which is still polling… but I guess someone has to poll eventually.
There might be other tools that I could have used here, but as this was not my main focus, I just went with the first one that fit.
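For reference, a minimal subscriber sketch using the go-redis package (the v8 api; the channel name “messages” and the address are my own assumptions) that feeds the BroadcastChannel could look like this:

import (
    "context"

    "github.com/go-redis/redis/v8"
)

func subscribeLoop(broadcast *BroadcastChannel) {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    pubsub := rdb.Subscribe(ctx, "messages")
    // Every message received from Redis is re-broadcast to all SSE consumers.
    for msg := range pubsub.Channel() {
        broadcast.Publish(msg.Payload)
    }
}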
The Client
The client, as I already mentioned, is a very simple React app. It listens for events from the server and displays the incoming messages.
SSE is very simple to use in JavaScript; here is my Events class:
class Events {
  constructor() {
    this.client = new EventSource('https://' + window.location.host + '/messages');
  }

  register(eventName, handler) {
    this.client.addEventListener(eventName, handler);
  }

  unRegister(eventName, handler) {
    this.client.removeEventListener(eventName, handler);
  }
}

export default Events;
And the client code:
import React, { Component } from 'react';

class MessageList extends Component {
  constructor(props) {
    super(props);
    this.state = { messages: [] };
    // Keep a reference to the handler so the exact same function is passed
    // to both register and unRegister; a fresh arrow function in
    // componentWillUnmount would never actually remove the listener.
    this.handleEvent = (e) => {
      this.addMessage(JSON.parse(e.data));
    };
  }

  componentWillMount() {
    const { events, channel } = this.props;
    events.register(channel, this.handleEvent);
  }

  componentWillUnmount() {
    const { events, channel } = this.props;
    events.unRegister(channel, this.handleEvent);
  }

  addMessage(message) {
    let messages = this.state.messages.slice();
    messages.push(message);
    this.setState({ messages });
  }

  render() {
    const { messages } = this.state;
    return (
      <div className="message-box" style={{ backgroundColor: this.props.color }}>
        { messages.map((message) => {
          return (
            <div className="msg" key={message.id.toString()}>
              <p>{message.data}</p>
            </div>
          );
        }) }
      </div>
    );
  }
}

export default MessageList;
Summary
All the pieces together make up the following flows:
Client flow:
- Client connects to the app, and all static resources are pushed using http.Pusher
- An EventSource connects to the SSE handler
- Components register for events and wait for data
- Events come in and the data is displayed on the screen
Server flow:
- A message is pushed to Redis PUBSUB
- The Redis client gets the message and pushes it to the BroadcastChannel
- SSE layer gets the message from BroadcastChannel and sends an event to the client
I hope you enjoyed, and of course, feel free to comment or share.
Thank you for reading.