#! /usr/bin/env tclsh
package require Tcl 8.6
package require Thread
namespace eval ::pipethread {}
# Default command to clone an interpreter into a new thread
#
# Arguments:
#     thread    -- ID of the thread whose interpreter receives the copy
#     namespace -- Namespace to copy (default: the global namespace).  Child
#                  namespaces are copied recursively.
#
# For the global namespace the auto_path and every loaded package are
# replicated first, and the local variables of the caller's caller (two
# levels up) are copied into the thread as global variables.
#
# Variables whose name ends in "_NOCLONE" and imported procedures are skipped.
proc ::pipethread::_cloneInterp {thread {namespace "::"}} {
    # Load all currently loaded packages into the new thread
    if {$namespace eq "::"} {
        thread::send $thread [list set auto_path $::auto_path]
        foreach package [package names] {
            # [package names] also lists packages that are merely known;
            # [package present] raises an error for those
            set present 0
            catch {
                package present $package
                set present 1
            }
            if {!$present} {
                continue
            }
            thread::send $thread [list package require $package]
        }
    }

    # Create the namespace we are operating on
    thread::send $thread [list namespace eval $namespace ""]

    # Create a valid namespace prefix string
    set namespacePrefix $namespace
    if {$namespacePrefix ne "::"} {
        append namespacePrefix "::"
    }

    # Copy all namespace variables into the new thread
    foreach var [info vars ${namespacePrefix}*] {
        # Skip variables explicitly marked as noclone
        if {[string match "*_NOCLONE" $var]} {
            continue
        }
        if {[array exists $var]} {
            thread::send $thread [list array set $var [array get $var]]
        } elseif {[info exists $var]} {
            thread::send $thread [list set $var [set $var]]
        }
    }

    if {$namespace eq "::"} {
        # Copy all local variables into the new thread as global variables
        # This may not be the best approach -- but it has to go somewhere...
        if {[uplevel 2 {info level}] != 0} {
            foreach var [uplevel 2 {info vars}] {
                if {[uplevel 2 [list array exists $var]]} {
                    thread::send $thread [list array set $var [uplevel 2 [list array get $var]]]
                } elseif {[uplevel 2 [list info exists $var]]} {
                    # [info vars] also reports names that were only declared
                    # (global/variable/upvar) and never set; reading those
                    # would raise an error, so they are skipped just like
                    # unset namespace variables are above
                    thread::send $thread [list set $var [uplevel 2 [list set $var]]]
                }
            }
        }
    }

    # Copy all procedures into the new thread
    foreach proc [info proc ${namespacePrefix}*] {
        ## Skip imported procs, hopefully they'll get re-imported
        if {[namespace origin $proc] ne [namespace which $proc]} {
            continue
        }
        # Rebuild the formal argument list, including default values
        set procArgs [list]
        foreach arg [info args $proc] {
            if {[info default $proc $arg argDefault]} {
                lappend procArgs [list $arg $argDefault]
            } else {
                lappend procArgs $arg
            }
        }
        thread::send $thread [list proc $proc $procArgs [info body $proc]]
    }

    # For every child namespace of the current namespace, repeat this whole process
    foreach childNamespace [namespace children $namespace] {
        _cloneInterp $thread $childNamespace
    }
    return
}
# Clone command used for the "_TTRACE" magic value of -cloneInterpCommand:
# instead of copying state, it makes the thread load the Ttrace package.
#
# Arguments:
#     thread -- ID of the thread to initialise
proc ::pipethread::_traceInterp {thread} {
    set loadTtrace [list package require Ttrace]
    thread::send $thread $loadTtrace
}
# Variable-trace callback that runs the thread cleanup closure each time a
# thread's result is written into the watched array.
#
# Arguments:
#     count     -- Number of threads that were created; the closure is told it
#                  is on its final invocation once it has run this many times
#     vars      -- Dictionary of variables to hand to the closure
#     closure   -- The lambda to [apply]
#     _watchVar -- Name of the traced array (supplied by the trace); used for
#                  bookkeeping only and not passed to the closure
#     thread    -- Array element that was written (supplied by the trace),
#                  i.e. the ID of the thread that has terminated
#     _watchOp  -- Trace operation (supplied by the trace); unused
proc ::pipethread::_invokeClosure {count vars closure _watchVar thread _watchOp} {
    # The array also holds two bookkeeping elements; writes to those are not
    # thread terminations and must not reach the closure
    if {$thread in {threadToChan closureCallCount}} {
        return
    }

    # Count this invocation; [incr] hands back the updated value
    set invocationNumber [incr ${_watchVar}(closureCallCount)]

    # Let the lambda know whether this is the last thread to be cleaned up
    lappend vars finalInvocation [expr {$invocationNumber == $count ? "true" : "false"}]

    # Remaining variables the lambda expects
    lappend vars thread $thread
    if {![dict exists $vars threadToChan]} {
        lappend vars threadToChan [set ${_watchVar}(threadToChan)]
    }

    apply $closure $vars
}
# Debug logging hook, replace as needed
#
# Logging is switched off by default: the procedure returns before the
# [puts] is reached.  Remove the [return] to get messages on stderr.
proc ::pipethread::_debugLog {err} {
    return
    puts stderr "$err"
}
# Starts a set of "coprocessing" pipes, similar to UNIX shell
#
# Each block will execute in its own thread with two additional
# channels: inchan and outchan
#
# These channels represent pipes coming from the previous block (inchan)
# or going to the next block (outchan)
#
# Blocks may either be command prefixes or a block of commands
#
# If a block is a command prefix then two values are appended to it before
# being invoked: inchan outchan
#
# If a block is a block of commands then two variables are set: inchan outchan
# before the block is started.
#
# Examples of both:
# 1. Command prefixes:
# pipethread::pipe firstBlock | secondBlock | thirdBlock abcdef
#
# This creates three threads and in each thread invokes:
# a. firstBlock $inchan $outchan
# b. secondBlock $inchan $outchan
# c. thirdBlock abcdef $inchan $outchan
#
# 2. Block of commands:
# pipethread::pipe { puts $outchan ok } | { gets $inchan line; puts $line }
#
# This creates two threads and in each thread invokes:
# a. set inchan <inchan>; set outchan <outchan>; puts $outchan ok
# b. set inchan <inchan>; set outchan <outchan>; gets $inchan line; puts $line
#
# You may use both types together.
#
# Optional arguments:
# -cloneInterpCommand <commandPrefix>
# Specify the command to run to clone the interpreter into each
# new thread. If specified as the empty string then new
# interpreters will have no initialization done on them.
#
# This is a command prefix and the thread ID is added to it.
#
# There are some magic values for this command prefix:
# _DEFAULT: Use the default command prefix
# _TTRACE: Use the Ttrace mechanism
#
# -inchan <chan>
# Specify a channel to be used as the input channel to the
# first block. If this is not specified then the first block
# will get "" as its input channel.
#
# -outchan <chan>
# Specify a channel to be used as the output channel from the
# final block. If this is not specified then the final block
# will get either "" or a pipe channel as its output channel.
# The default is to create a pipe channel and anything written
# to it will be [return]'d by pipethread::pipe, however if -async
# mode is used (below) then this channel will not be created and
# the final block will get "" as its output channel unless this
# option is used.
#
# -async <cleanup>
# Specify that:
# 1. pipethread::pipe should not wait in the event loop (by
# calling [vwait]); and
# 2. pipethread::pipe should return immediately after starting
# the threads and connecting their pipes, the caller is
# then responsible for waiting in the event loop so that
# thread termination is handled.
# When all the blocks terminate (and the final block may not always
# be the last block to terminate) the script <cleanup> is invoked
# in the global scope
#
# --
# Signals the end of arguments
proc ::pipethread::pipe args {
# Overview of the phases below:
#   1. Parse the options and resolve the -cloneInterpCommand magic values
#   2. If -inchan and -outchan name the same channel and there is more than
#      one block, start a helper thread that bridges that channel to two pipes
#   3. Split the remaining arguments on "|" into blocks, creating one thread
#      per block and the pipe that carries its output
#   4. Transfer the channels to the threads, clone the interpreter into each
#      thread and start every block asynchronously
#   5. With -async return the list of threads; otherwise wait for every block
#      to finish, clean up each thread as it terminates and return whatever
#      the final block wrote (when no -outchan was given)
# Create a unique handle for this series of blocks and pipes
incr [namespace current]::handles
set handle [set [namespace current]::handles]
# Special values for the -cloneInterpCommand option
set cloneInterpCommandMagic(_DEFAULT) [namespace current]::_cloneInterp
set cloneInterpCommandMagic(_TTRACE) [namespace current]::_traceInterp
# Process commandline arguments
## Set default values
set cloneInterpCommand _DEFAULT
set initialThreadReader ""
set finalThreadWriter ""
set operateAsync false
## Collect arguments
## The first argument that does not start with "-" (or the one after "--")
## ends option processing
for {set idx 0} {$idx < [llength $args]} {incr idx} {
set arg [lindex $args $idx]
switch -glob -- $arg {
"-cloneInterpCommand" {
incr idx
set cloneInterpCommand [lindex $args $idx]
}
"-inchan" {
incr idx
set initialThreadReader [lindex $args $idx]
}
"-outchan" {
incr idx
set finalThreadWriter [lindex $args $idx]
}
"-async" {
set operateAsync true
incr idx
set onCompleteCleanup [lindex $args $idx]
}
"--" {
incr idx
break
}
"-*" {
return -code error "Invalid option: $arg, must be one of: -cloneInterpCommand, -inchan, -outchan, -async, or --"
}
default {
break
}
}
}
## Update the list of arguments to remove the ones we consumed
set args [lrange $args $idx end]
# Replace the cloneInterpCommand magic values with actual ones if appropriate
if {[info exists cloneInterpCommandMagic($cloneInterpCommand)]} {
set cloneInterpCommand $cloneInterpCommandMagic($cloneInterpCommand)
}
# If we are asked to read and write from the same channel (for example
# a socket) and we have more than one block, we need to create two
# channels from it since we need to read from the channel in the first block
# and write to it in the final block but we cannot give the channel to two
# different threads
#
# We create 2 new pipe-pairs and a thread to bridge them and we copy between
# them in the event loop.
if {$initialThreadReader eq $finalThreadWriter && $finalThreadWriter ne "" && [lsearch -exact $args "|"] != -1} {
set copyThreadChannel $initialThreadReader
set copyThread [thread::create]
thread::transfer $copyThread $copyThreadChannel
thread::send $copyThread [list set copyThreadChannel $copyThreadChannel]
# The script below runs inside the copy thread; it returns the two pipe ends
# (detached from that thread) that this thread attaches and hands to the
# first and the final block
set copyPipe [thread::send $copyThread {
set readerPipe [chan pipe]
set writerPipe [chan pipe]
set readerPipeReader [lindex $readerPipe 0]
set readerPipeWriter [lindex $readerPipe 1]
set writerPipeReader [lindex $writerPipe 0]
set writerPipeWriter [lindex $writerPipe 1]
foreach fd [list $readerPipeReader $readerPipeWriter $writerPipeReader $writerPipeWriter] {
fconfigure $fd -translation binary
}
thread::detach $writerPipeReader
thread::detach $readerPipeWriter
proc copyComplete {var args} {
incr ::copyComplete
set sock [set ::$var]
unset ::$var
close $sock
}
proc terminate {} {
if {[info exists ::writerPipeWriter]} {
close $::writerPipeWriter
}
if {[info exists ::readerPipeReader]} {
close $::readerPipeReader
}
thread::detach $::copyThreadChannel
}
fcopy $copyThreadChannel $writerPipeWriter -command [list copyComplete writerPipeWriter]
fcopy $readerPipeReader $copyThreadChannel -command [list copyComplete readerPipeReader]
list $writerPipeReader $readerPipeWriter
}]
set copyPipeReader [lindex $copyPipe 0]
set copyPipeWriter [lindex $copyPipe 1]
thread::attach $copyPipeReader
thread::attach $copyPipeWriter
set initialThreadReader $copyPipeReader
set finalThreadWriter $copyPipeWriter
}
# Process the list of blocks
set blockThreadList [list]
set toRun [list]
set finalBlock false
for {set idx 0} {$idx < [llength $args]} {incr idx} {
set start $idx
set end [lsearch -start $start -exact $args "|"]
if {$end == -1} {
set finalBlock true
set end [llength $args]
}
# Determine the command to run, it is between the two pipes.
set command [lrange $args $start $end-1]
# Determine if the block is a command prefix or a block of code
#
# This is not easy to do authoritatively, we are just faking it
# we determine that it is a block of code if:
# 1. It is a single list element long
# 2. The string representation begins with the list markers '{' and '}'
# This can fail if you use a command prefix that has special characters in it
# and accepts no additional parameters. You can workaround it by supplying a
# dummy parameter.
set anonymous false
if {[llength $command] == 1 && [string index $command 0] == "\{" && [string index $command end] == "\}"} {
set anonymous true
set command [lindex $command 0]
} else {
# Command prefixes whose first word is exec, foreach or lmap are rewritten
# into a lambda taking inchan and outchan; any other prefix is used as-is
switch -- [lindex $command 0] {
"exec" {
set command [join [list \
[list set command $command] \
{if {$inchan ne ""} { lappend command <@ $inchan } } \
{if {$outchan ne ""} { lappend command >@ $outchan } } \
{eval $command} \
] "\n"]
set command [list apply [list {inchan outchan} $command]]
}
"foreach" {
set lineVar [lindex $command 1]
set commandBlock [lindex $command 2]
set command {
uplevel #0 [list set inchan $inchan]
uplevel #0 [list set outchan $outchan]
uplevel #0 {
while true {
gets $inchan @@line@@
if {[eof $inchan] && $@@line@@ eq ""} {
break
}
@@command@@
}
}
}
set command [string map [list @@line@@ $lineVar @@command@@ $commandBlock] $command]
set command [list apply [list {inchan outchan} $command]]
}
"lmap" {
set lineVar [lindex $command 1]
set commandBlock [lindex $command 2]
set command {
uplevel #0 [list set inchan $inchan]
uplevel #0 [list set outchan $outchan]
uplevel #0 {
while true {
gets $inchan @@line@@
if {[eof $inchan] && $@@line@@ eq ""} {
break
}
foreach ::pipethread::internal_line_NOCLONE [@@command@@] { puts $outchan $::pipethread::internal_line_NOCLONE }
}
}
}
set command [string map [list @@line@@ $lineVar @@command@@ $commandBlock] $command]
set command [list apply [list {inchan outchan} $command]]
}
}
}
# Create the pipe to link the output of this thread/block
# to the input of the next thread/block
if {!$finalBlock || ($finalThreadWriter eq "" && !$operateAsync)} {
## For most blocks, a standard pipe is used
## Also for the final block a pipe is created that is
## tied to a callback to read into a variable unless
## an output channel is given with -outchan
##
## If we are operating asynchronously then we do
## not need to do this for the final block
## even if no -outchan was specified since
## we do not read from it, since we cannot
## return it anywhere
set pipe [chan pipe]
} else {
set pipe [list "" $finalThreadWriter]
}
set pipeReader [lindex $pipe 0]
set pipeWriter [lindex $pipe 1]
# Create a new thread for this block and
# park it in the event loop
set thread [thread::create]
# If this is the first block, remember that so we
# know its threadID for later use
if {$start == 0} {
set initialThread $thread
}
# Just keep track of all this information so we can
# start executing code in the threads AFTER we have
# created all the pipes
lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]
lappend blockThreadList $thread
# Continue after the "|" that ended this block (the loop's [incr] steps over it)
set idx $end
}
# Keep track of the threadID for the final block
set finalThread $thread
# If we are operating synchronously and we were not given a channel to
# write the final block to then we created a pipe -- monitor that pipe
# and accumulate its contents into our return buffer for this series
# of blocks.
if {$finalThreadWriter eq "" && !$operateAsync} {
set finalThreadReader $pipeReader
set returnValueVariable "[namespace current]::returnValue_$handle"
fconfigure $finalThreadReader -blocking 0 -translation binary
fileevent $finalThreadReader readable "append $returnValueVariable \[read $finalThreadReader\]"
}
# This is the name of the array that we will be using to get notified
# of thread termination either by [vwait]'ing it directly or by
# [trace add variable] invoking it asynchronously if -async is used
set threadResultVariable "[namespace current]::resultArray_$handle"
# Create the list of variables and their values to pass to our thread
# cleanup closure
# Only variables that were actually set above are included (copyThread and
# onCompleteCleanup, for example, exist only in some modes)
set cleanupClosureVars [list]
foreach var [list handle operateAsync initialThread finalThread initialThreadReader finalThreadWriter copyThread copyThreadChannel onCompleteCleanup threadResultVariable] {
if {[info exists $var]} {
lappend cleanupClosureVars $var [set $var]
}
}
# Our thread cleanup closure
#
# This is invoked whenever a thread terminates, local variables are passed in using a dict (_vars),
# each key is the name of a local variable and the value is the value of that variable.
#
# In addition to the variables collected above, the caller supplies: thread,
# finalInvocation and threadToChan
set cleanupClosure {_vars {
foreach {_var _val} $_vars {
set $_var $_val
}
_debugLog "\[$thread\]: <-EXIT-"
thread::send $thread [list foreach chan [dict get $threadToChan $thread] { catch { close $chan } }]
if {![info exists copyThread]} {
if {$thread eq $initialThread} {
if {$initialThreadReader ni [list "" stdin $finalThreadWriter]} {
thread::send $thread [list thread::detach $initialThreadReader]
thread::attach $initialThreadReader
}
}
if {$thread eq $finalThread} {
if {$finalThreadWriter ni [list "" stdout stderr]} {
thread::send $thread [list thread::detach $finalThreadWriter]
thread::attach $finalThreadWriter
}
}
}
_debugLog "\[$thread\]: <-RELEASE-"
thread::release -wait $thread
_debugLog "\[$thread\]: <-DONE-"
if {$finalInvocation} {
# Work around a bug in Tcl:
# http://core.tcl.tk/thread/tktview/84be1b5a7
if {$operateAsync} {
after idle [list unset -nocomplain $threadResultVariable]
} else {
unset -nocomplain $threadResultVariable
}
if {[info exists copyThread]} {
thread::send $copyThread [list terminate]
thread::release -wait $copyThread
thread::attach $copyThreadChannel
}
if {[info exists onCompleteCleanup]} {
uplevel #0 $onCompleteCleanup
}
}
} }
# The third element of the lambda makes it run in this namespace, which is
# how the unqualified _debugLog calls inside it resolve
lappend cleanupClosure [namespace current]
# If we are operating asynchronously create the thread result variable and
# add a trace to it. We do this before invoking the blocks so that if they
# terminate we are notified of it even if they do so before we are finished
# requesting all of them start
if {$operateAsync} {
array set $threadResultVariable [list]
trace add variable $threadResultVariable [list write] [list [namespace current]::_invokeClosure [llength $toRun] $cleanupClosureVars $cleanupClosure]
}
# Start executing each block, from first to last
#
# We send the script to each thread asynchronously and configure
# it to notify us via a write to an array with the thread ID as
# the array key
for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
set info [lindex $toRun $idx]
# Determine the input channel and output channel
## The input channel is the read side of the pipe
## from the previous block, if there was no previous
## block then the input channel is the -inchan value
## (blank by default)
if {$idx > 0} {
set inchan [dict get [lindex $toRun $idx-1] pipeReader]
} else {
set inchan $initialThreadReader
}
set outchan [dict get $info pipeWriter]
set thread [dict get $info thread]
set anonymous [dict get $info anonymous]
set command [dict get $info command]
# Transfer channels that this thread needs to use to that
# thread.
#
# Keep track of the channels which the thread needs to close
# when it terminates as well (threadToChan) because we need
# to explicitly do this to avoid background errors.
#
# Channels supplied by the caller via -inchan/-outchan are not recorded for
# closing (unless they are the bridge pipes of the copy thread); stdin,
# stdout and stderr are never transferred
dict set ${threadResultVariable}(threadToChan) $thread [list]
if {$inchan ne ""} {
if {$inchan ne [list stdin]} {
thread::transfer $thread $inchan
if {$inchan ne $initialThreadReader || [info exists copyThread]} {
dict lappend ${threadResultVariable}(threadToChan) $thread $inchan
}
}
}
if {$outchan ne ""} {
if {$outchan ni [list stdout stderr $inchan]} {
thread::transfer $thread $outchan
if {$outchan ne $finalThreadWriter || [info exists copyThread]} {
dict lappend ${threadResultVariable}(threadToChan) $thread $outchan
}
}
}
# Clone the current interpreter into the thread interpreter as
# requested by the user.
if {$cloneInterpCommand ne ""} {
$cloneInterpCommand $thread
}
# Update the command to execute with the parameters we have determined added.
if {!$anonymous} {
lappend command $inchan $outchan
} else {
set command "set inchan \"$inchan\";set outchan \"$outchan\"\n$command"
}
# Give the thread a variable that lists all the threads in this
# series of blocks, in block order
thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::blockThreadList $blockThreadList]]
# Store inchan and outchan in a namespace variable so that we can retrieve it
thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::inchan_NOCLONE $inchan]]
thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::outchan_NOCLONE $outchan]]
# Send the command to the thread interpreter
_debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
thread::send -async $thread $command ${threadResultVariable}($thread)
}
# At this point, if we are operating asynchronously then we are done.
# The event loop will invoke our variable traces when the commands specified
# in the [thread::send] immediately above complete for each block and we
# will clean those threads then
if {$operateAsync} {
# Return something that could be meaningful, the list of
# threads involved in this group of blocks
return $blockThreadList
}
# If we are operating synchronously then we need to wait here for the result
# array to be written to...
# (the condition below always holds here, the asynchronous case has
# already returned above)
if {!$operateAsync} {
# Add the complete list of channels for each thread to the closure
# variable list, since we know them all now
lappend cleanupClosureVars threadToChan [set ${threadResultVariable}(threadToChan)]
unset ${threadResultVariable}(threadToChan)
# Wait in the event loop for our variable to be written to.
# Wait exactly the same number of times as we created threads
# since each thread will set the variable when it terminates
for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
_debugLog "\[---\]: WAITING"
vwait $threadResultVariable
# Determine the thread ID from the array key that has
# been set
set thread [lindex [array names $threadResultVariable] 0]
unset ${threadResultVariable}($thread)
# If this is the last block to complete, note it to pass
# to the lambda
if {$idx == ([llength $toRun] - 1)} {
set finalInvocation true
} else {
set finalInvocation false
}
# Construct a local copy of the local variables dict
# for the lambda so we can update it with values
# that are specific to this invocation
set varsDict $cleanupClosureVars
lappend varsDict thread $thread
lappend varsDict finalInvocation $finalInvocation
# Call the lambda
apply $cleanupClosure $varsDict
}
# At this point all the threads have been terminated, including any
# final cleanup code
#
# If there is no output channel given to the last block that means
# a pipe was created and we should consume anything that remains in
# it and close our side (the other side is closed already, all threads
# are gone) and accumulate it further into our buffer and return
# that buffer
if {$finalThreadWriter eq ""} {
append $returnValueVariable [read $finalThreadReader]
close $finalThreadReader
set returnValue [set $returnValueVariable]
unset $returnValueVariable
return $returnValue
}
}
return
}
# Returns the input channel assigned to the calling block, or "stdin" when
# none has been recorded in this interpreter
proc ::pipethread::inchan {} {
    if {[info exists ::pipethread::inchan_NOCLONE]} {
        return $::pipethread::inchan_NOCLONE
    }
    return stdin
}
# Returns the output channel assigned to the calling block, or "stdout" when
# none has been recorded in this interpreter
proc ::pipethread::outchan {} {
    if {[info exists ::pipethread::outchan_NOCLONE]} {
        return $::pipethread::outchan_NOCLONE
    }
    return stdout
}
# Provide an infinite buffer that will read from inchan as it is readable and write to
# outchan as it is writable.
#
# Arguments:
#     inchan  -- Channel to drain; it is switched to non-blocking binary mode
#     outchan -- Channel to fill; it is switched to non-blocking binary mode
#
# The procedure waits in the event loop and returns once inchan has reached
# end-of-file and the buffered data has been handed to outchan.  It uses the
# global variables ::buffer, ::readComplete and ::copyComplete for its state.
proc ::pipethread::infiniteBuffer {inchan outchan} {
    fconfigure $inchan -blocking false -translation binary
    fconfigure $outchan -blocking false -translation binary
    set ::readComplete false
    set ::copyComplete false
    fileevent $inchan readable [list apply {{inchan outchan} {
        # Read everything that is available right now.  This must be a plain
        # [read]: with -nonewline a trailing newline would be stripped from
        # every chunk and the copy would silently lose data.
        set data [read $inchan]
        if {$data ne ""} {
            append ::buffer $data
        } else {
            if {[eof $inchan]} {
                fileevent $inchan readable {}
                set ::readComplete true
            }
        }
        # Make sure a writer is scheduled; it removes itself after each run
        if {[fileevent $outchan writable] eq ""} {
            fileevent $outchan writable [list apply {{outchan} {
                if {[info exists ::buffer]} {
                    puts -nonewline $outchan $::buffer
                    flush $outchan
                    unset ::buffer
                }
                fileevent $outchan writable {}
                # Only finish once the reader has seen end-of-file
                if {$::readComplete} {
                    set ::copyComplete true
                }
            }} $outchan]
        }
    }} $inchan $outchan]
    vwait ::copyComplete
}
# Returns the ID of the thread that is <relative> positions away from the
# calling thread in ::pipethread::blockThreadList (1 is the next block, -1
# the previous one).
#
# The empty string is returned when there is no such position, or when the
# calling thread is not part of the list at all -- without that check the
# "not found" index of -1 would be used in the arithmetic and an unrelated
# thread would be selected.
#
# The found thread may not exist or it may be a different process altogether
proc ::pipethread::getThreadId {relative} {
    set threadIndex [lsearch -exact $::pipethread::blockThreadList [thread::id]]
    if {$threadIndex < 0} {
        return ""
    }
    # [lindex] yields the empty string for positions outside the list
    set foundThread [lindex $::pipethread::blockThreadList [expr {$threadIndex+$relative}]]
    return $foundThread
}
# XXX: This may go away or be replaced by something better -- its just an experiment
#
# Queues an end-of-file marker for <handle> in another block's thread
#
# Usage: putEof ?-position <position>? handle
#     position -- Relative position of the target block (default 1, the next block)
#     handle   -- Name of the message queue
#
# Returns "true" if the marker was delivered and "false" if sending failed.
proc ::pipethread::putEof args {
    # Peel off the optional "-position <position>" prefix
    set position 1
    if {[lindex $args 0] eq "-position"} {
        set args [lassign $args optionName position]
    }
    if {[llength $args] != 1} {
        return -code error "wrong # args: should be putEof ?-position <position>? handle"
    }
    lassign $args handle
    set targetThread [getThreadId $position]
    # Script that appends the marker to the queue inside the target thread
    set remoteScript [list namespace eval ::pipethread [list lappend ::pipethread::incoming_NOCLONE($handle) [list eof]]]
    if {[catch {thread::send $targetThread $remoteScript}]} {
        return false
    }
    return true
}
# Queues a data message for <handle> in another block's thread
#
# Usage: put ?-position <position>? handle message
#     position -- Relative position of the target block (default 1, the next block)
#     handle   -- Name of the message queue
#     message  -- Value to deliver
#
# Returns "true" if the message was delivered and "false" if sending failed.
proc ::pipethread::put args {
    # Peel off the optional "-position <position>" prefix
    set position 1
    if {[lindex $args 0] eq "-position"} {
        set args [lassign $args optionName position]
    }
    if {[llength $args] != 2} {
        return -code error "wrong # args: should be put ?-position <position>? handle message"
    }
    lassign $args handle message
    set targetThread [getThreadId $position]
    # Script that appends the message to the queue inside the target thread
    set remoteScript [list namespace eval ::pipethread [list lappend ::pipethread::incoming_NOCLONE($handle) [list data $message]]]
    if {[catch {thread::send $targetThread $remoteScript}]} {
        return false
    }
    return true
}
# Takes the next message off the queue named <handle>, waiting in the event
# loop for one to arrive if the queue is missing or empty
#
# Arguments:
#     handle     -- Name of the message queue
#     messageVar -- Name of the caller's variable that receives a data message
#
# Returns "false" when the next message is an end-of-file marker (the queue
# is then discarded) and "true" otherwise.
proc ::pipethread::get {handle messageVar} {
    namespace eval ::pipethread {}
    set queueName "::pipethread::incoming_NOCLONE($handle)"

    # Block until something is written to the queue unless a message is
    # already pending
    if {![info exists $queueName] || [llength [set $queueName]] == 0} {
        vwait $queueName
    }

    set pending [set $queueName]
    set messageType [lindex $pending 0 0]
    if {$messageType eq "eof"} {
        unset $queueName
        return false
    }
    if {$messageType eq "data"} {
        upvar $messageVar message
        set message [lindex $pending 0 1]
    }

    # Drop the message that was just consumed
    set $queueName [lrange $pending 1 end]
    return true
}
package provide pipethread 0.1
# Everything below is a set of demos; it only runs when the variable
# ::pipethread::demos exists at the time this file is evaluated.  The last
# demo starts a socket server and waits in the event loop forever.
if {[info exists ::pipethread::demos]} {
# NOTE(review): presumably the package is forgotten so that the default
# interpreter cloning does not try to [package require pipethread] inside
# the new threads -- confirm
package forget pipethread
################################################################################
### DEMOS ######################################################################
################################################################################
# Define some basic procedures
# (they are defined while Ttrace is enabled, DEMO 1 uses the _TTRACE clone
# mechanism)
package require Ttrace
ttrace::enable
proc a {inchan outchan} {
puts "A: STARTING"
puts $outchan a
puts $outchan b
puts $outchan what
puts $outchan whatwhat
puts $outchan c
puts "A: ENDING"
}
proc b {arg inchan outchan} {
puts "B: STARTING"
while true {
gets $inchan line
if {[eof $inchan] && $line == ""} {
break
}
if {![string match "$arg*" $line]} {
continue
}
puts $outchan $line
puts "B: SENDING: $line"
}
puts "B: ENDING"
}
proc c {inchan outchan} {
puts "C: STARTING"
gets $inchan line
puts "C: Got: $line"
puts "C: ENDING"
}
# Start doing things
set what what
ttrace::disable
# Output:
# --- DEMO 1 ---
# A: STARTING
# A: ENDING
# B: STARTING
# C: STARTING
# B: SENDING: what
# B: SENDING: whatwhat
# B: ENDING
# C: Got: what
# C: ENDING
puts "--- DEMO 1 ---"
::pipethread::pipe -cloneInterpCommand _TTRACE a | b $what | c
# Output:
# --- DEMO 2 ---
# A: STARTING
# A: ENDING
# C: STARTING
# C: Got: ok
# C: ENDING
puts "--- DEMO 2 ---"
::pipethread::pipe a | { puts $outchan ok } | c
# Output:
# --- DEMO 3 ---
# c.1
# b.1
# a.1
# <blank line>
#
# Note the extra blank line at the bottom
# This is because [puts $outchan $item.1] writes a newline
# and puts [pipethread::pipe] also writes a newline
puts "--- DEMO 3 ---"
puts [pipethread::pipe { puts $outchan "a"; puts $outchan "b"; puts $outchan "c" } | { foreach item [lreverse [read $inchan]] { puts $outchan $item.1 } }]
# Output:
# --- DEMO 4 ---
# 1:a
# 2:a
# 3:a
# 1:b
# 2:b
# 3:b
# 1:c
# 2:c
# 3:c
puts "--- DEMO 4 ---"
pipethread::pipe {
puts $outchan "a"
puts $outchan "b"
puts $outchan "c"
} | lmap line { list 1:$line 2:$line 3:$line } | foreach line {
puts $line
}
# Nested pipelines: the middle block runs two pipelines of its own on the
# channels it was given
# Output:
# --- DEMO 5 ---
# LINEx1: 4
# LINEx2: AGAIN: 5
puts "--- DEMO 5 ---"
pipethread::pipe {
puts $outchan 1
puts $outchan 2
puts $outchan 3
puts $outchan 4
puts $outchan 5
} | {
pipethread::pipe -inchan $inchan -outchan $outchan {
gets $inchan ignore
gets $inchan accept
puts $outchan $accept
gets $inchan ignore
gets $inchan accept
puts $outchan $accept
} | {
gets $inchan ignore
gets $inchan accept
puts $outchan $accept
}
pipethread::pipe -inchan $inchan -outchan $outchan {
gets $inchan line
puts $outchan "AGAIN: $line"
}
} | {
gets $inchan line
puts "LINEx1: $line"
gets $inchan line
puts "LINEx2: $line"
}
# Experiment: message passing between blocks with put/putEof/get
# --- DEMO 6 ---
# HELLO
puts "--- DEMO 6 ---"
pipethread::pipe -outchan stdout {
pipethread::put default "HELLO"
pipethread::putEof default
while true {
if {![pipethread::put default "WORLD"]} {
break
}
}
} | {
pipethread::get default line
puts $outchan $line
}
# Overloaded exec demo
# Output:
# --- DEMO 7 ---
# <pwd>
# <hostname>
puts "--- DEMO 7 ---"
pipethread::pipe -outchan stdout {
puts $outchan [info hostname]
puts $outchan [pwd]
} | exec tac
# Socket demo: Creates a set of threads for every socket created
# When a client connects to the socket the conversation is as follows:
# [file9] HELLO ! You are 1 !
# <wait for a line of input>
# [file9] GOOD TO KNOW: <the input from above> from 127.0.0.1/47966 [file10<->file13]
# <connection closed>
#
# and the output looks like:
# --- DEMO SOCKET ---
# CLOSING SOCK sock7fa6bc03b630
puts "--- DEMO SOCKET ---"
proc handleConnection {sock addr port} {
incr ::connectionNumber
pipethread::pipe -async "puts stderr \"CLOSING SOCK $sock\"; close $sock" -inchan $sock -outchan $sock {
puts $outchan "HELLO ! You are $::connectionNumber !"
flush $outchan
gets $inchan line
puts $outchan "GOOD TO KNOW: $line from $addr/$port \[$inchan<->$outchan\]"
} | {
while true {
gets $inchan line
if {[eof $inchan] && $line eq ""} {
break
}
puts $outchan "\[$outchan\] $line"
flush $outchan
}
}
}
# The accept callback only schedules the real handler, so the pipeline is
# started from a timer event rather than from inside the accept callback
proc handleConnectionFake {sock addr port} {
after 1 [list handleConnection $sock $addr $port]
}
socket -server handleConnectionFake 8410
vwait forever
}
|