lipre

Stream text files for live (coding) representations
Log | Files | Refs

commit 96603232a44572accc7e421b3233083d8b845a35
parent be63b1958759aa7adf39f68c13717b5e533ef32d
Author: Vetle Haflan <vetle@haflan.dev>
Date:   Thu, 28 Jan 2021 23:54:36 +0100

Attempt to improve thread safety + some refactoring

Also adds a 'linger' prop to Room, which should eventually be used to
allow a room to remain open a certain amount of minutes after the
presenter has left.

The lipre.py modification does not work as I hoped for (disconnect
/ close is still not handled for some reason).

Diffstat:
Mlipre.go | 82++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Mlipre.py | 37+++++++++++++++++++++++++------------
2 files changed, 79 insertions(+), 40 deletions(-)

diff --git a/lipre.go b/lipre.go @@ -1,11 +1,12 @@ package main import ( - "encoding/json" "fmt" "io/ioutil" "log" "net/http" + "strconv" + "sync" "github.com/gorilla/mux" "github.com/gorilla/websocket" @@ -14,52 +15,76 @@ import ( // Not used - either read password from config / flag or define the valid rooms in a file on server const temporaryCorrectRoomCode = "tester" +type File struct { + Name string `json:"name"` + Contents string `json:"contents"` +} + type Room struct { + mu sync.Mutex code string presenter *websocket.Conn viewers []*websocket.Conn // Store files so that they can be sent to new viewers upon connection - files map[string][]byte + files map[string]File + // Number of minutes for which the room should continue to be open after the presenter disconnects + linger int +} + +var rooms = make(map[string]*Room) + +// Thread safe Room functions. +// The rooms map should only be updated from here +func (room *Room) Open() { + room.mu.Lock() + defer room.mu.Unlock() + existingRoom := rooms[room.code] + if existingRoom != nil { + fmt.Printf("Closing existing room '%v'\n", existingRoom.code) + existingRoom.Close(false) + } + rooms[room.code] = room + room.presenter.SetCloseHandler(func(code int, text string) error { + room.Close(true) + return nil + }) + go room.listen() } -func (room *Room) Close() { +func (room *Room) Close(presenter bool) { + room.mu.Lock() + defer room.mu.Unlock() for _, viewer := range room.viewers { if viewer != nil { viewer.Close() } } + // Close the presenter too, in case it's not closed already + if !presenter { + room.presenter.Close() + } delete(rooms, room.code) } -// TODO: Lock on this for concurrency? -var rooms = make(map[string]*Room) - func (room *Room) listen() { for { - _, message, err := room.presenter.ReadMessage() + var file File + err := room.presenter.ReadJSON(&file) if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("Connection to room '%v' closed by presenter", room.code) + } else { log.Printf("error: %v", err) } - room.Close() - return - } - file := struct { - Name string `json:"name"` - Contents string `json:"contents"` - }{} - err = json.Unmarshal(message, &file) - if err != nil { - log.Printf("error: %v", err) - // TODO: Send info to presenter + // TODO: Handle JSON errors (send info to presenter) return } - room.files[file.Name] = message + room.files[file.Name] = file for _, viewerConn := range room.viewers { if viewerConn == nil { break } - viewerConn.WriteMessage(websocket.TextMessage, message) + viewerConn.WriteJSON(&file) } } } @@ -92,6 +117,12 @@ func fileHandler(w http.ResponseWriter, r *http.Request) { func presentHandler(w http.ResponseWriter, r *http.Request) { roomCode := mux.Vars(r)["roomCode"] + qparams := r.URL.Query() + pLinger := qparams["linger"] + var iLinger int + if len(pLinger) == 1 { + iLinger, _ = strconv.Atoi(pLinger[0]) + } /*if roomCode != temporaryCorrectRoomCode { w.WriteHeader(http.StatusBadRequest) return @@ -102,13 +133,8 @@ func presentHandler(w http.ResponseWriter, r *http.Request) { log.Println(err) return } - room := &Room{code: roomCode, presenter: conn, files: make(map[string][]byte)} - rooms[roomCode] = room - conn.SetCloseHandler(func(code int, text string) error { - room.Close() - return nil - }) - go room.listen() + room := &Room{code: roomCode, presenter: conn, linger: iLinger, files: make(map[string]File)} + room.Open() } func viewHandler(w http.ResponseWriter, r *http.Request) { @@ -124,7 +150,7 @@ func viewHandler(w http.ResponseWriter, r *http.Request) { } room.viewers = append(rooms[roomCode].viewers, conn) for _, filedata := range room.files { - conn.WriteMessage(websocket.TextMessage, filedata) + conn.WriteJSON(&filedata) } } diff --git a/lipre.py b/lipre.py @@ -33,6 +33,9 @@ def send_file(filename): print(f'Sending {filename}') ws.send(json.dumps(fileobj)) +def closed(): + print('Connection closed') + exit() program = sys.argv[0] if len(sys.argv) <= 1: @@ -47,16 +50,15 @@ elif len(sys.argv) >= 3: else: HOST = 'ws://localhost:8080' -ws = websocket.WebSocket() -ws.connect(f'{HOST}/ws/pres/{room_code}') +if ':' in room_code: + code, linger = room_code = room_code.split(':') + url = f'{HOST}/ws/pres/{code}?linger={linger}' +else: + url = f'{HOST}/ws/pres/{room_code}' if isfile(IGNOREFILE): ignorefilelist = [fn for fn in open(IGNOREFILE).read().split('\n') if fn] -# Initial file upload -filenames = [fn for fn in listdir() if isfile(fn)] -for fn in filenames: - send_file(fn) # Listen for changes class EventHandler(pyinotify.ProcessEvent): @@ -69,9 +71,20 @@ class EventHandler(pyinotify.ProcessEvent): def process_IN_MODIFY(self, event): self.process_IN_CREATE(event) -wm = pyinotify.WatchManager() -handler = EventHandler() -notifier = pyinotify.Notifier(wm, handler) -mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_MODIFY -wm.add_watch('.', mask) -notifier.loop() +def present(): + # Initial file upload + filenames = [fn for fn in listdir() if isfile(fn)] + for fn in filenames: + send_file(fn) + # Continously watch for changes + wm = pyinotify.WatchManager() + handler = EventHandler() + notifier = pyinotify.Notifier(wm, handler) + mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_MODIFY + wm.add_watch('.', mask) + notifier.loop() + +ws = websocket.WebSocket() +ws.connect(url) +ws.on_close = closed +present()