Attachment "threads - small tpool test.tcl" to
ticket [90de9ddf94]
added by
sebres
2019-05-07 17:27:20.
package require Thread
## ------ small async logger ----------------------------------------------
tsv::set logger async 1; # async logging is faster but non sequential (out of execution real sequence)
tsv::set logger tid [thread::create {
proc log {thid args} {
puts -nonewline stderr [format "%04s: " [regsub {^tid0+} $thid {}]]
puts stderr {*}$args; flush stderr
}
log [thread::id] "logger thread started (async-mode: [tsv::get logger async])"
thread::wait
update
}]
proc log {args} {
if [tsv::get logger async] {
thread::send -async [tsv::get logger tid] [list log [thread::id] {*}$args]
} else {
thread::send [tsv::get logger tid] [list log [thread::id] {*}$args]
}
}
proc force-log {} {
if [tsv::get logger async] {
thread::send [tsv::get logger tid] {}
}
}
tsv::set logger body [info body log]
## ------------------------------------------------------------------------
proc perform_command {ctx {wait 1}} {
set ctx [list context $ctx]
tsv::set $ctx pcm [set mtx [thread::mutex create]]
thread::mutex lock $mtx
try {
tsv::set $ctx working 0
} finally {
thread::mutex unlock $mtx
}
for {set x 1} {$x <= 6} {incr x} {
set t($x) [thread::create {
set ::status ""
proc log {args} [tsv::get logger body]
proc handle_input {ctx {task {}}} {
set mtx [tsv::get $ctx pcm]; ## if needed mutex is here.
set ::status ""
log " [tsv::incr $ctx working] thread(s) are working, task $task."
while {$::status ni {"endoftask" "shutdown"}} {
vwait ::status; # wait for end of work
}
tsv::incr $ctx working -1
log " input finished, task $task"
# result to caller thread:
thread::send $::mtid [list lappend ::result($ctx,$task) 1]
}
log " thread [thread::id] is started."
while {$::status ne "shutdown"} {
vwait ::status; # wait for works or shutdown
}
log " thread [thread::id] is down."
}]
thread::send -async $t($x) [list set ::mtid [thread::id]]
}
log "Worker threads were created."
# if it is needed to know they are really started:
if 1 {
set ::signal($ctx) 0
for {set x 1} {$x <= 6} {incr x} {
thread::send -async $t($x) [list thread::send -async [thread::id] [list incr signal($ctx)]]
}
while {$::signal($ctx) < 6} { vwait signal($ctx) }
log "Worker threads were started."
}
# several tasks here:
foreach task {1 2 3} {
log "$ctx. >> task $task ..."
set ::result($ctx,$task) {}
for {set x 1} {$x <= 6} {incr x} {
thread::send -async $t($x) [list handle_input $ctx $task]
foreach part {A B C} {
thread::send -async $t($x) [list log " task: $task: part $part"]
}
# ...
thread::send -async $t($x) {set ::status "endoftask"}
}
# wait for result:
while {[llength $::result($ctx,$task)] < 6} { vwait ::result($ctx,$task) }
log "$ctx. << task $task done ... result: $::result($ctx,$task)"
unset ::result($ctx,$task)
# force-log; # all messages into log
}
# shutdown pool:
log "signal shutdown ..."
for {set x 1} {$x <= 6} {incr x} {
thread::send -async $t($x) {set ::status "shutdown"}
}
if {$wait} {
## wait using release:
for {set x 1} {$x <= 6} {incr x} {
catch { thread::release -wait $t($x) }
}
}
log "done, [tsv::get $ctx working] workers, threads ([thread::names])."
# force-log
thread::mutex destroy $mtx
tsv::unset $ctx
unset -nocomplain ::signal($ctx)
}
## ------------------------------------------------------------------------
proc repeat_command {} {
for {set ctx 1} {$ctx <= 2} {incr ctx} {
log "$ctx. run"
perform_command $ctx 1
}
}
repeat_command
thread::send [tsv::get logger tid] {}; # wait for last event is done (everything is logged).
thread::release -wait [tsv::get logger tid]