videocapture.pipeparser - Videocapture

Table of Contents

1 Namespace videocapture.pipeparser

2 Types

;; Specs for the individual fields of a pipeline element map.
;; ::type and ::name are the two strings handed to ElementFactory/make.
(s/def ::type string?)
(s/def ::name string?)
;; ::input names the upstream element(s): either one string or a collection of strings.
(s/def ::input (s/or :single string? :multi (s/coll-of string?)))
;; ::caps is a caps description string; it is passed to the Caps constructor.
(s/def ::caps string?)
;; Instance checks for the GStreamer Java binding objects.
(s/def :gstreamer/element (partial instance? org.freedesktop.gstreamer.Element))
(s/def :gstreamer/pipeline (partial instance? org.freedesktop.gstreamer.Pipeline))
;; ::settings is deliberately unconstrained; apply-settings iterates it as key/value entries.
(s/def ::settings any?)

;;<<appcap-specs()>>
<<type-from-table(table=appcaptable,type='())>>

;; ::appcap is the optional map of application capabilities attached to a
;; pipeline element. Every appcap listed in the appcap table is included here,
;; so that its value is validated when present. :appcap/padname was missing
;; from this list although the linker reads :padname from the appcap map.
(s/def ::appcap (s/keys :opt-un [:appcap/device
                                 :appcap/storage-valve
                                 :appcap/encoder
                                 :appcap/endpoint
                                 :appcap/source
                                 :appcap/media
                                 :appcap/padname]))
;; ::specials is a loosely typed keyword-keyed map.
(s/def ::specials (s/map-of keyword? any?))
;; One registered appcap: the configured value paired with the GStreamer
;; element that declared it.
(s/def ::appcap-value (s/tuple (s/or :keyword keyword?
                                     :string string?
                                     :boolean boolean?)
                               :gstreamer/element))
;; ::appcaps maps each appcap keyword to all [value element] pairs registered for it.
(s/def ::appcaps (s/map-of keyword? (s/coll-of ::appcap-value)))

;; A pipeline element as read from the settings: :type and :name are required.
;; :input, :caps, :element (the instantiated GStreamer object), :settings and
;; :appcap are optional.
(s/def ::pipe-element (s/keys :req-un [::type ::name]
			      :opt-un [::input ::caps :gstreamer/element ::settings ::appcap]))
;; A whole pipeline description is a collection of such elements.
(s/def ::pipe-elements (s/coll-of ::pipe-element))

3 Main pipeline building process

(defn build-pipeline
  "Builds and links the GStreamer pipeline configured under `pipename`.

  Initialises GStreamer, reads the element descriptions from the settings
  path [:C :pipelines pipename], runs each description through apply-node,
  apply-settings, apply-caps and apply-appcap, adds the resulting elements to
  a fresh Pipeline, links them, and attaches warning/error loggers to the
  pipeline bus.

  Returns a tuple [pipeline appcaps], where appcaps is the map collected by
  apply-appcap."
  [pipename]
  {:pre [(keyword? pipename)]
   :post [(s/valid? (s/tuple :gstreamer/pipeline ::appcaps) %)]}
  (Gst/init "VideoCapture" (into-array String []))
  (debug "Building pipeline" pipename "...")
  (trace (settings/get-setting :C :pipelines pipename))
  (let [registry (atom {})
        register-appcap (fn [node] (apply-appcap node registry))
        ;; Lazy sequence: the elements are created when it is first consumed below.
        built-nodes (map register-appcap
                         (map apply-caps
                              (map apply-settings
                                   (map apply-node
                                        (settings/get-setting :C :pipelines pipename)))))
        pipeline (Pipeline.)]
    (.addMany pipeline (into-array Element (map :element built-nodes)))
    (debug (pr-str @registry))
    (debug "Linking pipeline...")
    (linker built-nodes)
    ;; Forward bus warnings and errors to the logger.
    (.connect (.getBus pipeline)
              (fx/fi org.freedesktop.gstreamer.Bus$WARNING
                     [bus msg]
                     (warn (.getStructure msg))))
    (.connect (.getBus pipeline)
              (fx/fi org.freedesktop.gstreamer.Bus$ERROR
                     [bus msg]
                     (error (.getStructure msg))))
    ;; Info- and message-level bus logging is currently disabled:
    ;;(.connect (.getBus pipeline) (fx/fi org.freedesktop.gstreamer.Bus$INFO
    ;;                                    [bus msg] (info (.getStructure msg))))
    ;;(.connect (.getBus pipeline) (fx/fi org.freedesktop.gstreamer.Bus$MESSAGE
    ;;                                    [bus msg] (debug (.getStructure msg))))
    (debug "Done building and linking pipeline.")
    (.debugToDotFile pipeline 6 "build")
    [pipeline @registry]))

4 Helper functions

(defn log-through
  "Logs `in` (rendered with pr-str) at debug level and returns it unchanged."
  [in]
  (debug "Log through:" (pr-str in))
  in)

(defn conj-vec
  "Appends `value` to the collection stored at `path` inside `map`.

  `path` may be a single key or a collection of keys. When nothing is stored
  at `path` yet, a one-element vector holding `value` is created there.
  Returns the updated map."
  [map path value]
  ;; Spec-based :pre/:post checks for this helper are currently disabled.
  (let [ks (if (coll? path) path [path])
        existing (get-in map ks)]
    (assoc-in map ks (if (nil? existing)
                       [value]
                       (conj existing value)))))

5 Pipeline builder

(defn apply-node
  "Creates the GStreamer element described by `element` (factory :type,
  instance :name) and returns `element` with that object stored under :element."
  [element]
  {:pre [(s/valid? ::pipe-element element)]
   :post [(s/valid? ::pipe-element %)]}
  (let [gst-element (ElementFactory/make (:type element) (:name element))]
    (assoc element :element gst-element)))

(defn apply-settings
  "Applies every entry of the element's :settings to its GStreamer object.

  A setting with a plain key is set directly on the element. A setting whose
  key has a namespace is deferred: a PAD_ADDED handler is connected, and the
  property is set on any added pad whose name equals that namespace.
  Returns `element` unchanged."
  [element]
  {:pre [(s/valid? ::pipe-element element)]
   :post [(s/valid? ::pipe-element %)]}
  (let [gst-element (:element element)]
    (doseq [setting (:settings element)]
      (trace element)
      (let [setting-key (key setting)
            pad-name (namespace setting-key)
            property (name setting-key)]
        (if (some? pad-name)
          ;; Pad-scoped setting: apply it once the matching pad shows up.
          (.connect gst-element
                    (fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED
                           [elem pad]
                           (when (= pad-name (.getName pad))
                             (.set pad property (val setting)))))
          (.set gst-element property (val setting))))))
  element)

(defn apply-caps
  "When `element` carries a :caps entry, builds a Caps object from it and sets
  it on the element's GStreamer object. Returns `element` unchanged."
  [element]
  {:pre [(s/valid? ::pipe-element element)]
   :post [(s/valid? ::pipe-element %)]}
  (if (contains? element :caps)
    (let [caps (Caps. (:caps element))]
      (.setCaps (:element element) caps)
      element)
    element))

The following appcap specs are defined:

Appcap Allowed values Description Generate?
:appcap/device #{:display} The device this element's output is to be displayed on x
:appcap/storage-valve #{true false} This element is a valve in front of an encoder, and gets opened when the recording is to be started. x
:appcap/encoder #{true false} This element is an encoder. x
:appcap/endpoint #{true false} This element is the pipeline endpoint (e.g. a video muxer) x
:appcap/source #{:device :rtsp} This is a source that has to be connected to an outside video source; a file (like /dev/video) in case of :device, or a network stream in case of :rtsp. x
:appcap/media #{"audio/x-raw" "video/x-raw"} The media type this element accepts. Used for dynamic linking. x
:appcap/padname string? The pad name this element's buffers should be fed into. x

REMOVE THIS CODE BLOCK

(defn apply-appcap
  "Registers the appcaps of `element` in the `specials` atom.

  For every entry of the element's :appcap map, the pair
  [value gstreamer-element] is appended under the appcap's key.
  Returns `element` unchanged."
  [element specials]
  {:pre [(s/valid? ::pipe-element element) (s/valid? ::appcaps @specials)]
   :post [(s/valid? ::pipe-element %)]}
  (trace (:name element) "appcap:" (:appcap element))
  (when (contains? element :appcap)
    (trace (:name element) "contains appcap:" (pr-str (:appcap element)))
    (doseq [appcap (:appcap element)]
      (let [capability (key appcap)
            registration [(val appcap) (:element element)]]
        (swap! specials conj-vec [capability] registration))))
  element)

In the next and last step, the pipeline segments get linked. This is a three-step process:

  • extracting the source pads of all elements;
  • creating a hashmap with source-to-target relations;
  • the actual linking.
(defn relation-extractor
  "Returns a flat sequence of {:source s :target t} maps, one for every
  element `t` in `targets` that has an :input and every element `s` in
  `targets` whose :name is among those inputs.

  :input is either a collection of names or a single string of names
  separated by spaces."
  [targets]
  (for [target targets
        :when (contains? target :input)
        :let [input (:input target)
              wanted (set (if (coll? input)
                            input
                            (str/split input #" ")))]
        source targets
        :when (wanted (:name source))]
    {:source source :target target}))
(defn link-by-media-type
  "Links `source` to `target`, where `target` declares an accepted media type
  under [:appcap :media].

  A link is attempted immediately. In addition, a PAD_ADDED handler on the
  source element attempts the link again whenever a newly added pad's
  negotiated caps name equals the target's media type.
  Returns the result of connecting the handler."
  [source target]
  (let [source-elem (:element source)
        target-elem (:element target)
        ;; Looked up once and reused by the handler below.
        media (-> target :appcap :media)]
    (.link source-elem target-elem)
    (.connect source-elem
              (fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED
                     [element pad]
                     (when (= media (-> pad .getNegotiatedCaps (.getStructure 0) .getName))
                       (.link source-elem target-elem))))))
(defn link-by-target-pad
  "Links the first source pad of `source` to a request pad of `target`.
  The request pad name is taken from the source's [:appcap :padname]."
  [source target]
  (let [padname (-> source :appcap :padname)
        upstream-pad (first (.getSrcPads (:element source)))
        downstream-pad (.getRequestPad (:element target) padname)]
    (.link upstream-pad downstream-pad)))
(defn link-generic
  "Links the GStreamer element of `source` to that of `target`.

  A PAD_ADDED handler that repeats the link attempt is connected to the
  source element first; then the elements are linked directly.
  Returns the result of the direct link call."
  [source target]
  (let [upstream (:element source)
        downstream (:element target)]
    ;; Retry the link whenever the upstream element gains a pad.
    (.connect upstream
              (fx/fi org.freedesktop.gstreamer.Element$PAD_ADDED
                     [element pad]
                     (.link upstream downstream)))
    (.link upstream downstream)))
<<link-by-media-type>>
<<link-by-target-pad>>
<<link-generic>>

(defn conditional-linker
  "Chooses a linking strategy for one source/target pair and applies it:

  - target has an :appcap containing :media   -> link-by-media-type
  - source has an :appcap containing :padname -> link-by-target-pad
  - otherwise                                 -> link-generic"
  [source target]
  (let [declares? (fn [node capability]
                    (and (contains? node :appcap)
                         (contains? (:appcap node) capability)))
        link! (cond
                (declares? target :media) link-by-media-type
                (declares? source :padname) link-by-target-pad
                :else link-generic)]
    (link! source target)))
<<pipebuilder-linker-conditional>>
<<pipebuilder-linker-extractor>>

(defn linker
  "Links all elements in `targets`: computes the source/target relations with
  relation-extractor and hands each pair to conditional-linker."
  [targets]
  ;; For debugging, the relations can be dumped with:
  ;;(debug "Relations:")
  ;;(pprint/pprint (relation-extractor targets))
  (doseq [{:keys [source target]} (relation-extractor targets)]
    (conditional-linker source target)))

6 Complete namespace definition

(ns videocapture.pipeparser
  (:require [clojure.zip :as z]
	    [clojure.pprint :as pprint]
	    [clojure.string :as str]
	    [clojure.pprint :refer [pprint]]
	    [videocapture.settings :as settings]
	    [clojure.core.async :as async :refer [go <! >!]]
	    [taoensso.timbre :as timbre
	     :refer [log trace debug info warn error fatal report
		     logf tracef debugf infof warnf errorf fatalf reportf
		     spy get-env]]
	    [clojurefx.clojurefx :as fx]
	    [clojure.spec.alpha :as s])
  (:import (org.freedesktop.gstreamer Gst
				      Caps
				      Element
				      ElementFactory
				      Pipeline)
	   (clojure.lang Atom)))

<<types>>

<<helper-fns>>

<<pipebuilder>>
<<pipebuilder-appcaps>>
<<pipebuilder-linker>>

<<main-pipebuilder>>

Author: Daniel Ziltener

Created: 2019-06-21 Fr 16:22

Validate