#! /bin/env tclsh
proc suite_main {} {
package require {ycl dict}
namespace import [yclprefix]::dict::getdefault
package require {ycl list}
namespace import [yclprefix]::list::sl
package require {ycl test}
variable cleanup1
namespace upvar [yclprefix]::test cleanup1 cleanup1
[yclprefix]::test::init
package require [list ycl daerth]
namespace import [yclprefix]::daerth
namespace import [yclprefix]::daerth::station
variable prefix
set setup1 [set [yclprefix]::test::setup1]\n[set [namespace current]::setup1]
set cleanup1 [set [yclprefix]::test::cleanup1]
test daerth {} -setup $setup1 -body {
foreach {consumercount filtercount producercount} [sl {
1 1 1
1 1 2
1 1 3
1 1 4
1 2 1
1 3 1
1 4 1
1 4 2
}] {
factory1 station_produce_odd_numbers 500 $consumercount $filtercount $producercount
vwait [namespace current]::done
set list1 [lsort -integer [::tsv::get tres list]]
set list2 [lsort -integer [::tsv::get tres2 list]]
set length_expected [expr {500 * $producercount * $filtercount / 2}]
if {[llength $list1] != $length_expected} {
lappend res [list {list1 length} [llength $list1] vs $length_expected \
$consumercount $filtercount $producercount]
}
if {[llength $list2] != [llength $list1]} {
lappend res [list {list2 length} [llength $list2] vs {list1 length} [
llength $list1]]
}
for {set i 0} {$i < [llength $list1]} {incr i} {
if {[lindex $list2 $i] != [lindex $list1 $i] * 2} {
lappend res [list {list2 not double list1} $i [lindex $list1 $i] [
lindex $list2 $i]]
return $res
}
}
for {set i [expr [llength $list1] - 10]} {$i < [llength $list1]} {incr i} {
if {[lindex $list2 $i] != [lindex $list1 $i] * 2} {
lappend res [list {list2 not double list1} $i [lindex $list1 $i] [
lindex $list2 $i]]
return $res
}
}
}
set res
} -cleanup $cleanup1 -result [sl {
}]
test daerth_primes {} -setup $setup1 -body {
foreach {consumercount filtercount producercount} [sl {
1 1 1
}] {
factory1 station_produce_primes 500 $consumercount $filtercount $producercount
vwait [namespace current]::done
set list1 [lsort -integer [::tsv::get tres list]]
set list2 [lsort -integer [::tsv::get tres2 list]]
for {set i 0} {$i < [llength $list1]} {incr i} {
if {[lindex $list2 $i] != [lindex $list1 $i] * 2} {
lappend res [list {list2 not double list1} $i [lindex $list1 $i] [
lindex $list2 $i]]
return $res
}
}
}
} -cleanup $cleanup1 -result {}
# Warning, this can take several minutes to run
test daerth_primes_loadbalanced {} -setup $setup1 -body {
foreach {consumercount filtercount producercount} [sl {
1 1 1
#this have about twice the performance of the previous run
1 2 1
}] {
factory_primes_loadbalanced 1000 $consumercount $filtercount $producercount
vwait [namespace current]::done
set list1 [lsort -integer [::tsv::get tres list]]
lappend res [llength $list1]
}
set res
} -cleanup $cleanup1 -result [sl {
167 167
}]
cleanupTests
}
proc go producers {
foreach worker $producers {
#puts stderr [list [thread::id] telling $worker to eat]
thread::send -async $worker eat
}
}
proc alldone master {
variable prefix
#alldone must respond immediately so that the caller isn't waiting
after 0 [list after idle [list apply [list {master prefix} {
#puts stderr [list telling $master to quit]
thread::send -async $master ${prefix}::quit
incr [namespace current]::done
# puts stderr [::tsv::llength result list]
# exit 0
} [namespace current]] $master $prefix]]
}
proc bgerror {res opts} {
variable bgres
variable bgopts
variable done
set bgres $res
set bgopts $opts
set done 1
}
proc factory1 {of chunk consumercount filtercount producercount} {
variable prefix
#puts stderr [list {main thread is} [thread::id]]
set daerth [daerth new onfinish [
list [thread::id] [namespace current]::alldone]]
#puts stderr [list [thread::id] {daerth is} $daerth]
## leaf consumers
set consumers [station_push_tres2 $consumercount]
#puts stderr [list [thread::id] consumers are $consumers]
thread::send -async $daerth [list ${prefix}::add consumers $consumers]
# These filters are fed by the producers, and feed the leaf consumers.
set filters [station_push_tres $filtercount]
foreach station $filters {
thread::send -async $station [list ${prefix}::consumers {*}$consumers]
}
thread::send -async $daerth [list ${prefix}::add filters $filters]
# These producers consume nothing.
set producers [$of $chunk $producercount]
foreach station $producers {
thread::send -async $station [list ${prefix}::consumers {*}$filters]
}
#puts stderr [list [thread::id] adding producers $producers]
thread::send -async $daerth [list ${prefix}::add producers $producers]
#puts stderr [list {producers are} $producers]
tsv::set tres list {}
tsv::set tres2 list {}
#puts [list [thread::id] {sending to daerth} ${prefix}::ready [thread::id] [namespace current]::go]
thread::send -async $daerth [list ${prefix}::ready [thread::id] [namespace current]::go]
}
proc factory_primes_loadbalanced {chunk consumercount filtercount producercount} {
variable prefix
set daerth [daerth new onfinish [
list [thread::id] [namespace current]::alldone]]
#puts stderr [list [thread::id] daerth $daerth]
set producers [station_produce_odd_numbers $chunk $producercount]
thread::send -async $daerth [list ${prefix}::add producers $producers]
#puts stderr [list [thread::id] producers $producers]
set consumers [station_push_tres 1]
thread::send -async $daerth [list ${prefix}::add consumers $consumers]
#puts stderr [list [thread::id] consumers $consumers ]
set filters [station_isprime $filtercount]
foreach filter $filters {
thread::send -async $filter [list ${prefix}::consumers {*}$consumers]
}
thread::send -async $daerth [list ${prefix}::add filters $filters]
#puts stderr [list [thread::id] filters $filters]
foreach producer $producers {
thread::send -async $producer [list ${prefix}::select balance]
thread::send -async $producer [list ${prefix}::consumers {*}$filters]
#comment this line out and watch performance sink
}
tsv::set tres list {}
thread::send -async $daerth [list ${prefix}::ready [thread::id] [namespace current]::go]
}
proc station_push_tres2 count {
station new count $count limit 10 script {
variable work {
lassign $data[set data {}] value
#puts stderr [list [thread::id] consumer received $value]
tsv::lpush tres2 list [expr {$value * 2}]
}
}
}
proc station_push_tres count {
station new count $count limit 10 script {
variable work {
# set last [::tsv::lpop result list]
#puts stderr [list [thread::id] {working on} [list $data]]
lassign $data[set data {}] value
# do not use lappend! It returns the list, and therefore is dog-slow!
# ::tsv::lappend tres list [thread::id]::hello ]
# ::tsv::linsert is ok
# ::tsv::linsert tres list end [thread::id]::$value
::tsv::lpush tres list $value
#puts [list [thread::id] {feeding value} [list $value]]
feed $value
# artificial delay
# after 20
}
}
}
proc station_produce_odd_numbers {chunk count} {
for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
set station [station new limit 10 script {
proc isprime x {
expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
}
variable init {
lassign $args first count
if {$first % 2 == 0} {
incr first
}
set last [expr {$first + $count}]
}
variable work {
# puts stderr [list [thread::id] produce primes]
for {set i $first} {$i < $last} {incr i 2} {
set consumer [feed $i]
}
# puts stderr [list [thread::id] producer done]
done
}
} args $i $chunk]
lappend stations $station
}
return $stations
}
proc station_produce_primes {chunk count} {
for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
set station [station new limit 10 script {
#an progessively slower checker, good for these unit tests
proc isprime x {
expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
}
variable init {
lassign $args first count
if {$first % 2 == 0} {
incr first
}
set last [expr {$first + $count}]
}
variable work {
# puts stderr [list [thread::id] produce primes]
for {set i $first} {$i < $last} {incr i 2} {
#puts stderr [list [thread::id] checking $i]
if {[isprime $i] == 1} {
#puts stderr [list [thread::id] prime $i]
feed $i
}
}
# puts stderr [list [thread::id] producer done]
done
}
} args $i $chunk]
lappend stations $station
# simulate workers at different rates
# thread::send $worker [list incr timer [incr delay 100]]
# thread::send $worker [list set delay 0]
}
return $stations
}
proc station_isprime count {
station new count $count limit 10 script {
proc isprime x {
expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
}
variable init {
lassign $args tsvvar
}
variable work {
lassign $data[set data {}] candidate
if {[isprime $candidate] == 1} {
#puts stderr [list [thread::id] prime $candidate]
feed $candidate
}
}
}
}
variable prefix ::ws
variable setup1 {
variable bgres
variable bgopts
variable res {}
}
interp bgerror {} [namespace current]::bgerror