# Documentation
#! /usr/bin/env tclsh

package require Tcl 8.6
package require Thread

namespace eval ::pipethread {}

# Default command to clone an interpreter into a new thread
#
# Replays the state of the calling interpreter into <thread> by sending it
# scripts with [thread::send]:
#      1. (global namespace only) the auto_path, followed by a
#         [package require] for every package present in this interpreter
#      2. every variable of <namespace>, except those whose name ends in
#         "_NOCLONE"
#      3. (global namespace only) the local variables of the procedure found
#         two levels up the call stack, if that level is not the global
#         level; they are created as global variables in the new thread
#      4. every non-imported procedure of <namespace>
#      5. recursively, every child namespace of <namespace>
#
# Arguments:
#      thread    -- ID of the thread whose interpreter is initialized
#      namespace -- Namespace to copy, defaults to the global namespace
proc ::pipethread::_cloneInterp {thread {namespace "::"}} {
	# Load all currently loaded packages into the new thread
	if {$namespace eq "::"} {
		thread::send $thread [list set auto_path $::auto_path]

		foreach package [package names] {
			# [package names] also lists packages that are merely known,
			# [package present] raises an error for those
			set present 0
			catch {
				package present $package
				set present 1
			}

			if {!$present} {
				continue
			}

			thread::send $thread [list package require $package]
		}
	}

	# Create the namespace we are operating on
	thread::send $thread [list namespace eval $namespace ""]

	# Create a valid namespace prefix string
	set namespacePrefix $namespace
	if {$namespacePrefix ne "::"} {
		append namespacePrefix "::"
	}

	# Copy all namespace variables into the new thread
	foreach var [info vars ${namespacePrefix}*] {
		# Skip variables explicitly marked as noclone
		if {[string match "*_NOCLONE" $var]} {
			continue
		}

		if {[array exists $var]} {
			thread::send $thread [list array set $var [array get $var]]
		} elseif {[info exists $var]} {
			thread::send $thread [list set $var [set $var]]
		}
	}

	if {$namespace eq "::"} {
		# Copy all local variables into the new thread as global variables
		# This may not be the best approach -- but it has to go somewhere...
		#
		# When invoked from ::pipethread::pipe, two levels up is the
		# caller of pipe
		if {[uplevel 2 {info level}] != 0} {
			foreach var [uplevel 2 {info vars}] {
				if {[uplevel 2 [list array exists $var]]} {
					thread::send $thread [list array set $var [uplevel 2 [list array get $var]]]
				} elseif {[uplevel 2 [list info exists $var]]} {
					# [info vars] also reports names that were declared
					# (global/variable/upvar) but never set; reading
					# those would raise an error, so only copy variables
					# that actually hold a value
					thread::send $thread [list set $var [uplevel 2 [list set $var]]]
				}
			}
		}
	}

	# Copy all procedures into the new thread
	foreach proc [info proc ${namespacePrefix}*] {
		## Skip imported procs, hopefully they'll get re-imported
		if {[namespace origin $proc] ne [namespace which $proc]} {
			continue
		}

		# Rebuild the formal argument list, including default values
		set procArgs [list]
		foreach arg [info args $proc] {
			if {[info default $proc $arg argDefault]} {
				lappend procArgs [list $arg $argDefault]
			} else {
				lappend procArgs $arg
			}
		}

		thread::send $thread [list proc $proc $procArgs [info body $proc]]
	}

	# For every child namespace of the current namespace, repeat this whole process
	foreach childNamespace [namespace children $namespace] {
		_cloneInterp $thread $childNamespace
	}

	return
}

# Interpreter initializer selected by the _TTRACE magic value of
# -cloneInterpCommand: instead of copying state, it only asks the new
# thread to load the Ttrace package.
#
# Arguments:
#      thread -- ID of the thread whose interpreter is initialized
proc ::pipethread::_traceInterp {thread} {
	set initScript [list package require Ttrace]

	thread::send $thread $initScript
}

# Variable-trace callback that runs the thread cleanup closure each time a
# thread ID element of the result array is written.
#
# ::pipethread::pipe registers this as a write trace on its result array when
# -async is used.  The first three arguments are fixed when the trace is
# registered, the last three are appended by the trace mechanism.
#
# Arguments:
#      count     -- The number of threads that have been created (also the
#                   number of times the closure is expected to be called)
#      vars      -- A dictionary containing the variables to set in the closure
#      closure   -- The lambda to [apply]
#      _watchVar -- The name of the traced array; used here for bookkeeping
#                   only, it is not passed through to the closure
#      thread    -- The array element that was written, which is the ID of
#                   the thread that has terminated
#      _watchOp  -- The operation that occurred (unused)
proc ::pipethread::_invokeClosure {count vars closure _watchVar thread _watchOp} {
	# Writes to the bookkeeping elements of the array are not thread
	# terminations, so there is nothing to clean up for them
	if {$thread in {threadToChan closureCallCount}} {
		return
	}

	# Count this invocation; [incr] hands back the updated total
	set closureCallCount [incr ${_watchVar}(closureCallCount)]

	# Tell the lambda whether this is the last expected invocation
	lappend vars finalInvocation [expr {$closureCallCount == $count ? "true" : "false"}]

	# Tell the lambda which thread terminated
	lappend vars thread $thread

	# Supply the thread-to-channels mapping unless the caller already did
	if {![dict exists $vars threadToChan]} {
		lappend vars threadToChan [set ${_watchVar}(threadToChan)]
	}

	# Invoke the lambda
	apply $closure $vars
}

# Debugging log procedure
#
# Logging is switched off: the procedure returns before reaching the [puts].
# Remove the early [return], or replace the procedure, to see the messages
# on stderr.
#
# Arguments:
#      err -- The message to log
proc ::pipethread::_debugLog {err} {
	return

	puts stderr $err
}

# Starts a set of "coprocessing" pipes, similar to UNIX shell
#
# Each block will execute in its own thread with two additional
# channels: inchan and outchan
#
# These channels represent pipes coming from the previous block (inchan)
# or going to the next block (outchan)
#
# Blocks may either be command prefixes or a block of commands
#
# If a block is a command prefix then two values are appended to it before
# being invoked: inchan outchan
#
# If a block is a block of commands then two variables are set: inchan outchan
# before the block is started.
#
# Examples of both:
#     1. Command prefixes:
#          pipethread::pipe firstBlock | secondBlock | thirdBlock abcdef
#
#        This creates three threads and in each thread invokes:
#            a. firstBlock $inchan $outchan
#            b. secondBlock $inchan $outchan
#            c. thirdBlock abcdef $inchan $outchan
#
#     2. Block of commands:
#          pipethread::pipe { puts $outchan ok } | { gets $inchan line; puts $line }
#
#        This creates two threads and in each thread invokes:
#            a. set inchan <inchan>; set outchan <outchan>; puts $outchan ok
#            b. set inchan <inchan>; set outchan <outchan>; gets $inchan line; puts $line
#
# You may use both types together.
#
# Optional arguments:
#      -cloneInterpCommand <commandPrefix>
#           Specify the command to run to clone the interpreter into each
#           new thread.  If specified as the empty string then new
#           interpreters will have no initialization done on them.
#
#           This is a command prefix and the thread ID is added to it.
#
#           There are some magic values for this command prefix:
#                  _DEFAULT: Use the default command prefix
#                  _TTRACE:  Use the Ttrace mechanism
#
#      -inchan <chan>
#           Specify a channel to be used as the input channel to the
#           first block.  If this is not specified then the first block
#           will get "" as its input channel.
#
#      -outchan <chan>
#           Specify a channel to be used as the output channel from the
#           final block.  If this is not specified then the final block
#           will get either "" or a pipe channel as its output channel.
#           The default is to create a pipe channel and anything written
#           to it will be [return]'d by pipethread::pipe, however if -async
#           mode is used (below) then this channel will not be created and
#           the final block will get "" as its output channel unless this
#           option is used.
#
#      -async <cleanup>
#           Specify that:
#               1. pipethread::pipe should not wait in the event loop (by
#                  calling [vwait]); and
#               2. pipethread::pipe should return immediately after starting
#                  the threads and connecting their pipes, the caller is
#                  then responsible for waiting in the event loop so that
#                  thread termination is handled.
#           When all the blocks terminate (and the final block may not always
#           be the last block to terminate) the script <cleanup> is invoked
#           in the global scope.
#
#      --
#           Signals the end of arguments
proc ::pipethread::pipe args {
	# Overview of the steps below:
	#      1. Parse the leading options
	#      2. Optionally set up a copy thread when -inchan and -outchan are
	#         the same channel
	#      3. Split the remaining words on "|" into blocks, creating one
	#         thread and (usually) one pipe per block
	#      4. Hand each thread its channels and start its command
	#      5. Either return the list of threads (-async) or wait for every
	#         thread, clean up, and return whatever the final block wrote

	# Create a unique handle for this series of blocks and pipes
	# (the counter is a namespace variable, [incr] creates it on first use)
	incr [namespace current]::handles
	set handle [set [namespace current]::handles]

	# Special values for the -cloneInterpCommand option
	set cloneInterpCommandMagic(_DEFAULT) [namespace current]::_cloneInterp
	set cloneInterpCommandMagic(_TTRACE) [namespace current]::_traceInterp

	# Process commandline arguments
	## Set default values
	set cloneInterpCommand _DEFAULT
	set initialThreadReader ""
	set finalThreadWriter ""
	set operateAsync false

	## Collect arguments
	## NOTE(review): an option given as the very last word takes "" as its
	## value, since [lindex] past the end of the list returns an empty string
	for {set idx 0} {$idx < [llength $args]} {incr idx} {
		set arg [lindex $args $idx]
		switch -glob -- $arg {
			"-cloneInterpCommand" {
				incr idx

				set cloneInterpCommand [lindex $args $idx]
			}
			"-inchan" {
				incr idx

				set initialThreadReader [lindex $args $idx]
			}
			"-outchan" {
				incr idx

				set finalThreadWriter [lindex $args $idx]
			}
			"-async" {
				set operateAsync true

				incr idx

				set onCompleteCleanup [lindex $args $idx]
			}
			"--" {
				incr idx

				break
			}
			"-*" {
				return -code error "Invalid option: $arg, must be one of: -cloneInterpCommand, -inchan, -outchan, -async, or --"
			}
			default {
				break
			}
		}
	}

	## Update the list of arguments to remove the ones we consumed
	set args [lrange $args $idx end]

	# Replace the cloneInterpCommand magic values with actual ones if appropriate
	if {[info exists cloneInterpCommandMagic($cloneInterpCommand)]} {
		set cloneInterpCommand $cloneInterpCommandMagic($cloneInterpCommand)
	}

	# If we are asked to read and write from the same channel (for example
	# a socket) and we have more than one block, we need to create two
	# channels from it since we need to read from the channel in the first block
	# and write to it in the final block but we cannot give the channel to two
	# different threads
	#
	# We create 2 new pipe-pairs and a thread to bridge them and we copy between
	# them in the event loop.
	if {$initialThreadReader eq $finalThreadWriter && $finalThreadWriter ne "" && [lsearch -exact $args "|"] != -1} {
		set copyThreadChannel $initialThreadReader

		set copyThread [thread::create]

		# The script below runs inside the copy thread; it returns the two
		# pipe ends that it detached so that this thread can attach them
		thread::transfer $copyThread $copyThreadChannel
		thread::send $copyThread [list set copyThreadChannel $copyThreadChannel]
		set copyPipe [thread::send $copyThread {
			set readerPipe [chan pipe]
			set writerPipe [chan pipe]

			set readerPipeReader [lindex $readerPipe 0]
			set readerPipeWriter [lindex $readerPipe 1]
			set writerPipeReader [lindex $writerPipe 0]
			set writerPipeWriter [lindex $writerPipe 1]

			foreach fd [list $readerPipeReader $readerPipeWriter $writerPipeReader $writerPipeWriter] {
				fconfigure $fd -translation binary
			}

			thread::detach $writerPipeReader
			thread::detach $readerPipeWriter

			proc copyComplete {var args} {
				incr ::copyComplete

				set sock [set ::$var]

				unset ::$var

				close $sock
			}

			proc terminate {} {
				if {[info exists ::writerPipeWriter]} {
					close $::writerPipeWriter
				}

				if {[info exists ::readerPipeReader]} {
					close $::readerPipeReader
				}

				thread::detach $::copyThreadChannel
			}

			fcopy $copyThreadChannel $writerPipeWriter -command [list copyComplete writerPipeWriter]
			fcopy $readerPipeReader $copyThreadChannel -command [list copyComplete readerPipeReader]

			list $writerPipeReader $readerPipeWriter
		}]

		set copyPipeReader [lindex $copyPipe 0]
		set copyPipeWriter [lindex $copyPipe 1]

		thread::attach $copyPipeReader
		thread::attach $copyPipeWriter

		# From here on the blocks talk to the bridge pipes rather than to
		# the original channel
		set initialThreadReader $copyPipeReader
		set finalThreadWriter $copyPipeWriter
	}

	# Process the list of blocks
	set blockThreadList [list]
	set toRun [list]
	set finalBlock false
	for {set idx 0} {$idx < [llength $args]} {incr idx} {
		set start $idx

		set end [lsearch -start $start -exact $args "|"]

		if {$end == -1} {
			set finalBlock true

			set end [llength $args]
		}

		# Determine the command to run, it is between the two pipes.
		set command [lrange $args $start $end-1]

		# Determine if the block is a command prefix or a block of code
		#
		# This is not easy to do authoritatively, we are just faking it
		# we determine that it is a block of code if:
		#      1. It is a single list element long
		#      2. The string representation begins with the list markers '{' and '}'
		# This can fail if you use a command prefix that has special characters in it
		# and accepts no additional parameters.  You can workaround it by supplying a
		# dummy parameter.
		#
		# Command prefixes starting with "exec", "foreach" or "lmap" are
		# rewritten into a lambda taking inchan and outchan; the rewritten
		# command is still a command prefix, so the channels are appended
		# to it later like for any other prefix
		set anonymous false
		if {[llength $command] == 1 && [string index $command 0] == "\{" && [string index $command end] == "\}"} {
			set anonymous true

			set command [lindex $command 0]
		} else {
			switch -- [lindex $command 0] {
				"exec" {
					set command [join [list \
						[list set command $command] \
						{if {$inchan ne ""} { lappend command <@ $inchan } } \
						{if {$outchan ne ""} { lappend command >@ $outchan } } \
						{eval $command} \
					] "\n"]
					set command [list apply [list {inchan outchan} $command]]
				}
				"foreach" {
					set lineVar [lindex $command 1]
					set commandBlock [lindex $command 2]
					set command {
						uplevel #0 [list set inchan $inchan]
						uplevel #0 [list set outchan $outchan]
						uplevel #0 {
							while true {
								gets $inchan @@line@@
								if {[eof $inchan] && $@@line@@ eq ""} {
									break
								}

								@@command@@
							}
						}
					}

					set command [string map [list @@line@@ $lineVar @@command@@ $commandBlock] $command]
					set command [list apply [list {inchan outchan} $command]]
				}
				"lmap" {
					set lineVar [lindex $command 1]
					set commandBlock [lindex $command 2]
					set command {
						uplevel #0 [list set inchan $inchan]
						uplevel #0 [list set outchan $outchan]
						uplevel #0 {
							while true {
								gets $inchan @@line@@
								if {[eof $inchan] && $@@line@@ eq ""} {
									break
								}

								foreach ::pipethread::internal_line_NOCLONE [@@command@@] { puts $outchan $::pipethread::internal_line_NOCLONE }
							}
						}
					}

					set command [string map [list @@line@@ $lineVar @@command@@ $commandBlock] $command]
					set command [list apply [list {inchan outchan} $command]]
				}
			}
		}

		# Create the pipe to link the output of this thread/block
		# to the input of the next thread/block
		if {!$finalBlock || ($finalThreadWriter eq "" && !$operateAsync)} {
			## For most blocks, a standard pipe is used
			## Also for the final block a pipe is created that is
			## tied to a callback to read into a variable unless
			## an output channel is given with -outchan
			##
			## If we are operating asynchronously then we do
			## not need to do this for the final block
			## even if no -outchan was specified since
			## we do not read from it, since we cannot
			## return it anywhere
			set pipe [chan pipe]
		} else {
			set pipe [list "" $finalThreadWriter]
		}

		set pipeReader [lindex $pipe 0]
		set pipeWriter [lindex $pipe 1]

		# Every block gets its own new thread, which parks
		# in the event loop until it is sent a script
		set thread [thread::create]

		# If this is the first block, remember that so we
		# know its threadID for later use
		if {$start == 0} {
			set initialThread $thread
		}

		# Just keep track of all this information so we can
		# start executing code in the threads AFTER we have
		# created all the pipes
		lappend toRun [list thread $thread anonymous $anonymous command $command pipeReader $pipeReader pipeWriter $pipeWriter]
		lappend blockThreadList $thread

		# Continue after the "|"; the loop's [incr idx] steps over it
		set idx $end
	}

	# Keep track of the threadID for the final block
	# NOTE(review): if no blocks were supplied the loop above never runs and
	# $thread does not exist here, so this line raises an error
	set finalThread $thread

	# If we are operating synchronously and we were not given a channel to
	# write the final block to then we created a pipe -- monitor that pipe
	# and accumulate its contents into our return buffer for this series
	# of blocks.
	if {$finalThreadWriter eq "" && !$operateAsync} {
		set finalThreadReader $pipeReader

		set returnValueVariable "[namespace current]::returnValue_$handle"

		fconfigure $finalThreadReader -blocking 0 -translation binary
		fileevent $finalThreadReader readable "append $returnValueVariable \[read $finalThreadReader\]"
	}

	# This is the name of the array that we will be using to get notified
	# of thread termination either by [vwait]'ing it directly or by
	# [trace add variable] invoking us asynchronously if -async is used
	set threadResultVariable "[namespace current]::resultArray_$handle"

	# Create the list of variables and their values to pass to our thread
	# cleanup closure (variables that were never set, such as copyThread
	# when no copy thread exists, are simply left out)
	set cleanupClosureVars [list]
	foreach var [list handle operateAsync initialThread finalThread initialThreadReader finalThreadWriter copyThread copyThreadChannel onCompleteCleanup threadResultVariable] {
		if {[info exists $var]} {
			lappend cleanupClosureVars $var [set $var]
		}
	}

	# Our thread cleanup closure
	#
	# This is invoked whenever a thread terminates, local variables are passed in using a dict (_vars),
	# each key is the name of a local variable and the value is the value of that variable.
	set cleanupClosure {_vars {
		foreach {_var _val} $_vars {
			set $_var $_val
		}

		_debugLog "\[$thread\]: <-EXIT-"

		thread::send $thread [list foreach chan [dict get $threadToChan $thread] { catch { close $chan } }]

		if {![info exists copyThread]} {
			if {$thread eq $initialThread} {
				if {$initialThreadReader ni [list "" stdin $finalThreadWriter]} {
					thread::send $thread [list thread::detach $initialThreadReader]

					thread::attach $initialThreadReader
				}
			}

			if {$thread eq $finalThread} {
				if {$finalThreadWriter ni [list "" stdout stderr]} {
					thread::send $thread [list thread::detach $finalThreadWriter]

					thread::attach $finalThreadWriter
				}
			}
		}

		_debugLog "\[$thread\]: <-RELEASE-"

		thread::release -wait $thread

		_debugLog "\[$thread\]: <-DONE-"

		if {$finalInvocation} {
			# Work around a bug in Tcl:
			#    http://core.tcl.tk/thread/tktview/84be1b5a7
			if {$operateAsync} {
				after idle [list unset -nocomplain $threadResultVariable]
			} else {
				unset -nocomplain $threadResultVariable
			}

			if {[info exists copyThread]} {
				thread::send $copyThread [list terminate]

				thread::release -wait $copyThread

				thread::attach $copyThreadChannel
			}

			if {[info exists onCompleteCleanup]} {
				uplevel #0 $onCompleteCleanup
			}
		}
	} }

	# The third element of a lambda is the namespace [apply] evaluates it
	# in; using our namespace lets the body call _debugLog unqualified
	lappend cleanupClosure [namespace current]

	# If we are operating asynchronously create the thread result variable and
	# add a trace to it.  We do this before invoking the blocks so that if they
	# terminate we are notified of it even if they do so before we are finished
	# requesting all of them start
	if {$operateAsync} {
		array set $threadResultVariable [list]

		trace add variable $threadResultVariable [list write] [list [namespace current]::_invokeClosure [llength $toRun] $cleanupClosureVars $cleanupClosure]
	}

	# Start executing each block, from first to last
	#
	# We send the script to each thread asynchronously and configure
	# it to notify us via a write to an array with the thread ID as
	# the array key
	for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
		set info [lindex $toRun $idx]

		# Determine the input channel and output channel
		## The input channel is the read side of the pipe
		## from the previous block, if there was no previous
		## block then the input channel is the -inchan value
		## (blank by default)
		if {$idx > 0} {
			set inchan [dict get [lindex $toRun $idx-1] pipeReader]
		} else {
			set inchan $initialThreadReader
		}
		set outchan [dict get $info pipeWriter]

		set thread [dict get $info thread]
		set anonymous [dict get $info anonymous]
		set command [dict get $info command]

		# Transfer channels that this thread needs to use to that
		# thread.
		#
		# Keep track of the channels which the thread needs to close
		# when it terminates as well (threadToChan) because we need
		# to explicitly do this to avoid background errors.
		#
		# Channels supplied by the caller via -inchan/-outchan are
		# transferred but not recorded for closing, unless they are the
		# bridge pipes created for the copy thread
		dict set ${threadResultVariable}(threadToChan) $thread [list]

		if {$inchan ne ""} {
			if {$inchan ne [list stdin]} {
				thread::transfer $thread $inchan

				if {$inchan ne $initialThreadReader || [info exists copyThread]} {
					dict lappend ${threadResultVariable}(threadToChan) $thread $inchan
				}
			}
		}

		if {$outchan ne ""} {
			if {$outchan ni [list stdout stderr $inchan]} {
				thread::transfer $thread $outchan

				if {$outchan ne $finalThreadWriter || [info exists copyThread]} {
					dict lappend ${threadResultVariable}(threadToChan) $thread $outchan
				}
			}
		}

		# Clone the current interpreter into the thread interpreter as
		# requested by the user.
		if {$cloneInterpCommand ne ""} {
			$cloneInterpCommand $thread
		}

		# Update the command to execute with the parameters we have determined added.
		# Command prefixes receive the channels as two extra words, blocks
		# of code get them as the variables inchan and outchan
		if {!$anonymous} {
			lappend command $inchan $outchan
		} else {
			set command "set inchan \"$inchan\";set outchan \"$outchan\"\n$command"
		}

		# Give the thread a variable that lists all the threads in this
		# series of blocks (used by ::pipethread::getThreadId)
		thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::blockThreadList $blockThreadList]]

		# Store inchan and outchan in a namespace variable so that we can retrieve it
		thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::inchan_NOCLONE $inchan]]
		thread::send $thread [list namespace eval ::pipethread [list set ::pipethread::outchan_NOCLONE $outchan]]

		# Send the command to the thread interpreter
		_debugLog "\[$thread\]: -EXEC-> [string map [list "\n" "\\n"] $command]"
		thread::send -async $thread $command ${threadResultVariable}($thread)
	}

	# At this point, if we are operating asynchronously then we are done.
	# The event loop will invoke our variable traces when the commands specified
	# in the [thread::send] immediately above completes for each block and we
	# will clean those threads then
	if {$operateAsync} {
		# Return something that could be meaningful, the list of
		# threads involved in this group of blocks
		return $blockThreadList
	}

	# If we are operating synchronously then we need to wait here for the result
	# array to be written to...
	if {!$operateAsync} {
		# Add the complete list of channels for each thread to the closure
		# variable list, since we know them all now
		lappend cleanupClosureVars threadToChan [set ${threadResultVariable}(threadToChan)]
		unset ${threadResultVariable}(threadToChan)

		# Wait in the event loop for our variable to be written to.
		# Wait exactly the same number of times as we created threads
		# since each thread will set the variable when it terminates
		for {set idx 0} {$idx < [llength $toRun]} {incr idx} {
			_debugLog "\[---\]: WAITING"
			vwait $threadResultVariable

			# Determine the thread ID from the array key that has
			# been set; the element is removed so that the next
			# iteration picks up a different thread
			set thread [lindex [array names $threadResultVariable] 0]
			unset ${threadResultVariable}($thread)

			# If this is the last block to complete, note it to pass
			# to the lambda
			if {$idx == ([llength $toRun] - 1)} {
				set finalInvocation true
			} else {
				set finalInvocation false
			}

			# Construct a local copy of the local variables dict
			# for the lambda so we can update it with values
			# that are specific to this invocation
			set varsDict $cleanupClosureVars

			lappend varsDict thread $thread
			lappend varsDict finalInvocation $finalInvocation

			# Call the lambda
			apply $cleanupClosure $varsDict
		}

		# At this point all the threads have been terminated, including any
		# final cleanup code
		#
		# If there is no output channel given to the last block that means
		# a pipe was created and we should consume anything that remains in
		# it and close our side (the other side is closed already, all threads
		# are gone) and accumulate it further into our buffer and return
		# that buffer
		if {$finalThreadWriter eq ""} {
			append $returnValueVariable [read $finalThreadReader]

			close $finalThreadReader

			set returnValue [set $returnValueVariable]

			unset $returnValueVariable

			return $returnValue
		}
	}

	return
}

# Return the input channel of the current block
#
# This is the value stored in ::pipethread::inchan_NOCLONE (::pipethread::pipe
# sets it in each block's thread); when that variable does not exist, "stdin"
# is returned instead.
proc ::pipethread::inchan {} {
	if {[info exists ::pipethread::inchan_NOCLONE]} {
		return $::pipethread::inchan_NOCLONE
	}

	return stdin
}

# Return the output channel of the current block
#
# This is the value stored in ::pipethread::outchan_NOCLONE (::pipethread::pipe
# sets it in each block's thread); when that variable does not exist, "stdout"
# is returned instead.
proc ::pipethread::outchan {} {
	if {[info exists ::pipethread::outchan_NOCLONE]} {
		return $::pipethread::outchan_NOCLONE
	}

	return stdout
}

# Provide an infinite buffer that will read from inchan as it is readable and write to
# outchan as it is writable.
#
# Usage: pipethread::infiniteBuffer ?options? inchan outchan
#
# Both channels are switched to non-blocking, binary mode and the procedure
# waits in the event loop (by calling [vwait]) until the copy is complete.
# Because the channels are the last two arguments it can be used directly
# as a command prefix block of ::pipethread::pipe.
#
# Options:
#      -bufferOnWriteError
#           If writing to outchan fails, keep consuming (and discarding)
#           input until inchan reaches end-of-file before returning.
#
#      -terminateOnWriteError
#           If writing to outchan fails, return right away.  This is the
#           default.
#
# Unrecognized options are ignored.
#
# State is kept in the global variables ::options, ::buffer, ::readComplete,
# ::writeComplete and ::copyComplete of the calling interpreter.
proc ::pipethread::infiniteBuffer args {
	if {[llength $args] < 2} {
		return -code error "wrong # args: should be pipethread::infiniteBuffer ?options? inchan outchan"
	}

	set inchan [lindex $args end-1]
	set outchan [lindex $args end]
	set args [lrange $args 0 end-2]
	set ::options(-bufferOnWriteError) false

	fconfigure $inchan -blocking false -translation binary
	fconfigure $outchan -blocking false -translation binary

	foreach arg $args {
		switch -- $arg {
			"-bufferOnWriteError" {
				set ::options($arg) true
			}
			"-terminateOnWriteError" {
				set ::options(-bufferOnWriteError) false
			}
		}
	}

	set ::readComplete false
	set ::writeComplete false
	set ::copyComplete false

	# A previous copy in this interpreter that stopped on a write error
	# leaves its unwritten data behind; start with an empty buffer so that
	# data is not written to this outchan
	unset -nocomplain ::buffer

	fileevent $inchan readable [list apply {{inchan outchan} {
		set data [read $inchan]

		if {$data ne ""} {
			append ::buffer $data
		} else {
			if {[eof $inchan]} {
				fileevent $inchan readable {}

				set ::readComplete true
			}
		}

		# Once writing has failed the input is only drained, not kept
		if {$::writeComplete} {
			unset -nocomplain ::buffer

			if {$::readComplete} {
				set ::copyComplete true
			}

			return
		}


		# Only watch outchan for writability while there may be something
		# to write; the handler removes itself after each run
		if {[fileevent $outchan writable] eq ""} {
			fileevent $outchan writable [list apply {{outchan} {
				if {[info exists ::buffer]} {
					if {[catch {
						puts -nonewline $outchan $::buffer
						flush $outchan
						unset ::buffer
					}]} {
						if {$::options(-bufferOnWriteError)} {
							set ::writeComplete true
						} else {
							set ::copyComplete true
						}
					}

				}

				fileevent $outchan writable {}

				if {$::readComplete} {
					set ::copyComplete true
				}
			}} $outchan]
		}
	}} $inchan $outchan]

	vwait ::copyComplete
}

# Return the ID of the thread <relative> positions away from the current
# thread in ::pipethread::blockThreadList (1 is the next block, -1 the
# previous one).
#
# An empty string is returned when the resulting position is outside the
# list.  If the current thread is not in the list its position is taken to
# be -1, the result of the failed search.  The returned thread may no longer
# exist.
proc ::pipethread::getThreadId {relative} {
	set threads $::pipethread::blockThreadList
	set ownIndex [lsearch -exact $threads [thread::id]]
	set wantedIndex [expr {$ownIndex+$relative}]

	return [lindex $threads $wantedIndex]
}

# XXX: This may go away or be replaced by something better -- its just an experiment
#
# Queue an end-of-file marker for <handle> in another block's thread.
#
# Usage: putEof ?-position <position>? handle
#
# <position> is the offset of the target thread relative to the current one
# (see getThreadId) and defaults to 1.  Returns true if the marker could be
# sent to the target thread and false if sending failed.
proc ::pipethread::putEof args {
	# Peel off the optional leading "-position <position>" pair
	set position 1
	if {[lindex $args 0] eq "-position"} {
		set args [lassign $args optionName position]
	}

	if {[llength $args] != 1} {
		return -code error "wrong # args: should be putEof ?-position <position>? handle"
	}

	set targetThread [getThreadId $position]

	set handle [lindex $args 0]

	# The marker is appended to the target's per-handle message queue
	set queueScript [list lappend ::pipethread::incoming_NOCLONE($handle) [list eof]]
	set sendScript [list namespace eval ::pipethread $queueScript]

	if {[catch {thread::send $targetThread $sendScript}]} {
		return false
	}

	return true
}

# Queue a data message for <handle> in another block's thread.
#
# Usage: put ?-position <position>? handle message
#
# <position> is the offset of the target thread relative to the current one
# (see getThreadId) and defaults to 1.  Returns true if the message could be
# sent to the target thread and false if sending failed.
proc ::pipethread::put args {
	# Peel off the optional leading "-position <position>" pair
	set position 1
	if {[lindex $args 0] eq "-position"} {
		set args [lassign $args optionName position]
	}

	if {[llength $args] != 2} {
		return -code error "wrong # args: should be put ?-position <position>? handle message"
	}

	set targetThread [getThreadId $position]

	lassign $args handle message

	# The message is appended to the target's per-handle message queue
	set queueScript [list lappend ::pipethread::incoming_NOCLONE($handle) [list data $message]]
	set sendScript [list namespace eval ::pipethread $queueScript]

	if {[catch {thread::send $targetThread $sendScript}]} {
		return false
	}

	return true
}

# Take the next message queued for <handle> (by put or putEof) off the
# queue of the current thread.
#
# If no message is queued, wait in the event loop (by calling [vwait]) until
# the queue is written to.
#
# Arguments:
#      handle     -- Name of the message queue
#      messageVar -- Name of the caller's variable that receives the payload
#                    of a data message
#
# Returns false, and discards the whole queue, if the next message is an
# end-of-file marker; otherwise removes the message from the queue and
# returns true.
proc ::pipethread::get {handle messageVar} {
	namespace eval ::pipethread {}

	set queue ::pipethread::incoming_NOCLONE($handle)

	# Nothing pending?  Park in the event loop until the queue is written to
	if {![info exists $queue] || [llength [set $queue]] <= 0} {
		vwait $queue
	}

	set nextMessage [lindex [set $queue] 0]
	set messageType [lindex $nextMessage 0]

	if {$messageType eq "eof"} {
		unset $queue

		return false
	}

	if {$messageType eq "data"} {
		upvar $messageVar message

		set message [lindex $nextMessage 1]
	}

	# Drop the message that was just handled
	set $queue [lrange [set $queue] 1 end]

	return true
}

package provide pipethread 0.1

# The demos below only run when the variable ::pipethread::demos exists
# before this point is reached (for example because the caller set it before
# sourcing this file).  The final demo starts a socket server and then waits
# in the event loop on a variable that nothing in this file sets, so the
# script does not proceed past it.
if {[info exists ::pipethread::demos]} {
# NOTE(review): presumably this is done so that the default interpreter
# cloning does not try to [package require pipethread] in the new threads --
# confirm
package forget pipethread
################################################################################
### DEMOS ######################################################################
################################################################################

# Define some basic procedures
# NOTE(review): the definitions between ttrace::enable and ttrace::disable
# are presumably recorded by Ttrace so that threads initialized with _TTRACE
# (DEMO 1) can resolve them -- confirm against the Ttrace documentation
package require Ttrace
ttrace::enable
# Writes five fixed lines to outchan
proc a {inchan outchan} {
	puts "A: STARTING"
	puts $outchan a
	puts $outchan b
	puts $outchan what
	puts $outchan whatwhat
	puts $outchan c
	puts "A: ENDING"
}

# Copies the lines from inchan that start with <arg> to outchan
proc b {arg inchan outchan} {
	puts "B: STARTING"
	while true {
		gets $inchan line
		if {[eof $inchan] && $line == ""} {
			break
		}

		if {![string match "$arg*" $line]} {
			continue
		}

		puts $outchan $line
		puts "B: SENDING: $line"
	}
	puts "B: ENDING"
}

# Reads a single line from inchan and reports it
proc c {inchan outchan} {
	puts "C: STARTING"
	gets $inchan line
	puts "C: Got: $line"
	puts "C: ENDING"
}

# Start doing things
set what what
ttrace::disable

# Output:
#         --- DEMO 1 ---
#         A: STARTING
#         A: ENDING
#         B: STARTING
#         C: STARTING
#         B: SENDING: what
#         B: SENDING: whatwhat
#         B: ENDING
#         C: Got: what
#         C: ENDING
puts "--- DEMO 1 ---"
::pipethread::pipe -cloneInterpCommand _TTRACE a | b $what | c

# Output:
#         --- DEMO 2 ---
#         A: STARTING
#         A: ENDING
#         C: STARTING
#         C: Got: ok
#         C: ENDING
puts "--- DEMO 2 ---"
::pipethread::pipe a | { puts $outchan ok } | c

# Output:
#         --- DEMO 3 ---
#         c.1
#         b.1
#         a.1
#         <blank line>
#
# Note the extra blank line at the bottom
# This is because [puts $outchan $item.1] writes a newline
# and puts [pipethread::pipe] also writes a newline
puts "--- DEMO 3 ---"
puts [pipethread::pipe { puts $outchan "a"; puts $outchan "b"; puts $outchan "c" } | { foreach item [lreverse [read $inchan]] { puts $outchan $item.1 } }]

# Output:
#         --- DEMO 4 ---
#         1:a
#         2:a
#         3:a
#         1:b
#         2:b
#         3:b
#         1:c
#         2:c
#         3:c
puts "--- DEMO 4 ---"
pipethread::pipe {
	puts $outchan "a"
	puts $outchan "b"
	puts $outchan "c"
} | lmap line { list 1:$line 2:$line 3:$line } | foreach line {
	puts $line
}

# Nested pipes: the middle block runs two further pipes on its own channels
# Output:
#         --- DEMO 5 ---
#         LINEx1: 4
#         LINEx2: AGAIN: 5
puts "--- DEMO 5 ---"
pipethread::pipe {
	puts $outchan 1
	puts $outchan 2
	puts $outchan 3
	puts $outchan 4
	puts $outchan 5
} | {
	pipethread::pipe -inchan $inchan -outchan $outchan {
		gets $inchan ignore
		gets $inchan accept
		puts $outchan $accept
		gets $inchan ignore
		gets $inchan accept
		puts $outchan $accept
	} | {
		gets $inchan ignore
		gets $inchan accept
		puts $outchan $accept
	}

	pipethread::pipe -inchan $inchan -outchan $outchan {
		gets $inchan line
		puts $outchan "AGAIN: $line"
	}
} | {
	gets $inchan line
	puts "LINEx1: $line"

	gets $inchan line
	puts "LINEx2: $line"
}

# Experiment: message passing between blocks with put/putEof/get instead
# of the pipe channels
#         --- DEMO 6 ---
#         HELLO
puts "--- DEMO 6 ---"
pipethread::pipe -outchan stdout {
	pipethread::put default "HELLO"
	pipethread::putEof default
	while true {
		if {![pipethread::put default "WORLD"]} {
			break
		}
	}
} | {
	pipethread::get default line
	puts $outchan $line
}

# Overloaded exec demo
# (the first block writes the hostname and then the working directory, the
# external "tac" command reverses the order of the lines)
# Output:
#         --- DEMO 7 ---
#         <pwd>
#         <hostname>
puts "--- DEMO 7 ---"
pipethread::pipe -outchan stdout {
	puts $outchan [info hostname]
	puts $outchan [pwd]
} | exec tac

# Socket demo: Creates a set of threads for every socket created
# When a client connects to the socket the conversation is as follows:
#         [file9] HELLO !  You are 1 !
#         <wait for a line of input>
#         [file9] GOOD TO KNOW: <the input from above> from 127.0.0.1/47966 [file10<->file13]
#         <connection closed>
#
# and the output looks like:
#         --- DEMO SOCKET ---
#         CLOSING SOCK sock7fa6bc03b630
puts "--- DEMO SOCKET ---"
# Runs a two block pipe on the client socket, which is used for both input
# and output; the -async cleanup script closes the socket once all blocks
# have terminated
proc handleConnection {sock addr port} {
	incr ::connectionNumber

	pipethread::pipe -async "puts stderr \"CLOSING SOCK $sock\"; close $sock" -inchan $sock -outchan $sock {
		puts $outchan "HELLO !  You are $::connectionNumber !"
		flush $outchan
		gets $inchan line
		puts $outchan "GOOD TO KNOW: $line from $addr/$port \[$inchan<->$outchan\]"
	} | {
		while true {
			gets $inchan line
			if {[eof $inchan] && $line eq ""} {
				break
			}

			puts $outchan "\[$outchan\] $line"
			flush $outchan
		}
	}
}

# Accept callback: schedules handleConnection with [after] rather than
# calling it directly
# NOTE(review): the reason for the deferral is not stated in this file --
# confirm
proc handleConnectionFake {sock addr port} {
	after 1 [list handleConnection $sock $addr $port]
}

# Listen on TCP port 8410
socket -server handleConnectionFake 8410

# Enter the event loop to serve connections
vwait forever
}