forked from stribny/python-web-realtime-streaming
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
35 lines (23 loc) · 795 Bytes
/
main.py
File metadata and controls
35 lines (23 loc) · 795 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import json
import asyncio
from fastapi import FastAPI
from fastapi import Request
from fastapi import WebSocket
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import random
app = FastAPI()
app.mount("/static",StaticFiles(directory='static'))
templates = Jinja2Templates(directory="templates")
with open('measurements.json', 'r') as file:
measurements = iter(json.loads(file.read()))
@app.get("/")
def read_root(request: Request):
return templates.TemplateResponse("index.htm", {"request": request})
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
await asyncio.sleep(0.1)
payload = next(measurements)
await websocket.send_json(payload)