Streaming Data with SSE Protocol in Python

HTTP Server Push, or HTTP Streaming, is a communication model in which the client only needs to establish a connection; after that, the server sends data to it without the client having to make further requests.

There are several protocols for streaming data

HTTP Long Polling or Hanging GET

In long polling, when the client sends a request, the server checks whether there is new data to send. If there is, it sends the data and closes the connection, just like a regular request. If there is no new data, it keeps the connection open until new data is ready, sends it, and then closes the connection.
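
As a rough illustration, here is a minimal long-polling endpoint sketched with Flask; the data check is simulated with a random condition, which is purely a placeholder for however the application detects fresh data.

import random
import time

from flask import Flask, jsonify

app = Flask("Long Polling Server")

@app.route("/poll")
def poll():
    # Hold the request open until new data is available, then respond
    # and close the connection, just like a regular request.
    while True:
        # Placeholder check: pretend new data shows up at random moments.
        if random.random() < 0.2:
            return jsonify({"msg": "new data"})
        time.sleep(1)

if __name__ == "__main__":
    app.run(port=8000)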

Server-Sent Events (SSE)

In this protocol, once the client sends a request and the initial connection is established, the server keeps the connection open and sends data as a stream. Whenever new data arrives, the server sends it without the client making a new request. With SSE, data can be received in real time through a single connection.

WebSockets

Similar to SSE, WebSockets send data as a stream without closing the connection. The key difference is that WebSockets provide bidirectional communication, allowing both the client and the server to send data. Setting this up takes more effort than the previous protocols.
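
For comparison only, here is a minimal bidirectional echo server sketched with the third-party websockets package, which is not used elsewhere in this article; the single-argument handler assumes a recent version of that library.

import asyncio

import websockets  # third-party package, installed separately

async def echo(websocket):
    # Unlike SSE, the client can also send messages over the same connection.
    async for message in websocket:
        await websocket.send("echo: " + message)

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())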

Streaming Data

Using the protocols mentioned above, the goal is to stream data from the server to the client without closing the connection, in this example using Flask.

import time

from flask import Flask, Response
from werkzeug.serving import run_simple


app = Flask("SSE Server")


@app.route("/")
def index():
    def generate():
        for i in range(100):
            yield str(i) + '\n'
            time.sleep(1)

    resp = Response(generate())
    return resp

run_simple(
    hostname="localhost", port=8000, application=app,
    use_debugger=True, use_reloader=True
)

To run the script

python run.py

To receive streamed data using curl

curl http://localhost:8000

The line break is added so that the receiving program can recognize a complete data packet.
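
For example, a small client sketch using the requests package (an assumption on the client side, not something the server above requires) can read the stream line by line:

import requests

# stream=True makes requests read the response as it arrives.
resp = requests.get("http://localhost:8000", stream=True)

for line in resp.iter_lines():
    # Each iteration yields one complete line, i.e. one data packet.
    if line:
        print(line.decode())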

Well, up to now, we have seen how to stream data. Now, what is SSE?

Sending Data Streams with Server-Sent Events (SSE)

The SSE protocol defines certain standards for sending data

  • To send data, the server must send the header Content-Type: text/event-stream.

  • The sent message must contain the string data: and end with \n\n so that the browser understands it has received a complete message.

  • Each message can optionally include the following fields

    • id: Message number
    • event: Message type
    • retry: Informs the browser to reconnect if the connection is lost after a specified number of milliseconds. It’s advisable to send this value only once at the beginning of the connection, not with each message.

Examples of SSE messages

id: 6598\n
data: test1\n
event: sse\n
retry: 3000\n\n

data: {\n
data: "msg": "I am Pepsi"\n
data: }\n\n
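
A small helper along these lines (a sketch, not a standard API) can build such messages on the server side; the field names match the SSE format described above:

def format_sse(data, event=None, msg_id=None, retry=None):
    # Optional fields come first, then one or more "data:" lines,
    # and a blank line terminates the message.
    msg = ""
    if msg_id is not None:
        msg += "id: " + str(msg_id) + "\n"
    if event is not None:
        msg += "event: " + str(event) + "\n"
    if retry is not None:
        msg += "retry: " + str(retry) + "\n"
    for line in str(data).splitlines():
        msg += "data: " + line + "\n"
    return msg + "\n"

# format_sse("test1", event="sse", msg_id=6598, retry=3000)
# format_sse('{\n"msg": "I am Pepsi"\n}')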

On the browser side

var source = new EventSource("stream_url");

source.onopen = function(e) {
    console.log("Connection opened");
}

source.onmessage = function (e) {
  console.log(e.lastEventId, e.data);
}

// Messages with a custom "event:" field go to a matching listener instead
source.addEventListener('close', function (e) {
  source.close();
});

Replace stream_url with the address of the server that streams the data. Messages without an event field are handled by source.onmessage, while messages with a custom event type (such as close above) are handled by the matching addEventListener callback.

Lines starting with a colon : are not considered messages and are ignored by EventSource.

If the connection is disconnected for any reason, the browser attempts to reconnect. If the retry value is sent, it waits for that duration before retrying; otherwise, it retries approximately 3 seconds later.

When the connection is re-established from the browser side, the browser puts the id of the last message in the Last-Event-ID header and requests to continue receiving messages from that point, assuming the server has implemented such a feature.
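
On the server side, supporting this could look roughly like the sketch below, which assumes the Flask app from the earlier example and simply resumes counting from the reported id:

from flask import Response, request

@app.route("/resume")
def resume():
    # The browser reports the id of the last message it received.
    last_id = int(request.headers.get("Last-Event-ID", 0))

    def generate(start):
        # Resume numbering right after the last delivered message.
        for i in range(start + 1, start + 101):
            yield "id: " + str(i) + "\ndata: " + str(i) + "\n\n"

    resp = Response(generate(last_id))
    resp.content_type = "text/event-stream"
    return resp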

What Happens to Idle Connections?

Web servers can be configured to close connections when no data has been transmitted for a period of time. To prevent this, a comment line such as :heartbeat can be sent periodically to keep the connection open. In our case we are serving the app with werkzeug, which is a simple web server and can't be configured with such a timeout, so this is not a problem yet. Later on we'll see how things work with Nginx and uWSGI.
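
When a heartbeat does become necessary, a generator along these lines (a sketch built around a hypothetical queue.Queue of outgoing messages) can emit it whenever no real data shows up in time:

import queue

def generate_with_heartbeat(messages, timeout=15):
    # `messages` is a hypothetical queue.Queue of strings to stream.
    while True:
        try:
            msg = messages.get(timeout=timeout)
            yield "data: " + msg + "\n\n"
        except queue.Empty:
            # Lines starting with ":" are ignored by EventSource
            # but keep the connection from going idle.
            yield ":heartbeat\n\n"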

Full Example with Flask

This is an example with both the front end and the back end served by Flask

import time

from flask import Flask, Response
from werkzeug.serving import run_simple


app = Flask("SSE Server")


@app.route("/")
def index():
    content = """
    <html>
        <head></head>
        <body>
            <script type='text/javascript'>
                var source = new EventSource("/stream");

                source.onopen = function(e) {
                    console.log("Connection opened.");
                }

                source.onmessage = function (e) {
                    console.log(e.lastEventId, e.data);
                }

                // Custom "event:" types go to a matching listener instead
                source.addEventListener('close', function (e) {
                    source.close();
                });
            </script>
        </body>
    </html>
    """
    return Response(content, mimetype="text/html")


@app.route("/stream")
def stream():
    def generate():
        for i in range(100):
            yield 'data: ' + str(i) + '\n\n'
            time.sleep(1)

    resp = Response(generate())
    resp.content_type = "text/event-stream"
    return resp


if __name__ == "__main__":
    # Guarded so that uWSGI can later import this module without
    # starting the development server.
    run_simple(
        hostname="localhost", port=8000,
        application=app,
        use_debugger=True, use_reloader=True, threaded=True
    )

Open http://localhost:8000 in a browser, go to Developer Tools > Network > XHR, and select the request named stream; its type should be EventStream.

Serve Behind Nginx and uWSGI

We demonstrated the solution using a simple web server; we can employ better tools to achieve higher performance.

We want to put the app behind a web server like Nginx. To achieve this, we need an application server that can communicate with Nginx.

There's a specification called WSGI that defines how Python applications communicate with web servers. uWSGI is a popular application server that implements WSGI and talks to Nginx using its uwsgi protocol.
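
To illustrate what the WSGI interface itself looks like, here is a minimal raw WSGI application; Flask implements this interface for us under the hood:

def application(environ, start_response):
    # `environ` carries the request data; `start_response` sends the
    # status line and headers back to the server.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello from WSGI\n"]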

Let's configure uWSGI to communicate with Nginx using a Unix socket

Run the app (here -w main:app assumes the Flask script is saved as main.py)

uwsgi --socket myapp.sock -w main:app --chmod-socket=644 --enable-threads --py-auto-reload=1

Replace the content of /etc/nginx/sites-available/default with the following block and restart Nginx.

upstream myapp {
    server unix:///home/project/myapp.sock;
}

server {
    listen  8000;

    uwsgi_read_timeout 120;

    location / {
        include uwsgi_params;  # Include directives from /etc/nginx/uwsgi_params
        uwsgi_pass  myapp;
        uwsgi_buffering off; 
    }
}

Buffering is disabled by uwsgi_buffering off. Buffering means that Nginx stores the response from the proxied server and flushes it once it reaches a specific size; we don't want this, since the data is returned as a stream.

Check the logs in real time to see if there’s any error

less +F /var/log/nginx/error.log

A common issue with file sockets is permission errors. Nginx runs as a dedicated user named www-data, so make sure the socket and its directory are accessible to that user. You can verify this by logging in as that user and running a command on the socket file

sudo -u www-data file /home/project/myapp.sock

uwsgi_read_timeout specifies how long Nginx waits for the uwsgi server to transmit something before closing the connection; here it is set to 120 seconds.