ycl

Artifact [1c264ed8b9]
Login

Artifact 1c264ed8b95dcda0a2441b6f3b8119e570f9b4b7:


#! /bin/env tclsh


package require {ycl test}

proc suite_main {} {
	package require {ycl dict}
	namespace import [yclprefix]::dict::getdefault
	package require {ycl list}
	namespace import [yclprefix]::list::sl
	variable cleanup1
	namespace upvar [yclprefix]::test cleanup1 cleanup1
	[yclprefix]::test::init
	namespace import [yclprefix]::test::cleanup1
	package require [list ycl daerth]
	namespace import [yclprefix]::daerth
	namespace import [yclprefix]::daerth::queue

	variable setup1 {
	}


	test basic {} -body {
		variable res

		set tnames [thread::names]
		set length1 [llength $tnames]
		#puts stderr [list {existing threads} $tnames]

		[daerth queue new queue1] .init
		[daerth queue new queue2] .init
		[daerth queue new queue3] .init

		set station1 [daerth transport [queue1 thread] [queue2 thread] {{daerth number} {
			while 1 {
				# puts stderr [list [thread::id] station1 working on $number]
				lassign [$daerth deliver [
					expr {$number + $number}]] number
			}
		}}]
		#puts stderr [list station1 $station1]
		

		set station2 [daerth transport [queue2 thread] [queue3 thread] \
			{{daerth number} {
				while 1 {
					# puts [list [thread::id] station2 working on $number]
					lassign [$daerth deliver [
						list $number [expr {$number + 1}]]] number
				}
			}}
		]


		set producer [daerth producer [queue1 thread] {{daerth {
			for {set i 0} {$i < 10} {incr i} {
				$daerth deliver $i
			}
			# puts stderr [list [thread::id] leaving work]
		}}}]
		#puts stderr [list producer $producer]

		collect queue3

		queue1 exit
		queue2 exit
		queue3 exit

		set elapsed 0
		set wait 100
		while {$elapsed < 30000} {
			set tnames2 [thread::names]
			set length2 [llength $tnames2]
			set res2 [expr {$length1 == $length2}]
			if {$length1 == $length2} break
			after $wait
			incr elapsed 100
		}

		lappend res $res2
		return $res
	} -cleanup [cleanup1] -result [sl {
		{0 1} {2 3} {4 5} {6 7} {8 9} {10 11} {12 13} {14 15} {16 17} {18 19} 1
	}]


	test daerth {} -setup $setup1 -body {
		variable res
		set tnames [thread names]
		set res2 {}
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			1 1 2
			1 1 3
			1 1 4
			1 2 1
			1 3 1
			1 4 1
			1 4 2
		}] {

			factory1 [list station_produce_odd_numbers 500 $producercount] $consumercount $filtercount
			set list1 [lsort -integer [::tsv::get tres list]]
			set res [lsort -integer $res[set res {}]]
			if {[llength $res] != [llength $list1]} {
				lappend res2 [list {res length} [llength $res] vs {list1 length} [
					llength $list1]]
			}
			for {set i 0} {$i < [llength $list1]} {incr i} {
				if {[lindex $res $i] != [lindex $list1 $i] * 2} {
					lappend res [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			for {set i [expr {[llength $list1] - 10}]} {$i < [llength $list1]} {incr i} {
				if {[lindex $res $i] != [lindex $list1 $i] * 2} {
					lappend res [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			lappend res2 done
		}
		set tnames2 [thread names]
		lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
		return $res2
	} -cleanup [cleanup1] -result [sl {
		done done done done done done done done 1
	}]


	test daerth_primes {} -setup $setup1 -body {
		variable res
		set tnames [thread names]
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
		}] {
			factory1 [list station_produce_primes 500 $producercount] $consumercount $filtercount
			set list1 [lsort -integer [::tsv::get tres list]]
			set res [lsort -integer $res[set res {}]]
			for {set i 0} {$i < [llength $list1]} {incr i} {
				if {[lindex $res $i] != [lindex $list1 $i] * 2} {
					lappend res2 [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			lappend res2 done
		}
		set tnames2 [thread names]
		lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
		return $res2
	} -cleanup [cleanup1] -result [sl {
		done 1
	}]

	# Warning, this can take several minutes to run
	test daerth_primes_loadbalanced {} -body {
		set res2  {}
		set tnames [thread names]
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			# this runs about twice as fast as the previous run
			1 2 1 
		}] {
			factory_primes_loadbalanced [
				list station_produce_odd_numbers 1000 $producercount] \
					$consumercount $filtercount
			set list1 [lsort -integer [::tsv::get tres list]]
			lappend res2 [llength $list1]
		}
		set tnames2 [thread names]
		lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
		set res2
	} -cleanup [cleanup1] -result [sl {
		167 167 1
	}]


	test pipeline {} -body {
		variable res
		set names [lsort [thread::names]]
		daerth pipeline p1 {
			{
				{comm {
					set x 0
					for {set i 0} {$i < 10} {incr i} {
						$comm deliver [incr x $i]
					}
				}}
			}
		} {
			{
				{{comm data} {
					while 1 {
						lassign [$comm deliver [list $data $data]] data
					}
				}}
			}
		} {
			{
				{{comm data} {
					lassign $data a1 a2
					while 1 {
						lassign [$comm take] b1 b2 
						lassign [$comm deliver [list $a1 $a2 $b1 $b2]] a1 a2
					}
				}}
			}
		} 

		coroutine collect1 ::apply [list pipe {
			yield
			while 1 {
				lappend res [$pipe]

			}
			set [namespace current]::res $res
		} [namespace current]] [namespace which p1]

		after 0 [list [namespace which collect1]]

		vwait [namespace current]::res

		for {set i 0} {$i < 100} {incr i} {
			set clean clean
			foreach name1 [lsort [thread::names]] name2 $names {
				if {$name1 ne $name2} {
					set clean dirty
				}
			}
			if {$clean eq {clean}} break
			after 100 
		}
		lappend res $clean
		return $res
	} -cleanup [cleanup1] -result [sl {
		{0 0 1 1} {3 3 6 6} {10 10 15 15} {21 21 28 28} {36 36 45 45} clean
	}]

	cleanupTests
}

	

proc go producers {
	foreach worker $producers {
		# puts stderr [list [thread::id] telling $worker to receive]
		thread::send -async $worker receive 
	}
}


proc alldone master {
	# alldone must respond immediately so that the caller isn't waiting
	after 0 [list after idle [list apply [list {master prefix} {
		# puts stderr [list telling $master to quit]
		thread::send -async $master [daerth prefix]::quit

		incr [namespace current]::done
		# puts stderr [::tsv::llength result list]
		# exit 0
	} [namespace current]] $master [daerth prefix]]]
}


proc bgerror {eres opts} {
	variable bgopts
	variable res
	puts stderr [namespace current]::bgerror:\n[dict get $opts -errorinfo]
	set bgopts $opts
	set res $eres 
}


proc collect queue {
	after 0 [list coroutine collector ::apply [list queue {
		variable res
		set res1 {}
		while 1 {
			# puts stderr [list [thread::id] collecting from $queue [$queue thread]]
			lappend res1 [$queue take]
		}
		set res $res1
	} [namespace current]] $queue]
	
	vwait [namespace current]::res
}


proc factory1 {producers consumercount filtercount} {

	tsv::set tres list {} 

	[daerth queue new queue1] .init
	[daerth queue new queue2] .init
	[daerth queue new queue3] .init
	# puts stderr [list queue1 is [queue1 thread]]
	# puts stderr [list queue2 is [queue2 thread]]

	set producers [{*}$producers [queue1 thread]]

	# These filters are fed by the producers, and produce to  the leaf consumers.
	set filters [station_push_tres [queue1 thread] [queue2 thread] $filtercount]

	## leaf consumers
	set consumers [station_push_tres2 [queue2 thread] [queue3 thread] $consumercount]

	collect queue3

	queue1 exit
	queue2 exit
	queue3 exit
}


proc factory_primes_loadbalanced {producers consumercount filtercount} {

	tsv::set tres list {} 

	[daerth queue new queue1] .init
	[daerth queue new queue2] .init
	[daerth queue new queue3] .init
	# puts stderr [list queue1 is [queue1 thread]]
	# puts stderr [list queue2 is [queue2 thread]]
	# puts stderr [list queue3 is [queue3 thread]]


	set producers [{*}$producers [queue1 thread]]
	# puts stderr [list producers are $producers]
	set filters [station_isprime [queue1 thread] [queue2 thread] $filtercount]
	# puts stderr [list filters are $filters]
	set consumers [station_push_tres [queue2 thread] [queue3 thread] $consumercount] 

	collect queue3

	queue1 exit
	queue2 exit
	queue3 exit
}


proc station_isprime {source dest count} {
	set transports [daerth station count $count transport [
		list $source $dest {{daerth candidate} {
			proc isprime x {
				expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
			}
			while 1 {
				if {[isprime $candidate] == 1} {
					# puts stderr [list [thread::id] prime $candidate]
					lassign [$daerth deliver $candidate] candidate
				} else {
					lassign [$daerth take] candidate
				}
			}
		}}]
	]
}


proc station_produce_odd_numbers {chunk count thread} {
	set producers {}
	for {set i 0} {$i < $count * $chunk} {incr i $chunk} {

		lappend producers [daerth producer $thread [
			list {{first count daerth} {
				if {$first % 2 == 0} {
					incr first 
				}
				set last [expr {$first + $count}]

				# puts stderr [list [thread::id] produce odd numbers]
				for {set i $first} {$i < $last} {incr i 2} {
					$daerth deliver $i
				}
				# puts stderr [list [thread::id] done producing odd numbers]
		}} $i $chunk]]
	}
	return $producers
}


proc station_produce_primes {chunk count thread} {
	set producers {} 
	for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
		lappend producers [daerth producer $thread [list {{first count daerth} {
				# a progessively slower checker, good for these unit tests
				proc isprime x {
					expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
				}

				if {$first % 2 == 0} {
					incr first 
				}
				set last [expr {$first + $count}]

				for {set i $first} {$i < $last} {incr i 2} {
					# puts stderr [list [thread::id] checking $i]
					if {[isprime $i] == 1} {
						# puts stderr [list [thread::id] prime $i]
						$daerth deliver $i
					}
				}
				# puts stderr [list [thread::id] done producing primes]
		}} $i $chunk]]
	}
	# puts stderr [list producers are $producers]
	return $producers
}


proc station_push_tres {source dest count} {
	set transports {}
	for {set i 0} {$i < $count} {incr i} {
		lappend transports [daerth transport $source $dest {{daerth value} {
			set count 0
			while 1 {
				::tsv::lpush tres list $value 
				incr count
				lassign [$daerth deliver $value] value 
			}
			# puts stderr [list [thread::id] push_tres produced $count]
		}}]
	}
	return $transports
}


proc station_push_tres2 {source dest count} {
	set transports [daerth station count $count transport [
		list $source $dest {{daerth value} {
			set count 0
			while 1 {
				# puts stderr [list [thread::id] consumer received $value for $deliver]
				incr count
				lassign [$daerth deliver [expr {$value * 2}]] value
			}
			# puts stderr [list [thread::id] push_tres2 produced $count]
		}}
	]]
	# puts stderr [list transports for push_tres2 are $transports]
}


interp bgerror {} [namespace current]::bgerror