Tcl package Thread source code

Artifact [7ac3bb868d]
Login

Artifact 7ac3bb868d20e1e1fce8f9c0449e5250f84e544a002689b15ce70e40a6acbcda:

Attachment "threads - small tpool test.tcl" to ticket [90de9ddf94] added by sebres 2019-05-07 17:27:20.
package require Thread

## ------ small async logger ----------------------------------------------

# Shared logging mode flag, readable from every thread:
#   1 - asynchronous logging (faster, but messages may show up out of the
#       real execution sequence),
#   0 - synchronous logging.
tsv::set logger async 1

# Start the dedicated logger thread and publish its id in the shared
# "logger" array so that any thread can address it.
tsv::set logger tid [thread::create {
  # Write one message to stderr, prefixed with the id of the sending thread
  # (the leading "tid" and zeros are stripped from the id first).
  proc log {thid args} {
    set prefix [format "%04s: " [regsub {^tid0+} $thid {}]]
    puts -nonewline stderr $prefix
    puts stderr {*}$args
    flush stderr
  }
  log [thread::id] "logger thread started (async-mode: [tsv::get logger async])"
  # Serve incoming requests in the event loop ...
  thread::wait
  # ... and give still pending events a chance once the wait is over.
  update
}]
# log --
#   Forward a message to the logger thread, tagged with the id of the
#   calling thread.
#
#   args - passed through unchanged to the logger thread's log proc (which
#          hands them to "puts stderr").
#
#   The shared flag "logger async" selects the delivery mode: a true value
#   posts the message and returns at once, otherwise the call blocks until
#   the logger thread has processed it.
#
#   The body relies only on "args" and on shared (tsv) state, because it is
#   also exported via [info body log] and re-used to define the same proc in
#   worker threads.
proc log {args} {
  set target [tsv::get logger tid]
  set request [list log [thread::id] {*}$args]
  # The condition is braced so the flag is evaluated exactly once as an
  # expression instead of being substituted into the expression text.
  if {[tsv::get logger async]} {
    thread::send -async $target $request
  } else {
    thread::send $target $request
  }
}
# force-log --
#   Barrier for asynchronous logging: performs a synchronous send of an empty
#   script to the logger thread, which returns only after the logger thread
#   has processed that request. Presumably everything this thread queued
#   before is written by then (requests are expected to be served in order).
#   Does nothing in synchronous mode.
proc force-log {} {
  # Braced condition: evaluate the shared flag once, as an expression.
  if {[tsv::get logger async]} {
    thread::send [tsv::get logger tid] {}
  }
}
# Publish the body of the log proc defined above in the shared "logger"
# array; worker threads read it back to define an identical log proc.
tsv::set logger body [info body log]

## ------------------------------------------------------------------------

# perform_command --
#   Runs one complete worker-pool scenario: creates 6 worker threads, waits
#   until they respond, dispatches three tasks to every worker, collects one
#   result per worker and task, then shuts the pool down.
#
#   ctx  - identifier of this run; wrapped into the list "context <ctx>",
#          which is used as the name of the shared (tsv) array and as part of
#          the keys of the global arrays ::signal and ::result.
#   wait - if true (default), every worker is released with
#          "thread::release -wait" after the shutdown signal was posted.
#
#   Must be called from a thread that can run the event loop: the proc blocks
#   in vwait while waiting for the workers.
proc perform_command {ctx {wait 1}} {
  set ctx [list context $ctx]
  # Create a mutex and store its handle in the shared array, so workers can
  # look it up; it is held here while the "working" counter is initialized.
  tsv::set $ctx pcm [set mtx [thread::mutex create]]
  thread::mutex lock $mtx
  try {
    tsv::set $ctx working 0
  } finally {
    thread::mutex unlock $mtx
  }

  # Create the 6 worker threads; the handles are kept in the local array t.
  for {set x 1} {$x <= 6} {incr x} {
    set t($x) [thread::create {
      set ::status ""

      # Same log proc as in the main thread (body taken from shared storage).
      proc log {args} [tsv::get logger body]
      # Handles one task: increments the shared "working" counter, waits
      # until ::status becomes "endoftask" or "shutdown", decrements the
      # counter again and appends a 1 to ::result(<ctx>,<task>) in the
      # thread whose id is stored in ::mtid.
      proc handle_input {ctx {task {}}} {
        
        set mtx [tsv::get $ctx pcm]; ## if needed mutex is here.
        set ::status ""

        log "  [tsv::incr $ctx working] thread(s) are working, task $task."

        # Nested event loop: requests posted to this thread after the
        # handle_input call are presumably served from within this vwait.
        while {$::status ni {"endoftask" "shutdown"}} {
          vwait ::status; # wait for end of work
        }
        
        tsv::incr $ctx working -1

        log "  input finished, task $task"
        # result to caller thread (synchronous send):
        thread::send $::mtid [list lappend ::result($ctx,$task) 1]
      }

      log "  thread [thread::id] is started."

      # Main loop of the worker: serve events until shutdown is signalled.
      while {$::status ne "shutdown"} {
        vwait ::status; # wait for works or shutdown
      }
      log "  thread [thread::id] is down."
    }]

    # Tell the worker the id of this thread; handle_input reports back to it.
    thread::send -async $t($x) [list set ::mtid [thread::id]]
  }
  
  log "Worker threads were created."
  
  # if it is needed to know they are really started:
  if 1 {
    # Handshake: every worker is asked to send an "incr signal(<ctx>)" back
    # to this thread; wait here until all 6 increments have arrived.
    set ::signal($ctx) 0
    for {set x 1} {$x <= 6} {incr x} {
      thread::send -async $t($x) [list thread::send -async [thread::id] [list incr signal($ctx)]]
    }
    while {$::signal($ctx) < 6} { vwait signal($ctx) }
    log "Worker threads were started."
  }

  # several tasks here:
  foreach task {1 2 3} {
    log "$ctx. >> task $task ..."
    set ::result($ctx,$task) {}

    # Post to every worker, in this order: the task itself, three "part"
    # log messages and finally the end-of-task signal.
    for {set x 1} {$x <= 6} {incr x} {
      thread::send -async $t($x) [list handle_input $ctx $task]
      foreach part {A B C} {
        thread::send -async $t($x) [list log "     task: $task: part $part"]
      }
      # ...
      thread::send -async $t($x) {set ::status "endoftask"}
    }

    # wait for result (one list element per worker):
    while {[llength $::result($ctx,$task)] < 6} { vwait ::result($ctx,$task) }
    log "$ctx. << task $task done ... result: $::result($ctx,$task)"
    unset ::result($ctx,$task)

    # force-log; # all messages into log
  }

  # shutdown pool:
  log "signal shutdown ..."
  for {set x 1} {$x <= 6} {incr x} {
    thread::send -async $t($x) {set ::status "shutdown"}
  }
  if {$wait} {
    ## wait using release:
    # Errors are ignored by the catch - presumably a worker may already have
    # terminated by the time it is released (TODO confirm).
    for {set x 1} {$x <= 6} {incr x} {
      catch { thread::release -wait $t($x) }
    }
  }
  log "done, [tsv::get $ctx working] workers, threads ([thread::names])."

  # force-log

  # Clean up everything that was created for this run.
  thread::mutex destroy $mtx
  tsv::unset $ctx
  unset -nocomplain ::signal($ctx)
}

## ------------------------------------------------------------------------

# repeat_command --
#   Executes the worker-pool scenario twice in a row (contexts 1 and 2),
#   logging the start of every run and waiting for the workers each time.
proc repeat_command {} {
  foreach ctx {1 2} {
    log "$ctx. run"
    perform_command $ctx 1
  }
}

# Run the whole scenario.
repeat_command
# Synchronous send of an empty script to the logger thread, used as a barrier.
thread::send [tsv::get logger tid] {}; # wait until the last event is done (everything is logged).
# Release the logger thread and wait for the release to complete.
thread::release -wait [tsv::get logger tid]