videocapture.network - Videocapture

1 Network overview

Figure: Netzwerk.png (network diagram)

Up to three cameras are connected directly to their workstation through a dedicated network card, which keeps them separate from the intranet. The therapy workstations are passive; the supervisor PCs and the video server connect to the workstations and request the data. When a supervisor workstation requests it, the video signal is passed on as an RTSP stream, and the supervisor can send annotations to the workstation over an encrypted TCP connection. The type of access authorization has yet to be decided. The video server connects directly over SSH and moves the encrypted recordings to the server.

The devices are connected to each other through the regular, firewall-protected network of the Praxisstelle Gesellschaftsstrasse.

The code below is not yet up to date.

2 Namespace: videocapture.network

This namespace provides all functionality needed for communication between a supervisor machine and one or more workstation machines.

3 Types

For the main communication, sockets are needed:

(s/def ::socket (partial instance? Socket))
(s/def ::server-socket (partial instance? ServerSocket))
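
As a quick check of how these predicate specs behave, the following sketch validates an unconnected Socket and an unbound ServerSocket. The require and import lines stand in for the namespace declaration, and the names client and server are only for illustration.

```clojure
(require '[clojure.spec.alpha :as s])
(import '(java.net Socket ServerSocket))

;; Same definitions as above, repeated so the snippet runs on its own.
(s/def ::socket (partial instance? Socket))
(s/def ::server-socket (partial instance? ServerSocket))

;; The no-argument constructors create an unconnected socket and an unbound
;; server socket, so no network access is needed.
(def client (Socket.))
(def server (ServerSocket.))

(s/valid? ::socket client)             ; true
(s/valid? ::socket server)             ; false, a ServerSocket is not a Socket
(s/valid? ::server-socket server)      ; true
(s/invalid? (s/conform ::socket nil))  ; true, conform returns :clojure.spec.alpha/invalid

(.close client)
(.close server)
```

Note that s/conform returns the value itself on success, so a failed conform has to be detected with s/invalid? (or by comparing against :clojure.spec.alpha/invalid).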

The machines exchange messages. There are different message formats that specify what information the message contains.

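| Format            | Description                                                                      |
|-------------------+----------------------------------------------------------------------------------|
| :workstation-data | Used by the supervisor to get information about the newly connected workstation. |
| :message          | Text messages sent by the supervisor to the workstation.                         |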

For each format, the types :request and :response are defined.

The entire message has to contain the keys :format, :type and :data. The messages are exchanged in a stateless manner.
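
To make the message shape concrete, here is a minimal sketch of three messages and their EDN round trip. The room name, the message text, the nil payload of the request, and the helpers encode, decode and complete? are assumptions made for illustration; they are not part of the namespace.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical example messages; every message carries :format, :type and :data.
(def request  {:format :workstation-data :type :request  :data nil})
(def response {:format :workstation-data :type :response :data {:workstation "Room 1"}})
(def note     {:format :message :type :request :data {:text "Please slow down."}})

(defn encode
  "Serializes a message to one line of EDN text."
  [msg]
  (pr-str msg))

(defn decode
  "Parses one line of EDN text back into a message."
  [line]
  (edn/read-string line))

(def required-keys #{:format :type :data})

(defn complete?
  "True when msg is a map that contains all three required keys."
  [msg]
  (and (map? msg) (every? #(contains? msg %) required-keys)))

(decode (encode response))     ; equal to response
(complete? request)            ; true, the :data key is present even though it is nil
(complete? {:format :message}) ; false
```

Because pr-str escapes newlines inside strings, each encoded message fits on a single line, which suits a line-oriented transport.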

::net-message

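| Name     | Format                        | Description | Generate? |
|----------+-------------------------------+-------------+-----------|
| ::format | #{:workstation-data :message} |             | x         |
| ::type   | #{:request :response}         |             | x         |
| ::data   | See ::data.                   |             |           |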
<<type-from-table(table=net-message-type-tbl,type="::net-message")>>

<<workstation-data-type>>
<<message-data-type>>

(s/def ::data (s/or :workstation ::workstation-data
		    :message ::message))

3.1 :workstation-data

| Argument      | Format  | Description           | Generate? |
|---------------+---------+-----------------------+-----------|
| ::workstation | string? | The workstation name. | x         |
<<type-from-table(table=workstation-data-response,type="::workstation-data")>>

3.2 :message

| Argument | Format  | Description | Generate? |
|----------+---------+-------------+-----------|
| ::text   | string? |             | x         |
<<type-from-table(table=message-request,type="::message")>>
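
The specs for the two tables above are generated by the type-from-table block, which is not shown here. As a rough sketch, and assuming the generated specs are equivalent to plain s/keys specs with unqualified keys, the ::data spec would behave as follows. The sample values are invented.

```clojure
(require '[clojure.spec.alpha :as s])

;; Assumed hand-written equivalents of the table-generated specs.
(s/def ::workstation string?)
(s/def ::text string?)
(s/def ::workstation-data (s/keys :req-un [::workstation]))
(s/def ::message (s/keys :req-un [::text]))

;; Same definition as in the Types section.
(s/def ::data (s/or :workstation ::workstation-data
                    :message ::message))

;; s/or conforms to a [tag value] pair that names the matching branch.
(s/conform ::data {:workstation "Room 1"}) ; [:workstation {:workstation "Room 1"}]
(s/conform ::data {:text "Hello"})         ; [:message {:text "Hello"}]
(s/valid? ::data {:text 42})               ; false, :text must be a string
```

The tagged result means that code consuming a conformed ::data value has to destructure the pair rather than treat it as the original map.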

3.3 Current (deprecated) network types

For presentation purposes, the code below still uses these deprecated network types:

(s/def ::format #{:camera-data :message})
(s/def ::type #{:request :response})
(s/def ::data any?)
(s/def ::message (s/keys :req-un [::format ::type ::data]))
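
The following sketch shows how these deprecated specs treat a well-formed and a malformed message. The sample messages ok and bad are invented for illustration.

```clojure
(require '[clojure.spec.alpha :as s])

;; Same definitions as above, repeated so the snippet runs on its own.
(s/def ::format #{:camera-data :message})
(s/def ::type #{:request :response})
(s/def ::data any?)
(s/def ::message (s/keys :req-un [::format ::type ::data]))

(def ok  {:format :camera-data :type :request :data nil})
(def bad {:format :video :type :request :data nil}) ; :video is not a known format

(s/valid? ::message ok)   ; true, :data may be nil as long as the key is present
(s/valid? ::message bad)  ; false
(s/conform ::message bad) ; :clojure.spec.alpha/invalid

;; explain-data reports which value failed.
(-> (s/explain-data ::message bad) ::s/problems first :val) ; :video

;; Looking up a key on the invalid marker yields nil, so a handler that reads
;; :format from a failed conform sees nil rather than an error.
(:format (s/conform ::message bad)) ; nil
```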

4 Messages

(defn handle-message [reader writer & [address]]
  (go-loop []
    ;; (<! reader) yields nil once the channel is closed, which ends the loop.
    (when-some [raw (<! reader)]
      (let [message (s/conform ::message raw)
            msg-format (:format message)
            msg-type (:type message)
            msg-data (:data message)]
        (debug "Handling message" message)
        (debug @network)
        (case msg-format
          :camera-data
          (case msg-type
            :request (do (>! writer {:format :camera-data
                                     :type :response
                                     :data {:workstation (settings/get-setting :form :room)
                                            :camera (settings/get-setting :camera)}})
                         (debug "Sending camera data to supervisor."))
            :response (do (swap! (:sockets @network)
                                 ;; mapv keeps the connection list a vector.
                                 #(mapv (fn [x]
                                          (if (= address (:address x))
                                            (assoc x :workstation (:workstation msg-data)
                                                     :camera (:camera msg-data))
                                            x))
                                        %))
                          (debug "Adding workstation:" msg-data "to" (pr-str @(:sockets @network))))
            (warn "Unhandled :camera-data message type:" msg-type))
          :message
          (case msg-type
            :request (if (= :supervisor (:type @network))
                       (swap! (:messages @(:active-workstation @network)) conj msg-data)
                       (swap! (:messages @network) conj msg-data))
            (warn "Unhandled :message message type:" msg-type))
          ;; Default branch: without it, `case` throws on an unmatched value,
          ;; e.g. when the message did not conform to ::message.
          (warn "Ignoring message with unknown format:" raw)))
      (debug "Done handling message.")
      (recur))))

(defn handle-text [text]
  (swap! (:messages @(:active-workstation @network)) conj text)
  (async/put! (:writer @(:active-workstation @network)) {:format :message :type :request :data text}))
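
The dispatch on format and type can also be written as a pure function, which makes it testable without channels or sockets. The sketch below is a hypothetical alternative, not the namespace's implementation; the name plan-action and the :action keywords are invented.

```clojure
;; Hypothetical helper: given an incoming message and this machine's own
;; data, describe what the handler should do.
(defn plan-action [{:keys [format type data]} local-data]
  (case [format type]
    ;; The supervisor asks for camera data: answer with the local data.
    [:camera-data :request]  {:action :reply
                              :message {:format :camera-data
                                        :type :response
                                        :data local-data}}
    ;; A workstation answered: its data should be stored with the connection.
    [:camera-data :response] {:action :register-workstation :data data}
    ;; A text message arrived: it should be appended to the message list.
    [:message :request]      {:action :store-message :data data}
    ;; Default branch, since `case` throws on an unmatched value without one.
    {:action :ignore}))

(plan-action {:format :camera-data :type :request :data nil}
             {:workstation "Room 1" :camera 1})
;; returns a :reply action whose :message carries the local data

(plan-action {:format :video :type :request :data nil} {})
;; returns {:action :ignore}
```

A vector test constant in case matches by value, so each [format type] pair gets its own branch and anything else falls through to the default.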

5 Workstation

(defn workstation-handle [active? socket {:keys [read write]}]
  {:pre [(s/valid? ::socket socket)]}
  (.setKeepAlive socket true)
  (let [reader (BufferedReader. (InputStreamReader. (.getInputStream socket)))
        writer (PrintWriter. ^OutputStream (.getOutputStream socket) true)]
    ;; Reader loop: one EDN form per line. .readLine returns nil at end of
    ;; stream, which ends the loop (.isConnected stays true after a close).
    (go-loop []
      (when-let [line (.readLine reader)]
        (when-some [msg (edn/read-string line)]
          (>! read msg))
        (recur)))
    ;; Writer loop
    (go-loop []
      (.println writer ^String (pr-str (<! write)))
      (when-not (.isClosed socket) (recur)))
    (handle-message read write)
    ;; Poll the active? flag every 500 ms instead of spinning, and close the
    ;; socket on shutdown.
    (go-loop []
      (if-not @active?
        (.close socket)
        (do (<! (async/timeout 500))
            (recur))))))

(defn workstation-init []
  (let [socket (atom nil)
        active? (atom true)
        channels {:read (chan 2)
                  :write (chan 2)}
        messages (atom [])]
    (go-loop []
      (reset! socket (s/conform ::socket
                                (try (Socket. ^String (settings/get-setting :supervisor :ip) ^Integer (settings/get-setting :supervisor :port))
                                     (catch Exception e (trace e)))))
      (if (s/invalid? @socket)
        ;; Connection failed: park for 5 s without blocking a go thread, then retry.
        (do (<! (async/timeout 5000))
            (when @active? (recur)))
        (workstation-handle active? @socket channels)))
    {:active? active?
     :socket socket
     :channels channels
     :messages messages
     :type :workstation}))
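
The reader and writer loops above exchange one EDN form per line. The following sketch shows that framing over a real socket pair on the loopback interface, without core.async. The helper names write-message!, read-message and loopback-demo are assumptions made for this example.

```clojure
(require '[clojure.edn :as edn])
(import '(java.net ServerSocket Socket)
        '(java.io BufferedReader InputStreamReader PrintWriter OutputStream))

(defn write-message!
  "Writes msg as a single EDN line; the PrintWriter auto-flushes on println."
  [^Socket socket msg]
  (let [writer (PrintWriter. ^OutputStream (.getOutputStream socket) true)]
    (.println writer ^String (pr-str msg))))

(defn read-message
  "Reads one line and parses it as EDN; returns nil at end of stream."
  [^BufferedReader reader]
  (some-> (.readLine reader) edn/read-string))

(defn loopback-demo
  "Sends msg from a client socket to the accepted peer socket and returns
  what the peer read, followed by the result of reading at end of stream."
  [msg]
  ;; Port 0 lets the OS pick a free port. The connection is queued by the
  ;; listening socket, so connecting before calling .accept works.
  (with-open [server (ServerSocket. 0)
              client (Socket. "127.0.0.1" (.getLocalPort server))
              ^Socket peer (.accept server)]
    (let [reader (BufferedReader. (InputStreamReader. (.getInputStream peer)))]
      (write-message! client msg)
      (.shutdownOutput client) ; signals end of stream to the peer
      [(read-message reader) (read-message reader)])))

(loopback-demo {:format :message :type :request :data "Hello"})
;; returns the message followed by nil
```

The trailing nil is the end-of-stream signal, which is the reliable way for a reader loop to notice that the other side has gone away.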

6 Supervisor

(defn supervisor-handle "Establishes the new connection, adding channel interfaces and storing it in the mount state."
  [sockmap socket]
  {:pre [(s/valid? ::socket socket)]}
  (.setKeepAlive socket true)
  (let [reader (BufferedReader. (InputStreamReader. (.getInputStream socket)))
        writer (PrintWriter. ^OutputStream (.getOutputStream socket) true)
        reader-chan (chan 2)
        writer-chan (chan 2)
        messages (atom [])
        address (.getHostAddress ^InetAddress (.getInetAddress socket))
        ;; sockmap holds a vector of connection maps; this drops the entry for address.
        drop-address (fn [socks] (vec (remove #(= address (:address %)) socks)))]
    ;; Reader loop. .readLine returns nil at end of stream, which is how a
    ;; disconnect is detected (.isConnected stays true after a close).
    (go-loop []
      (if-let [line (.readLine reader)]
        (do (when-some [msg (edn/read-string line)]
              (>! reader-chan msg))
            (recur))
        (swap! sockmap drop-address)))
    ;; Writer loop
    (go-loop []
      (.println writer ^String (pr-str (<! writer-chan)))
      (when-not (.isClosed socket) (recur)))
    ;; Add message parser
    (handle-message reader-chan writer-chan address)
    ;; Request camera data
    (debug "Requesting camera data from new workstation.")
    (async/put! writer-chan {:format :camera-data
                             :type :request
                             :data nil})
    ;; Associations: replace any stale entry for this address
    (swap! sockmap #(conj (drop-address %)
                          {:address address
                           :reader reader-chan
                           :writer writer-chan
                           :messages messages
                           :socket socket}))))

(defn supervisor-init []
  (let [server-socket (s/conform ::server-socket (ServerSocket. (settings/get-setting :supervisor :port)))
	active-workstation (atom nil)
	active? (atom true)
	sockets (atom [])
	messages (atom [])]
    (if (s/invalid? server-socket)
      (fatal "Unable to open server-socket.")
      (go-loop []
	(try (supervisor-handle sockets (.accept server-socket))
	     (catch SocketException e (reset! active? false)))
	(when @active? (recur))))
    {:active? active?
     :sockets sockets
     :server-socket server-socket
     :active-workstation active-workstation
     :type :supervisor}))
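
The supervisor keeps its connections in an atom that holds a vector of maps, each identified by :address. Since swap! passes the current value as the first argument to the update function, the updates are easiest to express as small functions that take the vector first. The sketch below uses invented helper names and invented addresses.

```clojure
(defn drop-connection
  "Removes the entry for address, keeping the result a vector."
  [connections address]
  (vec (remove #(= address (:address %)) connections)))

(defn upsert-connection
  "Adds connection, replacing any existing entry with the same :address."
  [connections {:keys [address] :as connection}]
  (conj (drop-connection connections address) connection))

(defn annotate-connection
  "Merges data into the entry for address; other entries are unchanged."
  [connections address data]
  (mapv #(if (= address (:address %)) (merge % data) %) connections))

(def sockets (atom []))

(swap! sockets upsert-connection {:address "10.0.0.5"})
(swap! sockets upsert-connection {:address "10.0.0.6"})
(swap! sockets upsert-connection {:address "10.0.0.5" :note "reconnected"})
(swap! sockets annotate-connection "10.0.0.6" {:workstation "Room 2" :camera 1})
(swap! sockets drop-connection "10.0.0.5")

@sockets ; [{:address "10.0.0.6", :workstation "Room 2", :camera 1}]
```

Keeping the collection a vector matters because conj appends to a vector but prepends to a lazy sequence.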

7 Complete namespace definition

(ns videocapture.network
  (:require [videocapture.settings :as settings]
	    [clojure.edn :as edn]
	    [clojure.core.async :as async :refer [chan <! >! go go-loop]]
	    [clojure.spec.alpha :as s]
	    [mount.core :as mount]
	    [taoensso.timbre :as timbre
	     :refer [log trace debug info warn error fatal report
		     logf tracef debugf infof warnf errorf fatalf reportf
		     spy get-env]])
  (:import (java.net ServerSocket Socket InetAddress ConnectException SocketException)
	   (java.io BufferedReader InputStreamReader PrintWriter OutputStream)))

(declare network)

<<socket-types>>

<<message-types>>

<<net-messages>>

<<supervisor>>

<<workstation>>

(defn net-init []
  (if (settings/get-setting :supervisor :activated)
    (supervisor-init)
    (workstation-init)))

(defn net-stop [network]
  (reset! (:active? @network) false)
  (if (= :supervisor (:type @network))
    (.close (:server-socket @network))
    (.close @(:socket @network))))

(mount/defstate network
   :start (net-init)
   :stop (net-stop network))
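
Stopping the supervisor relies on a property of java.net.ServerSocket: closing it makes a thread that is blocked in .accept throw a SocketException, which the accept loop in supervisor-init catches. The sketch below demonstrates that behaviour in isolation; accept-once and stop-demo are names invented for this example.

```clojure
(import '(java.net ServerSocket SocketException))

(defn accept-once
  "Blocks in .accept and reports how the call ended."
  [^ServerSocket server]
  (try
    (with-open [sock (.accept server)]
      :accepted)
    (catch SocketException _
      :closed)))

(defn stop-demo
  "Starts an accepting thread, closes the server socket, and returns the
  accepting thread's result (or :timeout if it never finished)."
  []
  (let [server (ServerSocket. 0) ; port 0 lets the OS pick a free port
        result (promise)]
    (doto (Thread. (fn [] (deliver result (accept-once server))))
      (.setDaemon true)
      (.start))
    (Thread/sleep 100) ; give the thread time to block in .accept
    (.close server)
    (deref result 2000 :timeout)))

(stop-demo) ; :closed
```

The result is the same if the socket happens to be closed before .accept is called, because .accept on a closed ServerSocket also throws a SocketException.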

Author: Daniel Ziltener

Created: 2019-06-21 Fri 16:22
