ycl

Check-in [6852f3d753]
Login

Check-in [6852f3d753]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:daerth
add [exit] method to queues
test for thread cleanup
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 6852f3d75345905bb31919d3da9b915bf69f7da1
User & Date: pooryorick 2018-07-21 19:56:37.027
Context
2018-07-23
10:37
add althttpdctl check-in: 76aabcdd07 user: pooryorick tags: trunk
2018-07-21
19:56
daerth
add [exit] method to queues
test for thread cleanup
check-in: 6852f3d753 user: pooryorick tags: trunk
2018-07-20
20:50
ycl daerth
rewrite complete
all tests pass
check-in: 2eebbe4391 user: pooryorick tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to packages/daerth/lib/daerth.tcl.
76
77
78
79
80
81
82

83
84
85
86
87
88
89
		list ::coroutine main ::apply [list {work args} {

		variable queue
		thread::send -async $queue [list [namespace current] sourceon [list [thread::id] [info coroutine]]]
		::apply $work {*}$args [namespace current]
		#puts stderr [list [thread::id] sending source off to $queue]
		thread::send -async $queue [list [namespace current] sourceoff [list [thread::id] [info coroutine]]]

	} [namespace current]] $work {*}$args]]
	
	return $producer
}


proc station args {







>







76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
		list ::coroutine main ::apply [list {work args} {

		variable queue
		thread::send -async $queue [list [namespace current] sourceon [list [thread::id] [info coroutine]]]
		::apply $work {*}$args [namespace current]
		#puts stderr [list [thread::id] sending source off to $queue]
		thread::send -async $queue [list [namespace current] sourceoff [list [thread::id] [info coroutine]]]
		thread::release
	} [namespace current]] $work {*}$args]]
	
	return $producer
}


proc station args {
Changes to packages/daerth/lib/daerth.test.tcl.
17
18
19
20
21
22
23



24
25
26
27
28
29
30
31
32
33
34
35
36
37

38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

57
58







59
60
61
62
63
64
65
66

67
68
69
70
71
72
73
	variable setup1 {
	}


	test basic {} -body {
		variable res




		[daerth queue .spawn queue1] init
		[daerth queue .spawn queue2] init
		[daerth queue .spawn queue3] init


		set station1 [daerth transport source [queue1 thread] dest [
			queue2 thread] work {{daerth number} {

			while 1 {
				# puts stderr [list [thread::id] station1 working on $number]
				lassign [$daerth deliver [
					expr {$number + $number}]] number
			}
		}}]



		set station2 [daerth transport source  [queue2 thread] dest [
			queue3 thread] work {{daerth number} {

			while 1 {
				# puts [list [thread::id] station2 working on $number]
				lassign [$daerth deliver [
					list $number [expr {$number + 1}]]] number
			}
		}}]


		set producer [daerth producer queue [queue1 thread] work {daerth {
			for {set i 0} {$i < 10} {incr i} {
				$daerth deliver $i
			}
			# puts stderr [list [thread::id] leaving work]
		}}]


		collect queue3







		return $res
	} -cleanup [cleanup1] -result [sl {
		{0 1} {2 3} {4 5} {6 7} {8 9} {10 11} {12 13} {14 15} {16 17} {18 19}
	}]


	test daerth {} -setup $setup1 -body {
		variable res

		set res2 {}
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			1 1 2
			1 1 3
			1 1 4
			1 2 1







>
>
>














>



















>


>
>
>
>
>
>
>


|





>







17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
	variable setup1 {
	}


	# Smoke test for a three-queue pipeline: a producer feeds queue1, station1
	# forwards doubled values from queue1 to queue2, and station2 forwards
	# (n, n+1) pairs from queue2 to queue3.  The last element of the expected
	# result checks that the thread count is back to its starting value.
	test basic {} -body {
		variable res

		# Threads alive before the test; compared against tnames2 at the end.
		set tnames [thread::names]
		#puts stderr [list {existing threads} $tnames]

		# Create and initialise the three queues used by the pipeline.
		[daerth queue .spawn queue1] init
		[daerth queue .spawn queue2] init
		[daerth queue .spawn queue3] init


		# station1 moves items from queue1's thread to queue2's thread.  Its work
		# lambda delivers number + number and takes its next input from the
		# value that deliver returns.
		set station1 [daerth transport source [queue1 thread] dest [
			queue2 thread] work {{daerth number} {

			while 1 {
				# puts stderr [list [thread::id] station1 working on $number]
				lassign [$daerth deliver [
					expr {$number + $number}]] number
			}
		}}]
		#puts stderr [list station1 $station1]


		# station2 moves items from queue2's thread to queue3's thread, delivering
		# each number together with its successor as a two-element list.
		set station2 [daerth transport source  [queue2 thread] dest [
			queue3 thread] work {{daerth number} {

			while 1 {
				# puts [list [thread::id] station2 working on $number]
				lassign [$daerth deliver [
					list $number [expr {$number + 1}]]] number
			}
		}}]


		# The producer delivers the integers 0 through 9 to queue1 and then
		# leaves its work lambda.
		set producer [daerth producer queue [queue1 thread] work {daerth {
			for {set i 0} {$i < 10} {incr i} {
				$daerth deliver $i
			}
			# puts stderr [list [thread::id] leaving work]
		}}]
		#puts stderr [list producer $producer]

		# NOTE(review): presumably gathers what arrives on queue3 into res;
		# collect is defined elsewhere -- confirm.
		collect queue3

		# Have each queue thread release itself so that it can terminate.
		thread send [queue1 thread] {thread::release}
		thread send [queue2 thread] {thread::release}
		thread send [queue3 thread] {thread::release}

		# Append 1 if as many threads exist now as before the test, else 0.
		# Only the counts are compared, not the thread ids.
		set tnames2 [thread::names]
		lappend res [expr {[llength $tnames] == [llength $tnames2]}]
		return $res
	} -cleanup [cleanup1] -result [sl {
		{0 1} {2 3} {4 5} {6 7} {8 9} {10 11} {12 13} {14 15} {16 17} {18 19} 1
	}]


	test daerth {} -setup $setup1 -body {
		variable res
		set tnames [thread names]
		set res2 {}
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			1 1 2
			1 1 3
			1 1 4
			1 2 1
95
96
97
98
99
100
101


102
103
104
105
106
107
108
109

110
111
112
113
114
115
116
117
118
119
120
121
122
123
124


125
126
127
128
129
130
131
132

133
134
135
136
137
138
139
140
141
142
143


144
145
146
147
148
149
150
					lappend res [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			lappend res2 done
		}


		return $res2
	} -cleanup [cleanup1] -result [sl {
		done done done done done done done done
	}]


	test daerth_primes {} -setup $setup1 -body {
		variable res

		foreach {consumercount filtercount producercount} [sl {
			1 1 1
		}] {
			factory1 [list station_produce_primes 500 $producercount] $consumercount $filtercount
			set list1 [lsort -integer [::tsv::get tres list]]
			set res [lsort -integer $res[set res {}]]
			for {set i 0} {$i < [llength $list1]} {incr i} {
				if {[lindex $res $i] != [lindex $list1 $i] * 2} {
					lappend res2 [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			lappend res2 done
		}


		return $res2
	} -cleanup [cleanup1] -result [sl {
		done
	}]

	# Warning, this can take several minutes to run
	test daerth_primes_loadbalanced {} -body {
		set res2  {}

		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			# this runs about twice as fast as the previous run
			1 2 1 
		}] {
			factory_primes_loadbalanced [
				list station_produce_odd_numbers 1000 $producercount] \
					$consumercount $filtercount
			set list1 [lsort -integer [::tsv::get tres list]]
			lappend res2 [llength $list1]
		}


		set res2
	} -cleanup [cleanup1] -result [sl {
		167 167
	}]
	
	cleanupTests
}







>
>


|





>















>
>


|





>











>
>







108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
					lappend res [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			lappend res2 done
		}
		set tnames2 [thread names]
		lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
		return $res2
	} -cleanup [cleanup1] -result [sl {
		done done done done done done done done 1
	}]


	# Runs the factory1 pipeline with one consumer, one filter and one
	# station_produce_primes producer.  Every collected value in res must be
	# twice the corresponding value recorded in the tsv list tres/list.  The
	# last element of the result checks that the thread count is unchanged.
	test daerth_primes {} -setup $setup1 -body {
		variable res
		# Start from an empty result list, as the sibling tests do.  tcltest
		# evaluates the body in the caller's scope, so a res2 left behind by an
		# earlier test would otherwise end up in the value compared to -result.
		set res2 {}
		# Threads alive before the run; compared against tnames2 at the end.
		set tnames [thread names]
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
		}] {
			factory1 [list station_produce_primes 500 $producercount] $consumercount $filtercount
			# Sort both sides so they can be compared position by position.
			set list1 [lsort -integer [::tsv::get tres list]]
			set res [lsort -integer $res[set res {}]]
			for {set i 0} {$i < [llength $list1]} {incr i} {
				if {[lindex $res $i] != [lindex $list1 $i] * 2} {
					# Report the first mismatch and stop; the test then fails on
					# the -result comparison.
					lappend res2 [list {res not double list1} $i [lindex $list1 $i] [
						lindex $res $i]]
					return $res2
				}
			}
			lappend res2 done
		}
		# Append 1 if as many threads exist now as before the run, else 0.
		set tnames2 [thread names]
		lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
		return $res2
	} -cleanup [cleanup1] -result [sl {
		done 1
	}]

	# Warning, this can take several minutes to run
	# Runs factory_primes_loadbalanced twice, with one filter and then with
	# two, feeding it station_produce_odd_numbers 1000.  Each run contributes
	# the number of values recorded in the tsv list tres/list.  The last
	# element of the result checks that the thread count is unchanged.
	# NOTE(review): 167 is the number of odd primes below 1000, presumably
	# what the isprime filters let through -- confirm.
	test daerth_primes_loadbalanced {} -body {
		set res2  {}
		# Threads alive before the runs; compared against tnames2 at the end.
		set tnames [thread names]
		foreach {consumercount filtercount producercount} [sl {
			1 1 1
			# this runs about twice as fast as the previous run
			1 2 1
		}] {
			factory_primes_loadbalanced [
				list station_produce_odd_numbers 1000 $producercount] \
					$consumercount $filtercount
			set list1 [lsort -integer [::tsv::get tres list]]
			lappend res2 [llength $list1]
		}
		# Append 1 if as many threads exist now as before the runs, else 0.
		set tnames2 [thread names]
		lappend res2 [expr {[llength $tnames] == [llength $tnames2]}]
		set res2
	} -cleanup [cleanup1] -result [sl {
		167 167 1
	}]
	
	cleanupTests
}
211
212
213
214
215
216
217




218
219
220
221
222
223
224
	# These filters are fed by the producers, and produce to  the leaf consumers.
	set filters [station_push_tres [queue1 thread] [queue2 thread] $filtercount]

	## leaf consumers
	set consumers [station_push_tres2 [queue2 thread] [queue3 thread] $consumercount]

	collect queue3




}


proc factory_primes_loadbalanced {producers consumercount filtercount} {

	tsv::set tres list {} 








>
>
>
>







232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
	# These filters are fed by the producers, and produce to  the leaf consumers.
	set filters [station_push_tres [queue1 thread] [queue2 thread] $filtercount]

	## leaf consumers
	set consumers [station_push_tres2 [queue2 thread] [queue3 thread] $consumercount]

	collect queue3

	thread send [queue1 thread] {thread::release}
	thread send [queue2 thread] {thread::release}
	thread send [queue3 thread] {thread::release}
}


proc factory_primes_loadbalanced {producers consumercount filtercount} {

	tsv::set tres list {} 

233
234
235
236
237
238
239




240
241
242
243
244
245
246
	set producers [{*}$producers [queue1 thread]]
	# puts stderr [list producers are $producers]
	set filters [station_isprime [queue1 thread] [queue2 thread] $filtercount]
	# puts stderr [list filters are $filters]
	set consumers [station_push_tres [queue2 thread] [queue3 thread] $consumercount] 

	collect queue3




}


proc station_isprime {source dest count} {
	set transports [daerth station count $count transport [
		list source $source dest $dest init {
			proc isprime x {







>
>
>
>







258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
	set producers [{*}$producers [queue1 thread]]
	# puts stderr [list producers are $producers]
	set filters [station_isprime [queue1 thread] [queue2 thread] $filtercount]
	# puts stderr [list filters are $filters]
	set consumers [station_push_tres [queue2 thread] [queue3 thread] $consumercount] 

	collect queue3

	queue1 exit
	queue2 exit
	queue3 exit
}


proc station_isprime {source dest count} {
	set transports [daerth station count $count transport [
		list source $source dest $dest init {
			proc isprime x {
Changes to packages/daerth/lib/queue.tcl.
1
2
3
4
5
6





7
8
9
10
11
12
13
#! /bin/env tclsh

namespace import [yclprefix]::util 
package require {ycl daerth util}
namespace import [yclprefix]::daerth::util







proc init {_ args} {
	$_ .vars tid
	variable setup
	set opts [list limit 10 queuesize 10 tid {}]

	util opts opts args






>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#! /bin/env tclsh

namespace import [yclprefix]::util 
package require {ycl daerth util}
namespace import [yclprefix]::daerth::util

# exit -- shut down the thread behind a queue.
#
# _ is the queue object the method is invoked on.  Its tid variable is brought
# into scope and that thread is sent, synchronously, a thread::release to
# evaluate on itself; per the Thread package a thread terminates once its
# reservation count drops to zero.
#
# NOTE(review): within this namespace the name exit now resolves to this
# procedure rather than to the global exit command -- confirm nothing in the
# namespace relies on the latter.
proc exit _ {
	# presumably links the object's tid variable into this procedure -- confirm
	$_ .vars tid
	thread::send $tid thread::release
}
# presumably registers the procedure above as a method of the queue -- confirm
.my .method exit

proc init {_ args} {
	$_ .vars tid
	variable setup
	set opts [list limit 10 queuesize 10 tid {}]

	util opts opts args
Changes to packages/daerth/pkgIndex.tcl.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#! /bin/env tclsh


package ifneeded [list ycl daerth] 0.1 [list apply {{dir} {
	package require {ycl package}
	package require {ycl shelf shelf}
	set name [yclprefix]::daerth::queue
	[[yclprefix] shelf shelf .spawn $name] init
	$name .eval [list source $dir/lib/queue.tcl]

	[yclprefix] package source daerth $dir/lib/daerth.tcl
	package provide [list ycl daerth] 0.1
}} $dir]


package ifneeded [list ycl daerth test] 0.1 [list apply {{dir} {
	package require {ycl package}
	[yclprefix] package source daerth::test $dir/lib/daerth.test.tcl
	package provide [list ycl daerth test] 0.1
}} $dir]


package ifneeded [list ycl daerth util] 0.1 [list apply {{dir} {
	package require {ycl package}
	[yclprefix] package source daerth::util $dir/lib/util.tcl
	package provide [list ycl daerth util] 0.1
}} $dir]



|







|



|


|



|


|

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#! /bin/env tclsh


# Package "ycl daerth" 0.2.  When required, the load script spawns and
# initialises a shelf named [yclprefix]::daerth::queue, has that shelf source
# lib/queue.tcl, and then loads lib/daerth.tcl through the ycl package
# helper.  $dir is substituted when this index is read, so the load script
# carries the package directory with it.
package ifneeded [list ycl daerth] 0.2 [list apply {{dir} {
	package require {ycl package}
	package require {ycl shelf shelf}
	set name [yclprefix]::daerth::queue
	[[yclprefix] shelf shelf .spawn $name] init
	$name .eval [list source $dir/lib/queue.tcl]

	[yclprefix] package source daerth $dir/lib/daerth.tcl
	package provide [list ycl daerth] 0.2
}} $dir]


# Package "ycl daerth test" 0.2: the test suite, loaded from
# lib/daerth.test.tcl under the name daerth::test.
package ifneeded [list ycl daerth test] 0.2 [list apply {{dir} {
	package require {ycl package}
	[yclprefix] package source daerth::test $dir/lib/daerth.test.tcl
	package provide [list ycl daerth test] 0.2
}} $dir]


# Package "ycl daerth util" 0.2: helper routines, loaded from lib/util.tcl
# under the name daerth::util.  lib/queue.tcl requires this package.
package ifneeded [list ycl daerth util] 0.2 [list apply {{dir} {
	package require {ycl package}
	[yclprefix] package source daerth::util $dir/lib/util.tcl
	package provide [list ycl daerth util] 0.2
}} $dir]