videocapture.pipeparser - Videocapture
Table of Contents
1 Namespace videocapture.pipeparser
2 Types
(s/def ::type string?) (s/def ::name string?) (s/def ::input (s/or :single string? :multi (s/coll-of string?))) (s/def ::caps string?) (s/def :gstreamer/element (partial instance? org.freedesktop.gstreamer.Element)) (s/def :gstreamer/pipeline (partial instance? org.freedesktop.gstreamer.Pipeline)) (s/def ::settings any?) ;;<<appcap-specs()>> <<type-from-table(table=appcaptable,type='())>> (s/def ::appcap (s/keys :opt-un [:appcap/device :appcap/storage-valve :appcap/encoder :appcap/endpoint :appcap/source :appcap/media])) (s/def ::specials (s/map-of keyword? any?)) (s/def ::appcap-value (s/tuple (s/or :keyword keyword? :string string? :boolean boolean?) :gstreamer/element)) (s/def ::appcaps (s/map-of keyword? (s/coll-of ::appcap-value))) (s/def ::pipe-element (s/keys :req-un [::type ::name] :opt-un [::input ::caps :gstreamer/element ::settings ::appcap])) (s/def ::pipe-elements (s/coll-of ::pipe-element))
3 Main pipeline building process
(defn build-pipeline [pipename]
{:pre [(keyword? pipename)]
:post [(s/valid? (s/tuple :gstreamer/pipeline ::appcaps) %)]}
(Gst/init "VideoCapture" (into-array String []))
(debug "Building pipeline" pipename "...")
(trace (settings/get-setting :C :pipelines pipename))
(let [specials (atom {})
nodes (->> (settings/get-setting :C :pipelines pipename)
(map apply-node)
(map apply-settings)
(map apply-caps)
(map #(apply-appcap % specials)))
pipe (new Pipeline)]
(.addMany pipe (into-array Element (map :element nodes)))
(debug (pr-str @specials))
(debug "Linking pipeline...")
(linker nodes)
(.connect (.getBus pipe) (fx/fi org.freedesktop.gstreamer.Bus$WARNING
[bus msg] (warn (.getStructure msg))))
(.connect (.getBus pipe) (fx/fi org.freedesktop.gstreamer.Bus$ERROR
[bus msg] (error (.getStructure msg))))
;(.connect (.getBus pipe) (fx/fi org.freedesktop.gstreamer.Bus$INFO
; [bus msg] (info (.getStructure msg))))
;(.connect (.getBus pipe) (fx/fi org.freedesktop.gstreamer.Bus$MESSAGE
; [bus msg] (debug (.getStructure msg))))
(debug "Done building and linking pipeline.")
(.debugToDotFile pipe 6 "build")
[pipe @specials]))
4 Helper functions
(defn log-through [in] (debug "Log through:" (pr-str in)) in)
(defn conj-vec "Helper function to add values to a vector inside a map." [map path value]
;;{:pre [(s/valid? ::appcaps map) (s/valid? (s/coll-of keyword?) path) (s/valid? ::appcap-value value)]
;; :post [#(s/valid? ::appcaps %)]}
(let [path (if (coll? path) path [path])]
(case (get-in map path)
nil (assoc-in map path [value])
(assoc-in map path (conj (get-in map path) value)))))
5 Pipeline builder
(defn apply-node "Extracts the GStreamer node out of the videocapture map." [element]
{:pre [(s/valid? ::pipe-element element)]
:post [(s/valid? ::pipe-element %)]}
(assoc element :element (ElementFactory/make (:type element) (:name element))))
(defn apply-settings "" [element]
{:pre [(s/valid? ::pipe-element element)]
:post [(s/valid? ::pipe-element %)]}
(doseq [setting (:settings element)]
(trace element)
(if-not (nil? (namespace (key setting)))
(.connect (:element element)
(fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED
[elem pad]
(when (= (namespace (key setting)) (.getName pad))
(.set pad (name (key setting)) (val setting)))))
(.set (:element element) (name (key setting)) (val setting)))
)
element)
(defn apply-caps "Applies the GStreamer caps from the EDN file to the actual GStreamer objects." [element]
{:pre [(s/valid? ::pipe-element element)]
:post [(s/valid? ::pipe-element %)]}
(when (contains? element :caps)
(.setCaps (:element element) (new Caps (:caps element))))
element)
The following appcap specs are defined:
| Appcap | Allowed values | Description | Generate? |
|---|---|---|---|
| :appcap/device | #{:display} | The device this element's output is to be displayed on | x |
| :appcap/storage-valve | #{true false} | This element is a valve in front of an encoder, and gets opened when the recording is to be started. | x |
| :appcap/encoder | #{true false} | This element is an encoder. | x |
| :appcap/endpoint | #{true false} | This element is the pipeline endpoint (e.g. a video muxer) | x |
| :appcap/source | #{:device :rtsp} | This is a source that has to be connected to an outside video source; a file (like /dev/video) in case of :device, or a network stream in case of :rtsp. | x |
| :appcap/media | #{"audio/x-raw" "video/x-raw"} | The media type this element accepts. Used for dynamic linking. | x |
| :appcap/padname | string? | The pad name this element's buffers should be fed into. | x |
REMOVE THIS CODE BLOCK
(defn apply-appcap [element specials]
{:pre [(s/valid? ::pipe-element element) (s/valid? ::appcaps @specials)]
:post [(s/valid? ::pipe-element %)]}
(trace (:name element) "appcap:" (:appcap element))
(when (contains? element :appcap)
(trace (:name element) "contains appcap:" (pr-str (:appcap element)))
(doseq [appcap (:appcap element)]
(swap! specials conj-vec [(key appcap)] [(val appcap) (:element element)])))
element)
In the next and last step, the pipeline segments get linked. This is a three-step process:
- extracting the source pads of all elements;
- creating a hashmap with source-to-target relations;
- the actual linking.
(defn relation-extractor [targets]
(flatten
(for [target targets
:when (contains? target :input)
:let [out-node (:element target)
inputs (into #{} (if (coll? (:input target)) (:input target) (str/split (:input target) #" ")))
in-elems (filter #(inputs (:name %)) targets)]]
(do
(reduce #(conj %1 {:source %2 :target target}) [] in-elems)))))
(defn link-by-media-type [source target]
(let [source-elem (:element source)
target-elem (:element target)
media (-> target :appcap :media)
target-pad (first (.getSrcPads target-elem))]
(.link source-elem target-elem)
(.connect source-elem
(fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED
[element pad]
(when (= (-> target :appcap :media) (-> pad .getNegotiatedCaps (.getStructure 0) .getName))
(.link source-elem target-elem))))))
(defn link-by-target-pad [source target]
(let [source-elem (:element source)
target-elem (:element target)
source-pad (first (.getSrcPads source-elem))
target-pad (.getRequestPad target-elem (-> source :appcap :padname))]
(.link source-pad target-pad)))
(defn link-generic [source target]
(let [source-elem (:element source)
target-elem (:element target)]
(.connect source-elem
(fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED
[element pad]
(.link source-elem target-elem)))
(.link source-elem target-elem)))
<<link-by-media-type>> <<link-by-target-pad>> <<link-generic>> (defn conditional-linker [source target] (cond (and (contains? target :appcap) (contains? (:appcap target) :media)) (link-by-media-type source target) (and (contains? source :appcap) (contains? (:appcap source) :padname)) (link-by-target-pad source target) :else (link-generic source target)))
<<pipebuilder-linker-conditional>>
<<pipebuilder-linker-extractor>>
(defn linker [targets]
(let [relations (relation-extractor targets)]
;;(debug "Relations:")
;;(pprint/pprint relations)
(doseq [relation relations]
(conditional-linker (:source relation) (:target relation)))))
6 Complete namespace definition
(ns videocapture.pipeparser (:require [clojure.zip :as z] [clojure.pprint :as pprint] [clojure.string :as str] [clojure.pprint :refer [pprint]] [videocapture.settings :as settings] [clojure.core.async :as async :refer [go <! >!]] [taoensso.timbre :as timbre :refer [log trace debug info warn error fatal report logf tracef debugf infof warnf errorf fatalf reportf spy get-env]] [clojurefx.clojurefx :as fx] [clojure.spec.alpha :as s]) (:import (org.freedesktop.gstreamer Gst Caps Element ElementFactory Pipeline) (clojure.lang Atom))) <<types>> <<helper-fns>> <<pipebuilder>> <<pipebuilder-appcaps>> <<pipebuilder-linker>> <<main-pipebuilder>>