ycl

Artifact [2b50128fe9]
Login

Artifact [2b50128fe9]

Artifact 2b50128fe9d52908650151f5ba0e7fd896ab72e2:


#! /bin/env tclsh

# suite_main --
#
#	Loads the ycl packages used by this suite, builds the per-test setup and
#	cleanup scripts, and runs three tests:
#
#	daerth                      odd-number producers feeding filters feeding
#	                            consumers, for several station counts
#	daerth_primes               prime producers in a 1/1/1 configuration
#	daerth_primes_loadbalanced  odd-number producers balanced over isprime
#	                            filters
#
#	Each test starts a pipeline through one of the factory procs and then
#	enters the event loop with vwait until the variable "done" is written.
#	In this file "done" is written by the alldone callback and by bgerror.
#
# Arguments:
#	None.
#
# Side effects:
#	Imports commands into the current namespace, runs the tests, and calls
#	cleanupTests.
proc suite_main {} {
	package require {ycl dict}
	namespace import [yclprefix]::dict::getdefault
	package require {ycl list}
	namespace import [yclprefix]::list::sl
	package require {ycl test}
	variable cleanup1
	namespace upvar [yclprefix]::test cleanup1 cleanup1
	[yclprefix]::test::init
	package require [list ycl daerth]
	namespace import [yclprefix]::daerth
	namespace import [yclprefix]::daerth::station

	variable prefix

	# The setup script is the shared ycl test setup followed by this file's
	# own setup1 script.
	set setup1 [set [yclprefix]::test::setup1]\n[set [namespace current]::setup1]
	set cleanup1 [set [yclprefix]::test::cleanup1]

	test daerth {} -setup $setup1 -body {
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			1 1 2
			1 1 3
			1 1 4
			1 2 1
			1 3 1
			1 4 1
			1 4 2
		}] {
			factory1 station_produce_odd_numbers 500 $consumercount $filtercount $producercount
			vwait [namespace current]::done
			# list1 holds what the filters recorded, list2 what the leaf
			# consumers recorded, which is each received value doubled.
			set list1 [lsort -integer [::tsv::get tres list]]
			set list2 [lsort -integer [::tsv::get tres2 list]]
			# Each producer emits 250 odd numbers for a chunk of 500.  The
			# filtercount factor is kept from the original formula;
			# presumably every filter receives every value - TODO confirm
			# against the default consumer selection in daerth.
			set length_expected [expr {500 * $producercount * $filtercount / 2}]
			if {[llength $list1] != $length_expected} {
				lappend res [list {list1 length} [llength $list1] vs $length_expected \
					$consumercount $filtercount $producercount]
			}
			if {[llength $list2] != [llength $list1]} {
				lappend res [list {list2 length} [llength $list2] vs {list1 length} [
					llength $list1]]
			}
			# Stop at the first mismatch and report everything gathered so far.
			for {set i 0} {$i < [llength $list1]} {incr i} {
				if {[lindex $list2 $i] != [lindex $list1 $i] * 2} {
					lappend res [list {list2 not double list1} $i [lindex $list1 $i] [
						lindex $list2 $i]]
					return $res
				}
			}
			# Spot-check the last ten entries again.  The loop above has
			# already compared every index, so this is redundant.  The start
			# index is clamped at 0 because a negative index makes lindex
			# return an empty string, which expr cannot multiply.
			for {set i [expr {max(0, [llength $list1] - 10)}]} {$i < [llength $list1]} {incr i} {
				if {[lindex $list2 $i] != [lindex $list1 $i] * 2} {
					lappend res [list {list2 not double list1} $i [lindex $list1 $i] [
						lindex $list2 $i]]
					return $res
				}
			}
		}
		set res
	} -cleanup $cleanup1 -result [sl {
	}]

	test daerth_primes {} -setup $setup1 -body {
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
		}] {
			factory1 station_produce_primes 500 $consumercount $filtercount $producercount
			vwait [namespace current]::done
			set list1 [lsort -integer [::tsv::get tres list]]
			set list2 [lsort -integer [::tsv::get tres2 list]]
			# A mismatch returns the report early.  Otherwise the body ends
			# with the empty result of foreach, which is the expected result.
			for {set i 0} {$i < [llength $list1]} {incr i} {
				if {[lindex $list2 $i] != [lindex $list1 $i] * 2} {
					lappend res [list {list2 not double list1} $i [lindex $list1 $i] [
						lindex $list2 $i]]
					return $res
				}
			}
		}
	} -cleanup $cleanup1 -result {}

	# Warning, this can take several minutes to run
	test daerth_primes_loadbalanced {} -setup $setup1 -body {
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			#this has about twice the performance of the previous run
			1 2 1 
		}] {
			factory_primes_loadbalanced 1000 $consumercount $filtercount $producercount
			vwait [namespace current]::done
			set list1 [lsort -integer [::tsv::get tres list]]
			# Record how many values passed the isprime filters in this run.
			lappend res [llength $list1]
		}
		set res
	} -cleanup $cleanup1 -result [sl {
		167 167
		
	}]
	
	cleanupTests
}

# go --
#
#	Asynchronously sends the command "eat" to every thread named in
#	producers.  The factory procs in this file pass this proc's name to the
#	daerth thread as part of the "ready" message.
#
# Arguments:
#	producers	List of thread ids to notify.
proc go producers {
	foreach tid $producers {
		#puts stderr [list [thread::id] telling $tid to eat]
		thread::send -async $tid eat
	}
}

# alldone --
#
#	Completion callback registered with the daerth as its onfinish handler.
#	It only schedules the real work and returns at once, so the calling
#	thread is not kept waiting.  The scheduled work asks the master thread
#	to quit and increments "done" in this namespace.
#
# Arguments:
#	master	Thread id that receives the quit command.
#
# Results:
#	Returns the identifier produced by the outer "after 0" call.
proc alldone master {
	variable prefix

	# The deferred work, run in this namespace once the event loop is idle.
	set finish [list {master prefix} {
		#puts stderr [list telling $master to quit]
		thread::send -async $master ${prefix}::quit

		incr [namespace current]::done
		# puts stderr [::tsv::llength result list]
		# exit 0
	} [namespace current]]

	# Two-step deferral: a timer event that in turn queues an idle callback.
	after 0 [list after idle [list apply $finish $master $prefix]]
}

# bgerror --
#
#	Background error handler.  Stores the error message and the return
#	options in the namespace variables bgres and bgopts, then sets done to 1.
#
# Arguments:
#	res	The error message.
#	opts	The return options dictionary.
#
# Results:
#	Returns 1, the value stored in done.
proc bgerror {res opts} {
	namespace upvar [namespace current] \
		bgres lastMessage bgopts lastOptions done finished
	set lastMessage $res
	set lastOptions $opts
	set finished 1
}

# factory1 --
#
#	Builds a three-stage pipeline: producers feed filters, and filters feed
#	leaf consumers.  Every stage is registered with a new daerth, the shared
#	result lists are cleared, and the daerth is told that setup is complete.
#
# Arguments:
#	of		Name of the command that creates the producer stations.
#			It is called with chunk and producercount.
#	chunk		Passed through to the producer command.
#	consumercount	Number of leaf consumers requested from
#			station_push_tres2.
#	filtercount	Number of filters requested from station_push_tres.
#	producercount	Passed through to the producer command.
#
# Results:
#	Returns the result of the final thread::send.  Completion is reported
#	through alldone, which is registered as the onfinish callback.
proc factory1 {of chunk consumercount filtercount producercount} {
	variable prefix
	set here [thread::id]
	set ns [namespace current]
	#puts stderr [list {main thread is} $here]

	set master [daerth new onfinish [list $here ${ns}::alldone]]
	#puts stderr [list $here {daerth is} $master]

	# Last stage: the leaf consumers.
	set consumers [station_push_tres2 $consumercount]
	thread::send -async $master [list ${prefix}::add consumers $consumers]

	# Middle stage: filters, fed by the producers and feeding the consumers.
	set filters [station_push_tres $filtercount]
	foreach filter $filters {
		thread::send -async $filter [list ${prefix}::consumers {*}$consumers]
	}
	thread::send -async $master [list ${prefix}::add filters $filters]

	# First stage: producers, which consume nothing themselves.
	set producers [$of $chunk $producercount]
	foreach producer $producers {
		thread::send -async $producer [list ${prefix}::consumers {*}$filters]
	}
	thread::send -async $master [list ${prefix}::add producers $producers]

	# Reset both shared result lists, then signal readiness and name go as
	# the command to call back in this thread.
	tsv::set tres list {}
	tsv::set tres2 list {}
	thread::send -async $master [list ${prefix}::ready $here ${ns}::go]
}

# factory_primes_loadbalanced --
#
#	Builds a pipeline in which odd-number producers feed isprime filters,
#	and the filters feed a single consumer that records into the tres list.
#	Each producer is sent "select balance" before it is given its filters.
#
# Arguments:
#	chunk		Passed to station_produce_odd_numbers.
#	consumercount	Accepted for symmetry with factory1 but not used; exactly
#			one consumer is created.
#	filtercount	Number of isprime filters.
#	producercount	Number of producers.
#
# Results:
#	Returns the result of the final thread::send.  Completion is reported
#	through alldone, which is registered as the onfinish callback.
proc factory_primes_loadbalanced {chunk consumercount filtercount producercount} {
	variable prefix
	set here [thread::id]
	set ns [namespace current]

	set master [daerth new onfinish [list $here ${ns}::alldone]]
	#puts stderr [list $here daerth $master]

	set producers [station_produce_odd_numbers $chunk $producercount]
	thread::send -async $master [list ${prefix}::add producers $producers]

	set consumers [station_push_tres 1]
	thread::send -async $master [list ${prefix}::add consumers $consumers]

	set filters [station_isprime $filtercount]
	foreach station $filters {
		thread::send -async $station [list ${prefix}::consumers {*}$consumers]
	}
	thread::send -async $master [list ${prefix}::add filters $filters]

	foreach station $producers {
		# Original note: comment this line out and watch performance sink.
		# NOTE(review): the note sat after the consumers line but presumably
		# refers to the select balance line - confirm.
		thread::send -async $station [list ${prefix}::select balance]
		thread::send -async $station [list ${prefix}::consumers {*}$filters]
	}

	# Reset the shared result list, then signal readiness and name go as
	# the command to call back in this thread.
	tsv::set tres list {}
	thread::send -async $master [list ${prefix}::ready $here ${ns}::go]
}

# station_push_tres2 --
#
#	Creates leaf-consumer stations through "station new" with the given
#	count and a limit of 10.  The work script takes the first element of
#	data, clears data, and pushes twice that value onto the shared tsv
#	list "tres2 list".
#
# Arguments:
#	count	Passed to "station new" as its count option.
#
# Results:
#	Returns whatever "station new" returns.  factory1 treats the result as
#	a list of thread ids.
proc station_push_tres2 count {
	station new count $count limit 10 script {
		variable work {
			lassign $data[set data {}] value
			#puts stderr [list [thread::id] consumer received $value]
			tsv::lpush tres2 list [expr {$value * 2}]
		}
	}
}

# station_push_tres --
#
#	Creates recording stations through "station new" with the given count
#	and a limit of 10.  The work script takes the first element of data,
#	clears data, pushes the value onto the shared tsv list "tres list", and
#	then passes the same value on with feed.
#
# Arguments:
#	count	Passed to "station new" as its count option.
#
# Results:
#	Returns whatever "station new" returns.  The factory procs treat the
#	result as a list of thread ids.
proc station_push_tres count {
	station new count $count limit 10 script {
		variable work {
			# set last [::tsv::lpop result list]

			#puts stderr [list [thread::id] {working on} [list $data]]
			lassign $data[set data {}] value
			# do not use lappend!  It returns the list, and therefore is dog-slow!
			# ::tsv::lappend tres list [thread::id]::hello ]

			# ::tsv::linsert is ok
			# ::tsv::linsert tres list end [thread::id]::$value

			::tsv::lpush tres list $value 
			#puts [list [thread::id] {feeding value} [list $value]]
			feed $value 

			# artificial delay
			# after 20 
		}
	}
}

# station_produce_odd_numbers --
#
#	Creates count producer stations.  Station number k, counting from 0, is
#	started with the arguments k*chunk and chunk.  It feeds every odd number
#	from k*chunk up to, but not including, k*chunk + chunk, and then calls
#	done.
#
# Arguments:
#	chunk	Size of the number range given to each station.
#	count	Number of stations to create.
#
# Results:
#	Returns the list of values returned by "station new", one per station.
#	The list is empty when count is 0.
proc station_produce_odd_numbers {chunk count} {
	# Initialise the result so that a count of 0 returns an empty list
	# instead of failing on an unset variable.
	set stations {}
	for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
		set station [station new limit 10 script {

			# Not called by the work script below.
			proc isprime x {
				expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
			}

			# The end of the range is taken from the unadjusted start, so
			# that bumping an even start to the next odd number does not
			# extend the range into the next station's first number when
			# the chunk size is odd.
			variable init {
				lassign $args first count
				set last [expr {$first + $count}]
				if {$first % 2 == 0} {
					incr first
				}
			}

			variable work {
				# puts stderr [list [thread::id] produce odd numbers]
				for {set i $first} {$i < $last} {incr i 2} {
					set consumer [feed $i]
				}
				# puts stderr [list [thread::id] producer done]
				done
			}

		} args $i $chunk]

		lappend stations $station
	}
	return $stations
}

# station_produce_primes --
#
#	Creates count producer stations.  Station number k, counting from 0, is
#	started with the arguments k*chunk and chunk.  It tests every odd number
#	from k*chunk up to, but not including, k*chunk + chunk, feeds the ones
#	that isprime accepts, and then calls done.
#
# Arguments:
#	chunk	Size of the number range given to each station.
#	count	Number of stations to create.
#
# Results:
#	Returns the list of values returned by "station new", one per station.
#	The list is empty when count is 0.
proc station_produce_primes {chunk count} {
	# Initialise the result so that a count of 0 returns an empty list
	# instead of failing on an unset variable.
	set stations {}
	for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
		set station [station new limit 10 script {

			# A progressively slower checker, good for these unit tests.
			# It writes x as a run of x letters and rejects any run that
			# is two or more repetitions of a shorter run of at least two.
			proc isprime x {
				expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
			}

			# The end of the range is taken from the unadjusted start, so
			# that bumping an even start to the next odd number does not
			# extend the range into the next station's first number when
			# the chunk size is odd.
			variable init {
				lassign $args first count
				set last [expr {$first + $count}]
				if {$first % 2 == 0} {
					incr first
				}
			}

			variable work {
				# puts stderr [list [thread::id] produce primes]
				for {set i $first} {$i < $last} {incr i 2} {
					#puts stderr [list [thread::id] checking $i]
					if {[isprime $i] == 1} {
						#puts stderr [list [thread::id] prime $i]
						feed $i
					}
				}
				# puts stderr [list [thread::id] producer done]
				done
			}

		} args $i $chunk]

		lappend stations $station

		# simulate workers at different rates
		# thread::send $worker [list incr timer [incr delay 100]]
		# thread::send $worker [list set delay 0]
	}
	return $stations
}

# station_isprime --
#
#	Creates filter stations through "station new" with the given count and
#	a limit of 10.  The work script takes the first element of data, clears
#	data, and passes the value on with feed only when isprime accepts it.
#
#	isprime writes x as a run of x letters and rejects any run that is two
#	or more repetitions of a shorter run of at least two letters.
#
#	The init script reads a first argument into tsvvar, but no args are
#	passed here and the work script does not use tsvvar.
#
# Arguments:
#	count	Passed to "station new" as its count option.
#
# Results:
#	Returns whatever "station new" returns.  factory_primes_loadbalanced
#	treats the result as a list of thread ids.
proc station_isprime count {
	station new count $count limit 10 script {

		proc isprime x {
			expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
		}

		variable init {
			lassign $args tsvvar
		}

		variable work {
			lassign $data[set data {}] candidate
			if {[isprime $candidate] == 1} {
				#puts stderr [list [thread::id] prime $candidate]
				feed $candidate
			}
		}
	}
}

# Namespace prefix of the commands that this file sends to the daerth and
# station threads (add, consumers, select, ready, quit).
variable prefix ::ws

# Per-test setup script.  suite_main appends it to the ycl test setup script
# and passes the result as -setup to every test.  It declares the variables
# that bgerror fills in and starts each test with an empty res.
variable setup1 {
	variable bgres
	variable bgopts
	variable res {}
}


# Route background errors in this interpreter to the bgerror proc defined in
# this namespace.
interp bgerror {} [namespace current]::bgerror