videocapture.pipeparser - Videocapture
Table of Contents
1 Namespace videocapture.pipeparser
2 Types
;; Specs for one entry of a pipeline definition and for the GStreamer
;; objects that get attached to it while the pipeline is built.
(s/def ::type string?)
(s/def ::name string?)
;; An input is either a single string or a collection of strings.
(s/def ::input (s/or :single string?
                     :multi (s/coll-of string?)))
(s/def ::caps string?)
(s/def :gstreamer/element (partial instance? org.freedesktop.gstreamer.Element))
(s/def :gstreamer/pipeline (partial instance? org.freedesktop.gstreamer.Pipeline))
(s/def ::settings any?)

;;<<appcap-specs()>>
<<type-from-table(table=appcaptable,type='())>>

;; All appcaps are optional. :appcap/padname is listed as well because the
;; appcap table documents it and link-by-target-pad reads (:padname appcap).
(s/def ::appcap (s/keys :opt-un [:appcap/device
                                 :appcap/storage-valve
                                 :appcap/encoder
                                 :appcap/endpoint
                                 :appcap/source
                                 :appcap/media
                                 :appcap/padname]))
(s/def ::specials (s/map-of keyword? any?))
;; One collected appcap: [appcap value, GStreamer element carrying it].
(s/def ::appcap-value (s/tuple (s/or :keyword keyword?
                                     :string string?
                                     :boolean boolean?)
                               :gstreamer/element))
;; Appcap keyword -> every [value element] pair collected for it.
(s/def ::appcaps (s/map-of keyword? (s/coll-of ::appcap-value)))
;; With :opt-un, :gstreamer/element validates the unqualified key :element.
(s/def ::pipe-element (s/keys :req-un [::type ::name]
                              :opt-un [::input
                                       ::caps
                                       :gstreamer/element
                                       ::settings
                                       ::appcap]))
(s/def ::pipe-elements (s/coll-of ::pipe-element))
3 Main pipeline building process
(defn build-pipeline
  "Builds and links the pipeline stored under `pipename` in the settings.

  `pipename` must be a keyword. The pipeline definition is read with
  (settings/get-setting :C :pipelines pipename); every entry is passed through
  apply-node, apply-settings, apply-caps and apply-appcap, the resulting
  elements are added to a new Pipeline and then linked with `linker`.

  Returns a vector [pipeline appcaps], where appcaps is the map collected by
  apply-appcap (validated against ::appcaps by the post-condition)."
  [pipename]
  {:pre [(keyword? pipename)]
   :post [(s/valid? (s/tuple :gstreamer/pipeline ::appcaps) %)]}
  (Gst/init "VideoCapture" (into-array String []))
  (debug "Building pipeline" pipename "...")
  (let [definition (settings/get-setting :C :pipelines pipename)
        _ (trace definition)
        specials (atom {})
        ;; Every stage has side effects (element creation, property
        ;; setting, swap! on `specials`), so the stages are realised
        ;; eagerly instead of relying on a later consumer to force a
        ;; lazy sequence.
        nodes (->> definition
                   (mapv apply-node)
                   (mapv apply-settings)
                   (mapv apply-caps)
                   (mapv #(apply-appcap % specials)))
        pipe (new Pipeline)]
    (.addMany pipe (into-array Element (map :element nodes)))
    (debug (pr-str @specials))
    (debug "Linking pipeline...")
    (linker nodes)
    ;; NOTE(review): confirm that the [bus msg] argument lists below match
    ;; the callback signatures of Bus$WARNING / Bus$ERROR in the GStreamer
    ;; binding in use.
    (.connect (.getBus pipe)
              (fx/fi org.freedesktop.gstreamer.Bus$WARNING [bus msg]
                     (warn (.getStructure msg))))
    (.connect (.getBus pipe)
              (fx/fi org.freedesktop.gstreamer.Bus$ERROR [bus msg]
                     (error (.getStructure msg))))
    ;(.connect (.getBus pipe) (fx/fi org.freedesktop.gstreamer.Bus$INFO
    ;                                [bus msg] (info (.getStructure msg))))
    ;(.connect (.getBus pipe) (fx/fi org.freedesktop.gstreamer.Bus$MESSAGE
    ;                                [bus msg] (debug (.getStructure msg))))
    (debug "Done building and linking pipeline.")
    (.debugToDotFile pipe 6 "build")
    [pipe @specials]))
4 Helper functions
(defn log-through
  "Logs the printed representation of `in` at debug level and returns `in`
  unchanged, so the call can be dropped into the middle of a threading form."
  [in]
  (debug "Log through:" (pr-str in))
  in)

(defn conj-vec
  "Appends `value` to the collection stored at `path` inside `map`.

  `path` may be a single key or a collection of keys. When nothing is stored
  at `path` yet (the lookup yields nil), a fresh one-element vector holding
  `value` is put there. Returns the updated map."
  [map path value]
  ;;{:pre [(s/valid? ::appcaps map) (s/valid? (s/coll-of keyword?) path) (s/valid? ::appcap-value value)]
  ;; :post [#(s/valid? ::appcaps %)]}
  (let [ks (if (coll? path) path [path])]
    (if-some [existing (get-in map ks)]
      (assoc-in map ks (conj existing value))
      (assoc-in map ks [value]))))
5 Pipeline builder
(defn apply-node
  "Creates the GStreamer element for a pipeline entry.

  Calls ElementFactory/make with the entry's :type and :name and returns the
  entry with the created object stored under :element."
  [element]
  {:pre [(s/valid? ::pipe-element element)]
   :post [(s/valid? ::pipe-element %)]}
  (let [{factory :type elem-name :name} element]
    (assoc element :element (ElementFactory/make factory elem-name))))

(defn apply-settings
  "Applies every entry of (:settings element) to the entry's GStreamer element.

  A setting whose key has no namespace is set directly on the element, using
  the key's name as the property name. A setting with a namespaced key is
  deferred: a PAD_ADDED handler is connected which, for a pad whose name
  equals the key's namespace, sets the property on that pad instead.
  Returns `element` unchanged."
  [element]
  {:pre [(s/valid? ::pipe-element element)]
   :post [(s/valid? ::pipe-element %)]}
  (doseq [setting (:settings element)]
    (trace element)
    (let [setting-key (key setting)]
      (if-some [pad-name (namespace setting-key)]
        ;; Namespaced key: wait for the pad named by the namespace.
        (.connect (:element element)
                  (fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED [elem pad]
                         (when (= pad-name (.getName pad))
                           (.set pad (name setting-key) (val setting)))))
        ;; Plain key: property of the element itself.
        (.set (:element element) (name setting-key) (val setting)))))
  element)

(defn apply-caps
  "Sets the caps given in the pipeline entry on its GStreamer element.

  When the entry has a :caps key, a Caps object is constructed from its value
  and handed to .setCaps on the entry's :element. Returns `element` unchanged."
  [element]
  {:pre [(s/valid? ::pipe-element element)]
   :post [(s/valid? ::pipe-element %)]}
  (when (contains? element :caps)
    (-> element
        :element
        (.setCaps (Caps. (:caps element)))))
  element)
The following appcap specs are defined:
Appcap | Allowed values | Description | Generate? |
---|---|---|---|
:appcap/device | #{:display} | The device this element's output is to be displayed on | x |
:appcap/storage-valve | #{true false} | This element is a valve in front of an encoder, and gets opened when the recording is to be started. | x |
:appcap/encoder | #{true false} | This element is an encoder. | x |
:appcap/endpoint | #{true false} | This element is the pipeline endpoint (e.g. a video muxer) | x |
:appcap/source | #{:device :rtsp} | This is a source that has to be connected to an outside video source; a file (like /dev/video) in case of :device, or a network stream in case of :rtsp. | x |
:appcap/media | #{"audio/x-raw" "video/x-raw"} | The media type this element accepts. Used for dynamic linking. | x |
:appcap/padname | string? | The pad name this element's buffers should be fed into. | x |
REMOVE THIS CODE BLOCK
(defn apply-appcap
  "Registers the appcaps of a pipeline entry in the `specials` atom.

  For every key/value pair in (:appcap element), the pair
  [value gstreamer-element] is appended (via conj-vec) to the vector stored
  under that key in `specials`. Entries without :appcap leave the atom
  untouched. Returns `element` unchanged."
  [element specials]
  {:pre [(s/valid? ::pipe-element element)
         (s/valid? ::appcaps @specials)]
   :post [(s/valid? ::pipe-element %)]}
  (trace (:name element) "appcap:" (:appcap element))
  (when (contains? element :appcap)
    (trace (:name element) "contains appcap:" (pr-str (:appcap element)))
    (doseq [[cap-key cap-value] (:appcap element)]
      (swap! specials conj-vec [cap-key] [cap-value (:element element)])))
  element)
In the next and last step, the pipeline segments get linked. This is a three-step process:
- extracting the source pads of all elements;
- creating a hashmap with source-to-target relations;
- the actual linking.
(defn relation-extractor
  "Returns a sequence of {:source s :target t} maps, one per link to create.

  Every entry of `targets` that has an :input key is treated as a link
  target. Its :input is either a collection of element names or a single
  string of names separated by one space. Each entry of `targets` whose :name
  is among those names becomes a source for that target. Sources are emitted
  in the order in which they occur in `targets`."
  [targets]
  (for [target targets
        :when (contains? target :input)
        :let [input (:input target)
              wanted (set (if (coll? input)
                            input
                            (str/split input #" ")))]
        source targets
        :when (wanted (:name source))]
    {:source source :target target}))
(defn link-by-media-type
  "Links `source` to `target` and links again for pads of a matching media type.

  The two elements are linked immediately. In addition a PAD_ADDED handler is
  connected to the source element: whenever a new pad's negotiated caps name
  (first structure) equals the target's (-> target :appcap :media), the
  elements are linked again. Returns the result of the .connect call."
  [source target]
  (let [source-elem (:element source)
        target-elem (:element target)
        media (-> target :appcap :media)]
    (.link source-elem target-elem)
    (.connect source-elem
              (fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED [element pad]
                     ;; NOTE(review): presumably the pad has no negotiated caps
                     ;; yet when it is first added; some-> keeps the handler
                     ;; from throwing on a nil link in that chain - confirm
                     ;; against the binding.
                     (when-some [pad-media (some-> pad
                                                   .getNegotiatedCaps
                                                   (.getStructure 0)
                                                   .getName)]
                       (when (= media pad-media)
                         (.link source-elem target-elem)))))))
(defn link-by-target-pad
  "Links the first source pad of `source` to a named request pad of `target`.

  The pad name is taken from (-> source :appcap :padname) and requested from
  the target element with .getRequestPad. Returns the result of linking the
  two pads."
  [source target]
  (let [pad-name (-> source :appcap :padname)
        from-pad (-> source :element .getSrcPads first)
        to-pad (-> target :element (.getRequestPad pad-name))]
    (.link from-pad to-pad)))
(defn link-generic
  "Links the element of `source` to the element of `target`.

  A PAD_ADDED handler that repeats the link is connected to the source
  element first; afterwards the two elements are linked directly. Returns the
  result of that direct .link call."
  [source target]
  (let [upstream (:element source)
        downstream (:element target)
        relink (fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED [element pad]
                      (.link upstream downstream))]
    (.connect upstream relink)
    (.link upstream downstream)))
<<link-by-media-type>>
<<link-by-target-pad>>
<<link-generic>>

(defn conditional-linker
  "Chooses the linking strategy for one source/target pair and applies it.

  - target has a :media appcap   -> link-by-media-type
  - source has a :padname appcap -> link-by-target-pad
  - otherwise                    -> link-generic

  The checks are made in that order; the first match wins."
  [source target]
  (let [link-fn (cond
                  ;; contains? on a missing (nil) :appcap is false, so no
                  ;; separate presence check for :appcap is needed.
                  (contains? (:appcap target) :media) link-by-media-type
                  (contains? (:appcap source) :padname) link-by-target-pad
                  :else link-generic)]
    (link-fn source target)))
<<pipebuilder-linker-conditional>>
<<pipebuilder-linker-extractor>>

(defn linker
  "Links all pipeline entries in `targets`.

  The source/target relations are computed with relation-extractor and each
  one is handed to conditional-linker. Returns nil."
  [targets]
  (let [relations (relation-extractor targets)]
    ;;(debug "Relations:")
    ;;(pprint/pprint relations)
    (run! (fn [{:keys [source target]}]
            (conditional-linker source target))
          relations)))
6 Complete namespace definition
(ns videocapture.pipeparser (:require [clojure.zip :as z] [clojure.pprint :as pprint] [clojure.string :as str] [clojure.pprint :refer [pprint]] [videocapture.settings :as settings] [clojure.core.async :as async :refer [go <! >!]] [taoensso.timbre :as timbre :refer [log trace debug info warn error fatal report logf tracef debugf infof warnf errorf fatalf reportf spy get-env]] [clojurefx.clojurefx :as fx] [clojure.spec.alpha :as s]) (:import (org.freedesktop.gstreamer Gst Caps Element ElementFactory Pipeline) (clojure.lang Atom))) <<types>> <<helper-fns>> <<pipebuilder>> <<pipebuilder-appcaps>> <<pipebuilder-linker>> <<main-pipebuilder>>