ycl

Artifact [fbd1486756]
Login

Artifact [fbd1486756]

Artifact fbd14867560a25af621a23dd66644325692fa3f3:


#! /bin/env tclsh

package require Thread

package require {ycl proc}
namespace import [yclprefix]::proc::checkargs

variable doc::tplex {
	description {

		Connect the outputs of one channel to the inputs of zero or more
		channels.
		
		Returns a list containing a new channel and a thread id.  The  new
		channel has all the attributes of $chan . The new channel is a pipe
		whose other end is in a separate thread which buffers the output of
		$chan , allowing the new channel to act as a synchronous channel , with
		the asynchronous event queue activity confined to the separate thread .

		The buffer uses an additive-increase/multiplicative-decrease strategy
		to avoid overflowing memory, and also to avoid congestion in
		multiplexing scenarios.

		By default, $chan is closed when [eof $chan] is true, and the close
		information is stored in the following variables in the buffer's
		thread:

			$closestatus

			$closeres

			$closeopts

		To use tplex as a sink, set $connect to "background" and don't
		provide $to.
	}
	args {
		chan {
			description {
				The channel to accumulate data from . 
			}
			positional true
		}
		background {
			description {
				If true, don't return a read channel.

				Note: tplex itself does not read this value.  To run without
				a returned channel, set $connect to "background" instead.
			}
			default {lindex false}
		}
		buffsize {
			description {
				The size of the write buffer .
			}
			default {lindex 8192}
		}
		connect {
			description {
				What sort of connection to establish to the tplex instance.
				"background" means no connection, "read" means a channel from
				which the multiplexed data can be read, and "wait" means a
				channel that becomes readable and reaches EOF when the tplex
				instance source channel is closed.
			}
			validate {$connect in {background read wait}}
			default {lindex read}
		}
		eofaction {
			description {
				What to do when [eof $chan] is true.
			}
			validate {$eofaction in {close detach nothing}}
			default {lindex close}
		}
		init {
			description {
				A command prefix to execute to initialize the thread .
				$chan and a list of output channels are appended to the prefix .
			}
			default {lindex [list apply {{chan to} {}}]}
		}
		persist {
			description {
				Don't release the thread once all the channels are closed.
			}
			default {lindex false} 
		}
		paused {
			description {
				Create the buffer in a paused state.  The buffer can be started
				by executing [run] in the buffer's thread.
			}
			default {lindex false}
		}
		readsize {
			description {
				The number of characters to read at a time 
			}
			default {lindex 8192}
		}
		to {
			description {
				A list of channels to multiplex the data from $chan to , rather
				than creating and returning a new proxy chan .
			}
			default {lindex {}}
		}
	}
	implementation {
		Currently , channels are closed on EOF because there isn't any way to
		signal an EOF on a pipe other than closing it . If that ever changes,
		modify this code to take advantage of that . 
	}
}

# tplex chan ?name value ...?
#
# Moves $chan into a newly created thread which copies everything it reads
# from $chan to each channel in $to.  The options are described in
# doc::tplex ; checkargs is relied on to validate them and to define the
# corresponding local variables ($connect, $to, $init, ...) used below.
#
# Returns a two-element list: the channel the caller may read from (empty
# when $connect is "background") and the id of the worker thread.
#
# Inside the worker thread the following commands are defined:
#
#	run		install the readable handler that pumps data
#	subscribe	add a channel to the list of outputs
#	finish	close the "wait" channel and release the thread (or set
#		the variable "done" when $persist is true)
proc tplex {chan args} {
	variable state
	set donechan {}
	checkargs [set doc::[lindex [info level 0] 0]] {*}$args
	set tid [thread::create]
	switch $connect {
		background {
			set returnchan {}
		}
		default {
			lassign [chan pipe] pr pw
			# The read end decodes like $chan would have; $chan itself is
			# switched to binary so the worker passes the bytes through
			# untouched.
			chan configure $pr {*}[chan configure $chan]
			chan configure $pr -blocking 1
			chan configure $chan -translation binary
			switch $connect {
				read {
					# The write end must be binary as well, otherwise the raw
					# bytes read from $chan would be re-encoded on the way
					# into the pipe.
					chan configure $pw -blocking 0 -translation binary
					lappend to $pw
				}
				wait {
					set donechan $pw
				}
			}
			set returnchan $pr
		}
	}

	chan configure $chan -blocking 0
	foreach toname $to {
		thread::transfer $tid $toname
	}
	if {$donechan ne {}} {
		# The worker closes this channel to signal completion, so it has to
		# live in the worker thread and must not stay open here.
		thread::transfer $tid $donechan
	}
	thread::transfer $tid $chan
	thread::send $tid [list {*}$init $chan $to]
	thread::send $tid [list variable buffsize $buffsize]
	thread::send $tid [list variable donechan $donechan]
	thread::send $tid [list variable eofaction $eofaction]
	thread::send $tid [list variable persist $persist]
	thread::send $tid [list variable readsize $readsize]
	thread::send $tid [list variable to $to]
	thread::send $tid [list apply [list chan {

		# Milliseconds to sleep when an output channel is congested.
		variable wait 10
		set [namespace current]::chan $chan

		# Called once no output channel remains to be drained: signal the
		# "wait" channel, if any, and either release the thread or, when
		# persisting, record completion in the variable "done".
		proc finish {} {
			variable donechan
			variable persist
			if {$donechan ne {} && $donechan in [chan names]} {
				close $donechan
			}
			if {$persist} {
				variable done 1
			} else {
				thread::release
			}
		}

		# Start (or resume) pumping data from the source channel.
		proc run {} {
			variable chan
			chan event $chan readable [list apply [list chan {
				variable buffsize
				variable closeres
				variable eofaction
				variable closeopts
				variable closestatus
				variable done 0
				variable readsize
				variable to
				variable wait
				if {[eof $chan]} {
					# Nothing more will be read, so stop watching the source.
					# This must happen before $chan is possibly closed, and it
					# keeps the handler from firing repeatedly at EOF when
					# $eofaction is "nothing".
					chan event $chan readable {}
					switch $eofaction {
						close {
							set closestatus [catch {close $chan} closeres closeopts]
						}
						detach {
							thread::detach $chan
						}
						nothing {
						}
					}
					# Blocking channel caused a deadlock for some reason.
					# Maybe a bug.
					#chan configure $pw -blocking 1
					#chan close $pw

					# Todo: is the blocking close above still causing deadlock in
					# newer versions of Tcl?
					set draining 0
					foreach to1 $to {
						if {$to1 in {stdout stderr}} {
							# Standard channels are flushed but never closed.
							flush $to1
							continue
						}
						incr draining
						chan event $to1 writable [list apply [list to1 {
							variable donechan
							variable to
							if {[chan pending output $to1]} {
								flush $to1
								return
							}
							# Todo: is there any way yet in Tcl to signal an EOF on a pipe
							# short of closing the pipe ?
							chan configure $to1 -blocking 1
							close $to1
							# Keep going while any other output is still open.
							foreach to2 $to {
								if {$to2 in [chan names] && $to2 ni [
									list $donechan stdout stderr]} {
									return
								}
							}
							finish
						} [namespace current]] $to1]
					}
					if {!$draining} {
						# No output needs draining (sink mode, or only standard
						# channels), so no writable handler will ever run:
						# finish right away.
						finish
					}
				} else {
					# Back off while any output has more than $buffsize
					# pending, lengthening the delay each time.
					foreach to1 $to {
						if {[chan pending output $to1] > $buffsize} {
							after $wait
							set wait [expr {$wait + max(entier($wait * .20), 1)}]
							return
						}
					}
					set data [read $chan $readsize]
					if {$data ne {}} {
						# Every output gets the same data, so $data must stay
						# intact for the whole loop.
						foreach to1 $to {
							puts -nonewline $to1 $data
						}
					}
					# Data flowed: halve the delay, but not below 10 ms.
					if {$wait > 10} {
						set wait [expr {entier($wait / 2)}]
						if {$wait < 10} {
							set wait 10
						}
					}
				}
			} [namespace current]] $chan]
		}

		# Add $chan to the list of output channels unless already present.
		proc subscribe chan {
			variable to
			if {$chan ni $to} {
				lappend to $chan
			}
		}
	}] $chan]
	if {!$paused} {
		thread::send -async $tid run
	}
	return [list $returnchan $tid]
}