#! /bin/env tclsh
package require Thread
package require {ycl proc}
namespace import [yclprefix]::proc::checkargs
variable doc::tplex {
description {
Connect the outputs of one channel to the inputs of zero or more
channels.
Returns a list containing a new channel and a thread id. The new
channel has all the attributes of $chan . The new channel is a pipe
whose other end is in a separate thread which buffers the output of
$chan , allowing the new channel to act as a synchronous channel , with
the asynchronous event queue activity confined to the separate thread .
The buffer uses an additive-increase/multiplicative-decrease strategy
to avoid overflowing memory, and also to avoid congestion in
multiplexing scenarios.
By default, $chan is closed when [eof $chan] is true, and the close
information stored in
$closestatus
$closeres
$closeopts
To use tplex as a sink, set $background to true and don't provide $to.
}
args {
chan {
description {
The channel to accumulate data from .
}
positional true
}
background {
description {
If true, don't return a read channel.
}
default {lindex false}
}
buffsize {
description {
The size of the write buffer .
}
default {lindex 8192}
}
connect {
description {
What sort of connection to establish to the tplex instance.
"background" means no connection, "read" means a channel from
which the multiplexed data can be read, and "wait" means a
channel that becomes readable and reaches EOF when the tplex
instance source channel is closed.
}
validate {$connect in {background read wait}}
default {lindex read}
}
eofaction {
description {
What to do when [eof $chan] is true.
}
validate {$eofaction in {close detach nothing}}
default {lindex close}
}
init {
description {
A command prefix to execute to initialize the thread .
$chan and a list of output channels are appended to the prefix .
}
default {lindex [list apply {{chan to} {}}]}
}
persist {
description {
Don't release the thread once all the channels are closed.
}
default {lindex false}
}
paused {
description {
Create the buffer in a paused state. The buffer can be started
by executing [run] in the buffer's thread.
}
default {lindex false}
}
readsize {
description {
The number of characters to read at a time
}
default {lindex 8192}
}
to {
description {
A list of channels to multiplex the data from $chan to , rather
than creating and returning a new proxy chan .
}
default {lindex {}}
}
}
implementation {
Currently , channels are closed on EOF because there isn't any way to
signal an EOF on a pipe other than closing it . If that ever changes,
modify this code to take advantabe of that .
}
}
proc tplex {chan args} {
variable state
set donechan {}
checkargs [set doc::[lindex [info level 0] 0]] {*}$args
set tid [thread::create]
switch $connect {
background {
set returnchan {}
}
default {
lassign [chan pipe] pr pw
chan configure $pr {*}[chan configure $chan]
chan configure $pr -blocking 1
chan configure $chan -translation binary
switch $connect {
read {
chan configure $pw -blocking 0
lappend to $pw
}
wait {
set donechan $pw
}
}
set returnchan $pr
}
}
chan configure $chan -blocking 0
foreach toname $to {
thread::transfer $tid $toname
}
thread::transfer $tid $chan
thread::send $tid [list {*}$init $chan $to]
thread::send $tid [list variable buffsize $buffsize]
thread::send $tid [list variable donechan $donechan]
thread::send $tid [list variable eofaction $eofaction]
thread::send $tid [list variable persist $persist]
thread::send $tid [list variable readsize $readsize]
thread::send $tid [list variable to $to]
thread::send $tid [list apply [list chan {
variable wait 10
set [namespace current]::chan $chan
proc run {} {
variable chan
chan event $chan readable [list apply [list chan {
variable buffsize
variable closeres
variable eofaction
variable closeopts
variable closestatus
variable done 0
variable readsize
variable to
variable wait
if {[eof $chan]} {
switch $eofaction {
close {
set closestatus [catch {close $chan} closeres closeopts]
}
detach {
thread::detach $chan
}
nothing {
}
}
# Blocking channel caused a deadlock for some reason.
# Maybe a bug. See
#chan configure $pw -blocking 1
#chan close $pw
# Todo: is the blocking close above still causing deadlock in
# newer versions of Tcl?
foreach to1 $to {
if {$to1 in {stdout stderr}} {
flush $to1
continue
}
chan event $to1 writable [list apply [list to1 {
variable donechan
variable persist
variable to
if {[chan pending output $to1]} {
flush $to1
} else {
# Todo: is there any way yet in Tcl to signal an EOF on a pipe
# short of closing the pipe ?
chan configure $to1 -blocking 1
close $to1
foreach to2 $to {
if {$to2 in [chan names] && $to2 ni [
list $donechan stdout stderr]} {
return
}
}
if {$donechan ne {}} {
close $donechan
}
if {$persist} {
set [namespace current]::done 1
} else {
thread::release
}
}
} [namespace current]] $to1]
}
} else {
foreach to1 $to {
if {[chan pending output $to1] > $buffsize} {
after $wait
set wait [expr {$wait + max(entier($wait * .20), 1)}]
return
}
}
set data [read $chan $readsize]
if {$data ne {}} {
foreach to1 $to {
puts -nonewline $to1 $data[set data {}]
}
}
if {$wait > 10} {
set wait [expr {entier($wait / 2)}]
if {$wait < 10} {
set wait 10
}
}
}
} [namespace current]] $chan]
}
proc subscribe chan {
variable to
if {$chan ni $to} {
lappend chan $to
}
}
}] $chan]
if {!$paused} {
thread::send -async $tid run
}
return [list $returnchan $tid]
}