#! /bin/env tclsh
package require {ycl test}
proc suite_main {} {
package require {ycl dict}
namespace import [yclprefix]::dict::getdefault
package require {ycl list}
namespace import [yclprefix]::list::sl
variable cleanup1
namespace upvar [yclprefix]::test cleanup1 cleanup1
[yclprefix]::test::init
namespace import [yclprefix]::test::cleanup1
package require [list ycl daerth]
namespace import [yclprefix]::daerth
namespace import [yclprefix]::daerth::queue
variable setup1 {
}
test basic {} -body {
variable res
set tnames [thread::names]
set length1 [llength $tnames]
#puts stderr [list {existing threads} $tnames]
[daerth queue new queue1] .init
[daerth queue new queue2] .init
[daerth queue new queue3] .init
set station1 [daerth transport [queue1 thread] [queue2 thread] {{daerth number} {
while 1 {
# puts stderr [list [thread::id] station1 working on $number]
lassign [$daerth deliver [
expr {$number + $number}]] number
}
}}]
#puts stderr [list station1 $station1]
set station2 [daerth transport [queue2 thread] [queue3 thread] \
{{daerth number} {
while 1 {
# puts [list [thread::id] station2 working on $number]
lassign [$daerth deliver [
list $number [expr {$number + 1}]]] number
}
}}
]
set producer [daerth producer [queue1 thread] {{daerth {
for {set i 0} {$i < 10} {incr i} {
$daerth deliver $i
}
# puts stderr [list [thread::id] leaving work]
}}}]
#puts stderr [list producer $producer]
collect queue3
queue1 exit
queue2 exit
queue3 exit
set elapsed 0
set wait 100
while {$elapsed < 30000} {
set tnames2 [thread::names]
set length2 [llength $tnames2]
set res2 [expr {$length1 == $length2}]
if {$length1 == $length2} break
after $wait
incr elapsed 100
}
lappend res $res2
return $res
} -cleanup [cleanup1] -result [sl {
{0 1} {2 3} {4 5} {6 7} {8 9} {10 11} {12 13} {14 15} {16 17} {18 19} 1
}]
test daerth {} -setup $setup1 -body {
variable res
set tnames [thread names]
set res2 {}
foreach {consumercount filtercount producercount} [sl {
1 1 1
1 1 2
1 1 3
1 1 4
1 2 1
1 3 1
1 4 1
1 4 2
}] {
factory1 [list station_produce_odd_numbers 500 $producercount] $consumercount $filtercount
set list1 [lsort -integer [::tsv::get tres list]]
set res [lsort -integer $res[set res {}]]
if {[llength $res] != [llength $list1]} {
lappend res2 [list {res length} [llength $res] vs {list1 length} [
llength $list1]]
}
for {set i 0} {$i < [llength $list1]} {incr i} {
if {[lindex $res $i] != [lindex $list1 $i] * 2} {
lappend res [list {res not double list1} $i [lindex $list1 $i] [
lindex $res $i]]
return $res2
}
}
for {set i [expr {[llength $list1] - 10}]} {$i < [llength $list1]} {incr i} {
if {[lindex $res $i] != [lindex $list1 $i] * 2} {
lappend res [list {res not double list1} $i [lindex $list1 $i] [
lindex $res $i]]
return $res2
}
}
lappend res2 done
}
set tnames2 [thread names]
lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
return $res2
} -cleanup [cleanup1] -result [sl {
done done done done done done done done 1
}]
test daerth_primes {} -setup $setup1 -body {
variable res
set tnames [thread names]
foreach {consumercount filtercount producercount} [sl {
1 1 1
}] {
factory1 [list station_produce_primes 500 $producercount] $consumercount $filtercount
set list1 [lsort -integer [::tsv::get tres list]]
set res [lsort -integer $res[set res {}]]
for {set i 0} {$i < [llength $list1]} {incr i} {
if {[lindex $res $i] != [lindex $list1 $i] * 2} {
lappend res2 [list {res not double list1} $i [lindex $list1 $i] [
lindex $res $i]]
return $res2
}
}
lappend res2 done
}
set tnames2 [thread names]
lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
return $res2
} -cleanup [cleanup1] -result [sl {
done 1
}]
# Warning, this can take several minutes to run
test daerth_primes_loadbalanced {} -body {
set res2 {}
set tnames [thread names]
foreach {consumercount filtercount producercount} [sl {
1 1 1
# this runs about twice as fast as the previous run
1 2 1
}] {
factory_primes_loadbalanced [
list station_produce_odd_numbers 1000 $producercount] \
$consumercount $filtercount
set list1 [lsort -integer [::tsv::get tres list]]
lappend res2 [llength $list1]
}
set tnames2 [thread names]
lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
set res2
} -cleanup [cleanup1] -result [sl {
167 167 1
}]
test pipeline {} -body {
variable res
set names [lsort [thread::names]]
daerth pipeline p1 {
{
{comm {
set x 0
for {set i 0} {$i < 10} {incr i} {
$comm deliver [incr x $i]
}
}}
}
} {
{
{{comm data} {
while 1 {
lassign [$comm deliver [list $data $data]] data
}
}}
}
} {
{
{{comm data} {
lassign $data a1 a2
while 1 {
lassign [$comm take] b1 b2
lassign [$comm deliver [list $a1 $a2 $b1 $b2]] a1 a2
}
}}
}
}
coroutine collect1 ::apply [list pipe {
yield
while 1 {
lappend res [$pipe]
}
set [namespace current]::res $res
} [namespace current]] [namespace which p1]
after 0 [list [namespace which collect1]]
vwait [namespace current]::res
for {set i 0} {$i < 100} {incr i} {
set clean clean
foreach name1 [lsort [thread::names]] name2 $names {
if {$name1 ne $name2} {
set clean dirty
}
}
if {$clean eq {clean}} break
after 100
}
lappend res $clean
return $res
} -cleanup [cleanup1] -result [sl {
{0 0 1 1} {3 3 6 6} {10 10 15 15} {21 21 28 28} {36 36 45 45} clean
}]
cleanupTests
}
proc go producers {
foreach worker $producers {
# puts stderr [list [thread::id] telling $worker to receive]
thread::send -async $worker receive
}
}
proc alldone master {
# alldone must respond immediately so that the caller isn't waiting
after 0 [list after idle [list apply [list {master prefix} {
# puts stderr [list telling $master to quit]
thread::send -async $master [daerth prefix]::quit
incr [namespace current]::done
# puts stderr [::tsv::llength result list]
# exit 0
} [namespace current]] $master [daerth prefix]]]
}
proc bgerror {eres opts} {
variable bgopts
variable res
puts stderr [namespace current]::bgerror:\n[dict get $opts -errorinfo]
set bgopts $opts
set res $eres
}
proc collect queue {
after 0 [list coroutine collector ::apply [list queue {
variable res
set res1 {}
while 1 {
# puts stderr [list [thread::id] collecting from $queue [$queue thread]]
lappend res1 [$queue take]
}
set res $res1
} [namespace current]] $queue]
vwait [namespace current]::res
}
proc factory1 {producers consumercount filtercount} {
tsv::set tres list {}
[daerth queue new queue1] .init
[daerth queue new queue2] .init
[daerth queue new queue3] .init
# puts stderr [list queue1 is [queue1 thread]]
# puts stderr [list queue2 is [queue2 thread]]
set producers [{*}$producers [queue1 thread]]
# These filters are fed by the producers, and produce to the leaf consumers.
set filters [station_push_tres [queue1 thread] [queue2 thread] $filtercount]
## leaf consumers
set consumers [station_push_tres2 [queue2 thread] [queue3 thread] $consumercount]
collect queue3
queue1 exit
queue2 exit
queue3 exit
}
proc factory_primes_loadbalanced {producers consumercount filtercount} {
tsv::set tres list {}
[daerth queue new queue1] .init
[daerth queue new queue2] .init
[daerth queue new queue3] .init
# puts stderr [list queue1 is [queue1 thread]]
# puts stderr [list queue2 is [queue2 thread]]
# puts stderr [list queue3 is [queue3 thread]]
set producers [{*}$producers [queue1 thread]]
# puts stderr [list producers are $producers]
set filters [station_isprime [queue1 thread] [queue2 thread] $filtercount]
# puts stderr [list filters are $filters]
set consumers [station_push_tres [queue2 thread] [queue3 thread] $consumercount]
collect queue3
queue1 exit
queue2 exit
queue3 exit
}
proc station_isprime {source dest count} {
set transports [daerth station count $count transport [
list $source $dest {{daerth candidate} {
proc isprime x {
expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
}
while 1 {
if {[isprime $candidate] == 1} {
# puts stderr [list [thread::id] prime $candidate]
lassign [$daerth deliver $candidate] candidate
} else {
lassign [$daerth take] candidate
}
}
}}]
]
}
proc station_produce_odd_numbers {chunk count thread} {
set producers {}
for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
lappend producers [daerth producer $thread [
list {{first count daerth} {
if {$first % 2 == 0} {
incr first
}
set last [expr {$first + $count}]
# puts stderr [list [thread::id] produce odd numbers]
for {set i $first} {$i < $last} {incr i 2} {
$daerth deliver $i
}
# puts stderr [list [thread::id] done producing odd numbers]
}} $i $chunk]]
}
return $producers
}
proc station_produce_primes {chunk count thread} {
set producers {}
for {set i 0} {$i < $count * $chunk} {incr i $chunk} {
lappend producers [daerth producer $thread [list {{first count daerth} {
# a progessively slower checker, good for these unit tests
proc isprime x {
expr {$x > 1 && ![regexp {^(oo+?)\1+$} [string repeat o $x]]}
}
if {$first % 2 == 0} {
incr first
}
set last [expr {$first + $count}]
for {set i $first} {$i < $last} {incr i 2} {
# puts stderr [list [thread::id] checking $i]
if {[isprime $i] == 1} {
# puts stderr [list [thread::id] prime $i]
$daerth deliver $i
}
}
# puts stderr [list [thread::id] done producing primes]
}} $i $chunk]]
}
# puts stderr [list producers are $producers]
return $producers
}
proc station_push_tres {source dest count} {
set transports {}
for {set i 0} {$i < $count} {incr i} {
lappend transports [daerth transport $source $dest {{daerth value} {
set count 0
while 1 {
::tsv::lpush tres list $value
incr count
lassign [$daerth deliver $value] value
}
# puts stderr [list [thread::id] push_tres produced $count]
}}]
}
return $transports
}
proc station_push_tres2 {source dest count} {
set transports [daerth station count $count transport [
list $source $dest {{daerth value} {
set count 0
while 1 {
# puts stderr [list [thread::id] consumer received $value for $deliver]
incr count
lassign [$daerth deliver [expr {$value * 2}]] value
}
# puts stderr [list [thread::id] push_tres2 produced $count]
}}
]]
# puts stderr [list transports for push_tres2 are $transports]
}
interp bgerror {} [namespace current]::bgerror