#! /usr/bin/env tclsh

package require Tcl 8.6
package require Thread

namespace eval ::pipethread {}

# Default command to clone an interpreter into a new thread
#
# Arguments:
#     thread    -- The ID of the thread to clone the current interpreter into
#     namespace -- The namespace to clone (defaults to the global namespace);
#                  child namespaces are cloned recursively
#
# When cloning the global namespace this additionally:
#     1. Copies auto_path and [package require]s every package that is
#        present in the current interpreter
#     2. Copies the local variables found two call frames up (when invoked
#        from pipethread::pipe this is the caller of pipethread::pipe)
#        into the new thread as global variables
#
# Namespace variables whose name ends in "_NOCLONE" are not copied.
proc ::pipethread::_cloneInterp {thread {namespace "::"}} {
	# Load all currently loaded packages into the new thread
	if {$namespace eq "::"} {
		thread::send $thread [list set auto_path $::auto_path]

		foreach package [package names] {
			# [package names] also lists packages that are merely known,
			# only the ones actually present are required in the thread
			if {[catch { package present $package }]} {
				continue
			}

			thread::send $thread [list package require $package]
		}
	}

	# Create the namespace we are operating on
	thread::send $thread [list namespace eval $namespace ""]

	# Create a valid namespace prefix string
	set namespacePrefix $namespace
	if {$namespacePrefix ne "::"} {
		append namespacePrefix "::"
	}

	# Copy all namespace variables into the new thread
	foreach var [info vars ${namespacePrefix}*] {
		# Skip variables explicitly marked as noclone
		if {[string match "*_NOCLONE" $var]} {
			continue
		}

		if {[array exists $var]} {
			thread::send $thread [list array set $var [array get $var]]
		} elseif {[info exists $var]} {
			thread::send $thread [list set $var [set $var]]
		}
	}

	if {$namespace eq "::"} {
		# Copy all local variables into the new thread as global variables
		# This may not be the best approach -- but it has to go somewhere...
		#
		# [uplevel 2] is only valid if there are at least two frames above
		# us, and there is nothing to copy if that frame is the global one
		if {[info level] >= 2 && [uplevel 2 {info level}] != 0} {
			foreach var [uplevel 2 {info vars}] {
				if {[uplevel 2 [list array exists $var]]} {
					thread::send $thread [list array set $var [uplevel 2 [list array get $var]]]
				} elseif {[uplevel 2 [list info exists $var]]} {
					# [info vars] also reports variables that have been
					# declared (global, variable, upvar) but never set,
					# reading those would raise an error so skip them
					thread::send $thread [list set $var [uplevel 2 [list set $var]]]
				}
			}
		}
	}

	# Copy all procedures into the new thread
	foreach proc [info proc ${namespacePrefix}*] {
		## Skip imported procs, hopefully they'll get re-imported
		if {[namespace origin $proc] ne [namespace which $proc]} {
			continue
		}

		# Rebuild the argument list, including default values
		set procArgs [list]
		foreach arg [info args $proc] {
			if {[info default $proc $arg argDefault]} {
				lappend procArgs [list $arg $argDefault]
			} else {
				lappend procArgs $arg
			}
		}

		thread::send $thread [list proc $proc $procArgs [info body $proc]]
	}

	# For every child namespace of the current namespace, repeat this whole process
	foreach childNamespace [namespace children $namespace] {
		_cloneInterp $thread $childNamespace
	}

	return
}

# Alternative interpreter initialization command (the "_TTRACE" magic value):
# loads the Ttrace package into the new thread instead of copying state
#
# Arguments:
#     thread -- The ID of the thread to initialize
proc ::pipethread::_traceInterp {thread} {
	thread::send $thread [list package require Ttrace]
}

# This invokes the closure responsible for cleaning up a thread that has been terminated
# Arguments:
#     count     -- The number of threads that have been created (also the number of times
#                  this closure will be called)
#     vars      -- A dictionary containing the variables to set in the closure
#     closure   -- The actual lambda itself, it will be [apply]'d
#     _watchVar -- The name of the variable being traced...
#                  this is not
#                  passed through to the closure but used for internal
#                  purposes
#     thread    -- The thread that has been terminated
#     _watchOp  -- The operation that occurred -- this is an implementation detail
proc ::pipethread::_invokeClosure {count vars closure _watchVar thread _watchOp} {
	# Writes to the bookkeeping elements of the traced array are not
	# thread terminations, so there is nothing for the closure to do
	if {$thread in {threadToChan closureCallCount}} {
		return
	}

	# Count this invocation; the last expected one is flagged for the lambda
	set invocationNumber [incr ${_watchVar}(closureCallCount)]
	lappend vars finalInvocation [expr {$invocationNumber == $count ? "true" : "false"}]

	# Remaining variables the lambda expects
	lappend vars thread $thread
	if {![dict exists $vars threadToChan]} {
		lappend vars threadToChan [set ${_watchVar}(threadToChan)]
	}

	# Hand everything over to the lambda
	apply $closure $vars
}

# Debugging log procedure
# Replace as needed
#
# As shipped this returns before logging anything; remove the [return]
# to have messages written to stderr
proc ::pipethread::_debugLog err {
	return
	puts stderr "$err"
}

# Starts a set of "coprocessing" pipes, similar to UNIX shell
#
# Each block will execute in its own thread with two additional
# channels: inchan and outchan
#
# These channels represent pipes coming from the previous block (inchan)
# or going to the next block (outchan)
#
# Blocks may either be command prefixes or a block of commands
#
# If a block is a command prefix then two values are appended to it before
# being invoked: inchan outchan
#
# If a block is a block of commands then two variables are set: inchan outchan
# before the block is started.
#
# Examples of both:
#     1. Command prefixes:
#          pipethread::pipe firstBlock | secondBlock | thirdBlock abcdef
#
#        This creates three threads and in each thread invokes:
#          a. firstBlock $inchan $outchan
#          b. secondBlock $inchan $outchan
#          c.
#             thirdBlock abcdef $inchan $outchan
#
#     2. Block of commands:
#          pipethread::pipe { puts $outchan ok } | { gets $inchan line; puts $line }
#
#        This creates two threads and in each thread invokes:
#          a. set inchan <inchan>; set outchan <outchan>; puts $outchan ok
#          b. set inchan <inchan>; set outchan <outchan>; gets $inchan line; puts $line
#
# You may use both types together.
#
# Optional arguments:
#     -cloneInterpCommand <commandPrefix>
#          Specify the command to run to clone the interpreter into each
#          new thread.  If specified as the empty string then new
#          interpreters will have no initialization done on them.
#
#          This is a command prefix and the thread ID is added to it.
#
#          There are some magic values for this command prefix:
#             _DEFAULT: Use the default command prefix
#             _TTRACE:  Use the Ttrace mechanism
#
#     -inchan <chan>
#          Specify a channel to be used as the input channel to the
#          first block.  If this is not specified then the first block
#          will get "" as its input channel.
#
#     -outchan <chan>
#          Specify a channel to be used as the output channel from the
#          final block.  If this is not specified then the final block
#          will get either "" or a pipe channel as its output channel.
#          The default is to create a pipe channel and anything written
#          to it will be [return]'d by pipethread::pipe, however if -async
#          mode is used (below) then this channel will not be created and
#          the final block will get "" as its output channel unless this
#          option is used.
#
#     -async <cleanup>
#          Specify that:
#             1. pipethread::pipe should not wait in the event loop (by
#                calling [vwait]); and
#             2. pipethread::pipe should return immediately after starting
#                the threads and connecting their pipes, the caller is
#                then responsible for waiting in the event loop so that
#                thread termination is handled.
#          When all the blocks terminate (and the final block may not always
#          be the last block to terminate) the script <cleanup> is invoked
#          in the global scope
#
#     --
#          Signals the end of arguments
#
# Returns:
#     Synchronous mode without -outchan: everything the final block wrote
#     to its output channel
#     Synchronous mode with -outchan: the empty string
#     -async mode: the list of thread IDs, one per block
#
# Raises an error if an unknown option is given or if no block is supplied.
proc ::pipethread::pipe args {
	# Create a unique handle for this series of blocks and pipes
	incr [namespace current]::handles
	set handle [set [namespace current]::handles]

	# Special values for the -cloneInterpCommand option
	set cloneInterpCommandMagic(_DEFAULT) [namespace current]::_cloneInterp
	set cloneInterpCommandMagic(_TTRACE) [namespace current]::_traceInterp

	# Process commandline arguments
	## Set default values
	set cloneInterpCommand _DEFAULT
	set initialThreadReader ""
	set finalThreadWriter ""
	set operateAsync false

	## Collect arguments
	for {set idx 0} {$idx < [llength $args]} {incr idx} {
		set arg [lindex $args $idx]

		switch -glob -- $arg {
			"-cloneInterpCommand" {
				incr idx
				set cloneInterpCommand [lindex $args $idx]
			}
			"-inchan" {
				incr idx
				set initialThreadReader [lindex $args $idx]
			}
			"-outchan" {
				incr idx
				set finalThreadWriter [lindex $args $idx]
			}
			"-async" {
				set operateAsync true
				incr idx
				set onCompleteCleanup [lindex $args $idx]
			}
			"--" {
				incr idx
				break
			}
			"-*" {
				return -code error "Invalid option: $arg, must be one of: -cloneInterpCommand, -inchan, -outchan, -async, or --"
			}
			default {
				break
			}
		}
	}

	## Update the list of arguments to remove the ones we consumed
	set args [lrange $args $idx end]

	## Without at least one block there is no thread to create or wait
	## for, report that here instead of failing later on an unset variable
	if {[llength $args] == 0} {
		return -code error "wrong # args: should be \"pipethread::pipe ?options? ?--? block ?| block ...?\""
	}

	# Replace the cloneInterpCommand magic values with actual ones if appropriate
	if {[info exists cloneInterpCommandMagic($cloneInterpCommand)]} {
		set cloneInterpCommand $cloneInterpCommandMagic($cloneInterpCommand)
	}

	# If we are asked to read and write from the same channel (for example
	# a socket) and we have more than one block, we need to create two
	# channels from it since we need to read from the channel in the first block
	# and write to it in the final block but we cannot give the channel to two
	# different threads
	#
	# We create 2 new pipe-pairs and a thread to bridge them and we copy between
	# them in the event loop.
	if {$initialThreadReader eq $finalThreadWriter && $finalThreadWriter ne "" && [lsearch -exact $args "|"] != -1} {
		set copyThreadChannel $initialThreadReader
		set copyThread [thread::create]
		thread::transfer $copyThread $copyThreadChannel
		thread::send $copyThread [list set copyThreadChannel $copyThreadChannel]
		set copyPipe [thread::send $copyThread {
			set readerPipe [chan pipe]
			set writerPipe [chan pipe]

			set readerPipeReader [lindex $readerPipe 0]
			set readerPipeWriter [lindex $readerPipe 1]
			set writerPipeReader [lindex $writerPipe 0]
			set writerPipeWriter [lindex $writerPipe 1]

			foreach fd [list $readerPipeReader $readerPipeWriter $writerPipeReader $writerPipeWriter] {
				fconfigure $fd -translation binary
			}

			thread::detach $writerPipeReader
			thread::detach $readerPipeWriter

			proc copyComplete {var args} {
				incr ::copyComplete
				set sock [set ::$var]
				unset ::$var
				close $sock
			}

			proc terminate {} {
				if {[info exists ::writerPipeWriter]} {
					close $::writerPipeWriter
				}

				if {[info exists ::readerPipeReader]} {
					close $::readerPipeReader
				}

				thread::detach $::copyThreadChannel
			}

			fcopy $copyThreadChannel $writerPipeWriter -command [list copyComplete writerPipeWriter]
			fcopy $readerPipeReader $copyThreadChannel -command [list copyComplete readerPipeReader]

			list $writerPipeReader $readerPipeWriter
		}]

		set copyPipeReader [lindex $copyPipe 0]
		set copyPipeWriter [lindex $copyPipe 1]

		thread::attach $copyPipeReader
		thread::attach $copyPipeWriter

		set initialThreadReader $copyPipeReader
		set finalThreadWriter $copyPipeWriter
	}

	# Process the list of blocks
	set blockThreadList [list]
	set toRun [list]
	set finalBlock false
	for {set idx 0} {$idx < [llength $args]} {incr idx} {
		set start $idx
		set end [lsearch -start $start -exact $args "|"]
		if {$end == -1} {
			set finalBlock true
			set end [llength $args]
		}

		# Determine the command to run, it is between the two pipes.
		set command [lrange $args $start $end-1]

		# Determine if the block is a command prefix or a block of code
		#
		# This is not easy to do authoritatively, we are just faking it
		# we determine that it is a block of code if:
		#    1. It is a single list element long
		#    2. The string representation begins with the list markers '{' and '}'
		# This can fail if you use a command prefix that has special characters in it
		# and accepts no additional parameters.  You can workaround it by supplying a
		# dummy parameter.
		set anonymous false
		if {[llength $command] == 1 && [string index $command 0] == "\{" && [string index $command end] == "\}"} {
			set anonymous true
			set command [lindex $command 0]
		} else {
			# A few command names are overloaded: they are rewritten into
			# a lambda which receives inchan and outchan as arguments
			switch -- [lindex $command 0] {
				"exec" {
					set command [join [list \
						[list set command $command] \
						{if {$inchan ne ""} { lappend command <@ $inchan } } \
						{if {$outchan ne ""} { lappend command >@ $outchan } } \
						{eval $command} \
					] "\n"]

					set command [list apply [list {inchan outchan} $command]]
				}
				"foreach" {
					set lineVar [lindex $command 1]
					set commandBlock [lindex $command 2]

					set command {
						uplevel #0 [list set inchan $inchan]
						uplevel #0 [list set outchan $outchan]
						uplevel #0 {
							while true {
								gets $inchan @@line@@
								if {[eof $inchan] && $@@line@@ eq ""} {
									break
								}
								@@command@@
							}
						}
					}

					set command [string map [list @@line@@ $lineVar @@command@@ $commandBlock] $command]

					set command [list apply [list {inchan outchan} $command]]
				}
				"lmap" {
					set lineVar [lindex $command 1]
					set commandBlock [lindex $command 2]

					set command {
						uplevel #0 [list set inchan $inchan]
						uplevel #0 [list set outchan $outchan]
						uplevel #0 {
							while true {
								gets $inchan @@line@@
								if {[eof $inchan] && $@@line@@ eq ""} {
									break
								}
								foreach ::pipethread::internal_line_NOCLONE [@@command@@] {
									puts $outchan $::pipethread::internal_line_NOCLONE
								}
							}
						}
					}

					set command [string map [list @@line@@ $lineVar @@command@@ $commandBlock] $command]

					set command [list apply [list {inchan outchan} $command]]
				}
			}
		}

		# Create the pipe to link the output of this thread/block
		# to the input of the next thread/block
		if {!$finalBlock || ($finalThreadWriter eq "" && !$operateAsync)} {
			## For most blocks, a standard pipe is used
			## Also for the final block a pipe is created that is
			## tied to a callback to read into a variable unless
			## an output channel is given with -outchan
			##
			## If we are operating asynchronously then we do
			## not need to do this for the final block
			## even if no -outchan was specified since
			## we do not read from it, since we cannot
			## return it anywhere
			set pipe [chan pipe]
		} else {
			set pipe [list "" $finalThreadWriter]
		}

		set pipeReader [lindex $pipe 0]
		set pipeWriter [lindex $pipe 1]

		# If this block needs a new thread, create a new thread and
		# park it in the event loop
		set thread [thread::create]

		# If this is the first block, remember that so we
		# know its threadID for later use
		if {$start == 0} {
			set initialThread $thread
		}

		# Just keep track of all this information so we can
		# start executing code in the threads AFTER we have
		# created all the pipes
		lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]
		lappend blockThreadList $thread

		set idx $end
	}

	# Keep track of the threadID for the final block
	set finalThread $thread

	# If we are operating synchronously and we were not given a channel to
	# write the final block to then we created a pipe -- monitor that pipe
	# and accumulate its contents into our return buffer for this series
	# of blocks.
	if {$finalThreadWriter eq "" && !$operateAsync} {
		set finalThreadReader $pipeReader
		set returnValueVariable "[namespace current]::returnValue_$handle"
		fconfigure $finalThreadReader -blocking 0 -translation binary
		fileevent $finalThreadReader readable "append $returnValueVariable \[read $finalThreadReader\]"
	}

	# This is the name of the array that we will be using to get notified
	# of thread termination either by [vwait]'ing it directly or by
	# [trace add variable] invoking us asynchronously if -async is used
	set threadResultVariable "[namespace current]::resultArray_$handle"

	# Create the list of variables and their values to pass to our thread
	# cleanup closure
	set cleanupClosureVars [list]
	foreach var [list handle operateAsync initialThread finalThread initialThreadReader finalThreadWriter copyThread copyThreadChannel onCompleteCleanup threadResultVariable] {
		if {[info exists $var]} {
			lappend cleanupClosureVars $var [set $var]
		}
	}

	# Our thread cleanup closure
	#
	# This is invoked whenever a thread terminates, local variables are passed in using a dict (_vars),
	# each key is the name of a local variable and the value is the value of that variable.
	set cleanupClosure {_vars {
		foreach {_var _val} $_vars {
			set $_var $_val
		}

		_debugLog "\[$thread\]: <-EXIT-"

		thread::send $thread [list foreach chan [dict get $threadToChan $thread] { catch { close $chan } }]

		if {![info exists copyThread]} {
			if {$thread eq $initialThread} {
				if {$initialThreadReader ni [list "" stdin $finalThreadWriter]} {
					thread::send $thread [list thread::detach $initialThreadReader]
					thread::attach $initialThreadReader
				}
			}

			if {$thread eq $finalThread} {
				if {$finalThreadWriter ni [list "" stdout stderr]} {
					thread::send $thread [list thread::detach $finalThreadWriter]
					thread::attach $finalThreadWriter
				}
			}
		}

		_debugLog "\[$thread\]: <-RELEASE-"

		thread::release -wait $thread

		_debugLog "\[$thread\]: <-DONE-"

		if {$finalInvocation} {
			# Work around a bug in Tcl:
			#     http://core.tcl.tk/thread/tktview/84be1b5a7
			if {$operateAsync} {
				after idle [list unset -nocomplain $threadResultVariable]
			} else {
				unset -nocomplain $threadResultVariable
			}

			if {[info exists copyThread]} {
				thread::send $copyThread [list terminate]
				thread::release -wait $copyThread
				thread::attach $copyThreadChannel
			}

			if {[info exists onCompleteCleanup]} {
				uplevel #0 $onCompleteCleanup
			}
		}
	}}
	lappend cleanupClosure [namespace current]

	# If we are operating asynchronously create the thread result variable and
	# add a trace to it.  We do this before invoking the blocks so that if they
	# terminate we are notified of it even if they do so before we are finished
	# requesting all of them start
	if {$operateAsync} {
		array set $threadResultVariable [list]
		trace add variable $threadResultVariable [list write] [list [namespace current]::_invokeClosure [llength $toRun] $cleanupClosureVars $cleanupClosure]
	}

	# Start executing each block, from first to last
	#
	# We send the script to each thread asynchronously and configure
	# it to notify us via a write to an array with the thread ID as
	# the array key
	for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
		set info [lindex $toRun $idx]

		# Determine the input channel and output channel
		## The input channel is the read side of the pipe
		## from the previous block, if there was no previous
		## block then the input channel is the -inchan value
		## (blank by default)
		if {$idx > 0} {
			set inchan [dict get [lindex $toRun $idx-1] pipeReader]
		} else {
			set inchan $initialThreadReader
		}
		set outchan [dict get $info pipeWriter]

		set thread [dict get $info thread]
		set anonymous [dict get $info anonymous]
		set command [dict get $info command]

		# Transfer channels that this thread needs to use to that
		# thread.
		#
		# Keep track of the channels which the thread needs to close
		# when it terminates as well (threadToChan) because we need
		# to explicitly do this to avoid background errors.
		dict set ${threadResultVariable}(threadToChan) $thread [list]
		if {$inchan ne ""} {
			if {$inchan ne [list stdin]} {
				thread::transfer $thread $inchan

				if {$inchan ne $initialThreadReader || [info exists copyThread]} {
					dict lappend ${threadResultVariable}(threadToChan) $thread $inchan
				}
			}
		}

		if {$outchan ne ""} {
			if {$outchan ni [list stdout stderr $inchan]} {
				thread::transfer $thread $outchan

				if {$outchan ne $finalThreadWriter || [info exists copyThread]} {
					dict lappend ${threadResultVariable}(threadToChan) $thread $outchan
				}
			}
		}

		# Clone the current interpreter into the thread interpreter as
		# requested by the user.
		if {$cloneInterpCommand ne ""} {
			$cloneInterpCommand $thread
		}

		# Update the command to execute with the parameters we have determined added.
		if {!$anonymous} {
			lappend command $inchan $outchan
		} else {
			set command "set inchan \"$inchan\";set outchan \"$outchan\"\n$command"
		}

		# Give the thread a variable that lists all the threads in this
		# series of blocks (used by getThreadId to locate its neighbours)
		thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::blockThreadList $blockThreadList]]

		# Store inchan and outchan in a namespace variable so that we can retrieve it
		thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::inchan_NOCLONE $inchan]]
		thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::outchan_NOCLONE $outchan]]

		# Send the command to the thread interpreter
		_debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
		thread::send -async $thread $command ${threadResultVariable}($thread)
	}

	# At this point, if we are operating asynchronously then we are done.
	# The event loop will invoke our variable traces when the commands specified
	# in the [thread::send] immediately above completes for each block and we
	# will clean those threads then
	if {$operateAsync} {
		# Return something that could be meaningful, the list of
		# threads involved in this group of blocks
		return $blockThreadList
	}

	# If we are operating synchronously then we need to wait here for the result
	# array to be written to...
	if {!$operateAsync} {
		# Add the complete list of channels for each thread to the closure
		# variable list, since we know them all now
		lappend cleanupClosureVars threadToChan [set ${threadResultVariable}(threadToChan)]
		unset ${threadResultVariable}(threadToChan)

		# Wait in the event loop for our variable to be written to.
		# Wait exactly the same number of times as we created threads
		# since each thread will set the variable when it terminates
		for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
			_debugLog "\[---\]: WAITING"
			vwait $threadResultVariable

			# Determine the thread ID from the array key that has
			# been set
			set thread [lindex [array names $threadResultVariable] 0]
			unset ${threadResultVariable}($thread)

			# If this is the last block to complete, note it to pass
			# to the lambda
			if {$idx == ([llength $toRun] - 1)} {
				set finalInvocation true
			} else {
				set finalInvocation false
			}

			# Construct a local copy of the local variables dict
			# for the lambda so we can update it with values
			# that are specific to this invocation
			set varsDict $cleanupClosureVars
			lappend varsDict thread $thread
			lappend varsDict finalInvocation $finalInvocation

			# Call the lambda
			apply $cleanupClosure $varsDict
		}

		# At this point all the threads have been terminated, including any
		# final cleanup code
		#
		# If there is no output channel given to the last block that means
		# a pipe was created and we should consume anything that remains in
		# it and close our side (the other side is closed already, all threads
		# are gone) and accumulate it further into our buffer and return
		# that buffer
		if {$finalThreadWriter eq ""} {
			append $returnValueVariable [read $finalThreadReader]
			close $finalThreadReader

			set returnValue [set $returnValueVariable]
			unset $returnValueVariable

			return $returnValue
		}
	}

	return
}

# Return the inchan/outchan
#
# inchan returns the value stored in ::pipethread::inchan_NOCLONE (set by
# pipethread::pipe in every block thread), or "stdin" if it is not set
proc ::pipethread::inchan {} {
	if {![info exists ::pipethread::inchan_NOCLONE]} {
		return stdin
	}

	return $::pipethread::inchan_NOCLONE
}

# outchan returns the value stored in ::pipethread::outchan_NOCLONE (set by
# pipethread::pipe in every block thread), or "stdout" if it is not set
proc ::pipethread::outchan {} {
	if {![info exists ::pipethread::outchan_NOCLONE]} {
		return stdout
	}

	return $::pipethread::outchan_NOCLONE
}

# Provide an infinite buffer that will read from inchan as it is readable and write to
# outchan as it is writable.
#
# Usage: pipethread::infiniteBuffer ?args? inchan outchan
#
# Optional arguments:
#     -bufferOnWriteError
#          If writing to outchan fails, keep reading from inchan until EOF
#          (discarding the data) before returning
#     -terminateOnWriteError
#          If writing to outchan fails, return immediately (default)
#
# Both channels are switched to non-blocking binary mode.  State is kept in
# the global variables ::options, ::buffer, ::readComplete, ::writeComplete
# and ::copyComplete.  This enters the event loop and returns once
# ::copyComplete is written.
proc ::pipethread::infiniteBuffer args {
	if {[llength $args] < 2} {
		return -code error "wrong # args: pipethread::infiniteBuffer ?args? inchan outchan"
	}

	set inchan [lindex $args end-1]
	set outchan [lindex $args end]
	set args [lrange $args 0 end-2]

	set ::options(-bufferOnWriteError) false

	fconfigure $inchan -blocking false -translation binary
	fconfigure $outchan -blocking false -translation binary

	foreach arg $args {
		switch -- $arg {
			"-bufferOnWriteError" {
				set ::options($arg) true
			}
			"-terminateOnWriteError" {
				set ::options(-bufferOnWriteError) false
			}
		}
	}

	set ::readComplete false
	set ::writeComplete false
	set ::copyComplete false

	fileevent $inchan readable [list apply {{inchan outchan} {
		set data [read $inchan]
		if {$data ne ""} {
			append ::buffer $data
		} else {
			if {[eof $inchan]} {
				fileevent $inchan readable {}
				set ::readComplete true
			}
		}

		if {$::writeComplete} {
			unset -nocomplain ::buffer

			if {$::readComplete} {
				set ::copyComplete true
			}

			return
		}

		if {[fileevent $outchan writable] eq ""} {
			fileevent $outchan writable [list apply {{outchan} {
				if {[info exists ::buffer]} {
					if {[catch {
						puts -nonewline $outchan $::buffer
						flush $outchan
						unset ::buffer
					}]} {
						if {$::options(-bufferOnWriteError)} {
							set ::writeComplete true
						} else {
							set ::copyComplete true
						}
					}
				}

				fileevent $outchan writable {}

				if {$::readComplete} {
					set ::copyComplete true
				}
			}} $outchan]
		}
	}} $inchan $outchan]

	vwait ::copyComplete
}

# Return the ID of the thread that is <relative> positions away from the
# current thread in the list of block threads (1 is the next block, -1 the
# previous block)
#
# The found thread may not exist or it may be a different process altogether
#
# The empty string is returned if the block list is not known in this
# thread, if the current thread is not part of it, or if the requested
# position is outside of the list
proc ::pipethread::getThreadId {relative} {
	if {![info exists ::pipethread::blockThreadList]} {
		return ""
	}

	set threadIndex [lsearch -exact $::pipethread::blockThreadList [thread::id]]
	if {$threadIndex == -1} {
		# Without a position of our own "relative" has no meaning,
		# -1 + relative would otherwise select an unrelated thread
		return ""
	}

	set foundThread [lindex $::pipethread::blockThreadList [expr {$threadIndex+$relative}]]

	return $foundThread
}

# XXX: This may go away or be replaced by something better -- its just an experiment
#
# Usage: putEof ?-position <position>? handle
#
# Queue an end-of-file marker for <handle> in the thread <position> blocks
# away (default: 1, the next block)
#
# Returns true if the marker was delivered, false if sending failed
proc ::pipethread::putEof args {
	if {[lindex $args 0] eq "-position"} {
		set position [lindex $args 1]
		set args [lrange $args 2 end]
	} else {
		set position 1
	}

	if {[llength $args] != 1} {
		return -code error "wrong # args: should be putEof ?-position <position>? handle"
	}

	set targetThread [getThreadId $position]
	set handle [lindex $args 0]

	if {[catch {
		thread::send $targetThread [list namespace eval ::pipethread [list lappend ::pipethread::incoming_NOCLONE($handle) [list eof]]]
	}]} {
		return false
	}

	return true
}

# Usage: put ?-position <position>? handle message
#
# Queue <message> for <handle> in the thread <position> blocks away
# (default: 1, the next block)
#
# Returns true if the message was delivered, false if sending failed
proc ::pipethread::put args {
	if {[lindex $args 0] eq "-position"} {
		set position [lindex $args 1]
		set args [lrange $args 2 end]
	} else {
		set position 1
	}

	if {[llength $args] != 2} {
		return -code error "wrong # args: should be put ?-position <position>? handle message"
	}

	set targetThread [getThreadId $position]
	set handle [lindex $args 0]
	set message [lindex $args 1]

	if {[catch {
		thread::send $targetThread [list namespace eval ::pipethread [list lappend ::pipethread::incoming_NOCLONE($handle) [list data $message]]]
	}]} {
		return false
	}

	return true
}

# Fetch the next message queued for <handle> in the current thread, waiting
# in the event loop if none is queued yet
#
# Arguments:
#     handle     -- The name of the message queue
#     messageVar -- The name of the variable in the caller to store the
#                   message in
#
# Returns true if a message was consumed, false if an end-of-file marker
# was found (in which case the queue for <handle> is discarded)
proc ::pipethread::get {handle messageVar} {
	namespace eval ::pipethread {}

	set haveMessage false
	if {[info exists ::pipethread::incoming_NOCLONE($handle)]} {
		if {[llength $::pipethread::incoming_NOCLONE($handle)] > 0} {
			set haveMessage true
		}
	}

	if {!$haveMessage} {
		vwait ::pipethread::incoming_NOCLONE($handle)
	}

	set messageType [lindex $::pipethread::incoming_NOCLONE($handle) 0 0]
	switch -- $messageType {
		"eof" {
			unset ::pipethread::incoming_NOCLONE($handle)
			return false
		}
		"data" {
			upvar $messageVar message
			set message [lindex $::pipethread::incoming_NOCLONE($handle) 0 1]
		}
	}

	# Remove the message we just consumed from the queue
	set ::pipethread::incoming_NOCLONE($handle) [lrange $::pipethread::incoming_NOCLONE($handle) 1 end]

	return true
}

package provide pipethread 0.1

# The demos only run if ::pipethread::demos has been set before this file
# is evaluated
if {[info exists ::pipethread::demos]} {
	package forget pipethread

	################################################################################
	### DEMOS ######################################################################
	################################################################################
	# Define some basic procedures
	package require Ttrace

	ttrace::enable
	proc a {inchan outchan} {
		puts "A: STARTING"
		puts $outchan a
		puts $outchan b
		puts $outchan what
		puts $outchan whatwhat
		puts $outchan c
		puts "A: ENDING"
	}

	proc b {arg inchan outchan} {
		puts "B: STARTING"
		while true {
			gets $inchan line
			if {[eof $inchan] && $line == ""} {
				break
			}

			if {![string match "$arg*" $line]} {
				continue
			}

			puts $outchan $line
			puts "B: SENDING: $line"
		}
		puts "B: ENDING"
	}

	proc c {inchan outchan} {
		puts "C: STARTING"
		gets $inchan line
		puts "C: Got: $line"
		puts "C: ENDING"
	}

	# Start doing things
	set what what
	ttrace::disable

	# Output:
	#     --- DEMO 1 ---
	#     A: STARTING
	#     A: ENDING
	#     B: STARTING
	#     C: STARTING
	#     B: SENDING: what
	#     B: SENDING: whatwhat
	#     B: ENDING
	#     C: Got: what
	#     C: ENDING
	puts "--- DEMO 1 ---"
	::pipethread::pipe -cloneInterpCommand _TTRACE a | b $what | c

	# Output:
	#     --- DEMO 2 ---
	#     A: STARTING
	#     A: ENDING
	#     C: STARTING
	#     C: Got: ok
	#     C: ENDING
	puts "--- DEMO 2 ---"
	::pipethread::pipe a | { puts $outchan ok } | c

	# Output:
	#     --- DEMO 3 ---
	#     c.1
	#     b.1
	#     a.1
	#     <blank line>
	#
	# Note the extra blank line at the bottom
	# This is because [puts $outchan $item.1] writes a newline
	# and puts [pipethread::pipe] also writes a newline
	puts "--- DEMO 3 ---"
	puts [pipethread::pipe { puts $outchan "a"; puts $outchan "b"; puts $outchan "c" } | {
		foreach item [lreverse [read $inchan]] {
			puts $outchan $item.1
		}
	}]

	# Output:
	#     --- DEMO 4 ---
	#     1:a
	#     2:a
	#     3:a
	#     1:b
	#     2:b
	#     3:b
	#     1:c
	#     2:c
	#     3:c
	puts "--- DEMO 4 ---"
	pipethread::pipe {
		puts $outchan "a"
		puts $outchan "b"
		puts $outchan "c"
	} | lmap line {
		list 1:$line 2:$line 3:$line
	} | foreach line {
		puts $line
	}

	# Output:
	#     --- DEMO 5 ---
	#     LINEx1: 4
	#     LINEx2: AGAIN: 5
	puts "--- DEMO 5 ---"
	pipethread::pipe {
		puts $outchan 1
		puts $outchan 2
		puts $outchan 3
		puts $outchan 4
		puts $outchan 5
	} | {
		pipethread::pipe -inchan $inchan -outchan $outchan {
			gets $inchan ignore
			gets $inchan accept
			puts $outchan $accept
			gets $inchan ignore
			gets $inchan accept
			puts $outchan $accept
		} | {
			gets $inchan ignore
			gets $inchan accept
			puts $outchan $accept
		}

		pipethread::pipe -inchan $inchan -outchan $outchan {
			gets $inchan line
			puts $outchan "AGAIN: $line"
		}
	} | {
		gets $inchan line
		puts "LINEx1: $line"
		gets $inchan line
		puts "LINEx2: $line"
	}

	# Experiment
	#     --- DEMO 6 ---
	#     HELLO
	puts "--- DEMO 6 ---"
	pipethread::pipe -outchan stdout {
		pipethread::put default "HELLO"
		pipethread::putEof default
		while true {
			if {![pipethread::put default "WORLD"]} {
				break
			}
		}
	} | {
		pipethread::get default line
		puts $outchan $line
	}

	# Overloaded exec demo
	# Output:
	#     --- DEMO 7 ---
	#     <pwd>
	#     <hostname>
	puts "--- DEMO 7 ---"
	pipethread::pipe -outchan stdout {
		puts $outchan [info hostname]
		puts $outchan [pwd]
	} | exec tac

	# Socket demo: Creates a set of threads for every socket created
	# When a client connects to the socket the conversation is as follows:
	#     [file9] HELLO ! You are 1 !
	#     <wait for a line of input>
	#     [file9] GOOD TO KNOW: <the input from above> from 127.0.0.1/47966 [file10<->file13]
	#     <connection closed>
	#
	# and the output looks like:
	#     --- DEMO SOCKET ---
	#     CLOSING SOCK sock7fa6bc03b630
	puts "--- DEMO SOCKET ---"
	proc handleConnection {sock addr port} {
		incr ::connectionNumber

		pipethread::pipe -async "puts stderr \"CLOSING SOCK $sock\"; close $sock" -inchan $sock -outchan $sock {
			puts $outchan "HELLO ! You are $::connectionNumber !"
			flush $outchan

			gets $inchan line
			puts $outchan "GOOD TO KNOW: $line from $addr/$port \[$inchan<->$outchan\]"
		} | {
			while true {
				gets $inchan line
				if {[eof $inchan] && $line eq ""} {
					break
				}

				puts $outchan "\[$outchan\] $line"
				flush $outchan
			}
		}
	}

	proc handleConnectionFake {sock addr port} {
		after 1 [list handleConnection $sock $addr $port]
	}

	socket -server handleConnectionFake 8410

	vwait forever
}