Tcl package Thread source code

Artifact [32b06371c8]
Login
EuroTcl/OpenACS 11 - 12 JULY 2024, VIENNA

Artifact 32b06371c85f847bd68ef6271d1fc886bd4f7421b67b821355aea969ac319e7a:

Attachment "threads - small tpool test.mod-org.tcl" to ticket [90de9ddf94] added by sebres 2019-05-07 20:32:14.
package require Thread

## ------ small async logger ----------------------------------------------

tsv::set logger async 1; # async logging is faster but non sequential (out of execution real sequence)

tsv::set logger tid [thread::create {
  proc log {thid args} {
    puts -nonewline stderr [format "%04s: " [regsub {^tid0+} $thid {}]]
    puts stderr {*}$args; flush stderr
  }
  log [thread::id] "logger thread started (async-mode: [tsv::get logger async])"
  thread::wait
  update
}]
proc log {args} {
  if [tsv::get logger async] {
    thread::send -async [tsv::get logger tid] [list log [thread::id] {*}$args]
  } else {
    thread::send [tsv::get logger tid] [list log [thread::id] {*}$args]
  }
}
proc force-log {} {
  if [tsv::get logger async] {
    thread::send [tsv::get logger tid] {}
  }
}
tsv::set logger body [info body log]

## ------------------------------------------------------------------------

proc perform_command {ctx args} {
  tsv::set shared context $ctx
  tsv::set [tsv::get shared context] pcm [thread::mutex create]
  thread::mutex lock [tsv::get [tsv::get shared context] pcm]
  tsv::set [tsv::get shared context] pcc [thread::cond create]

  try {
    tsv::set [tsv::get shared context] working 0
    tsv::set [tsv::get shared context] finish_notification 0
  } finally {
    thread::mutex unlock [tsv::get [tsv::get shared context] pcm]
  }

  for {set x 1} {$x <= 6} {incr x} {
    set t($x) [thread::create {
    proc log {args} [tsv::get logger body]
    proc handle_input {} {

      ## **** STEP 1 ****
      thread::mutex lock [tsv::get [tsv::get shared context] pcm]
      try {
        ## **** HERE IS A RACE CONDITION: YOU CAN OBTAIN MUTEX AFTER MAIN THREAD LEAVE perform_command
        log "[tsv::incr [tsv::get shared context] working] threads are working."
      } finally {
        thread::mutex unlock [tsv::get [tsv::get shared context] pcm]
      }
      
      ## **** STEP 2 ****
      thread::mutex lock [tsv::get [tsv::get shared context] pcm]

      try {

        ## **** HERE IS A RACE CONDITION: YOU CAN NOTIFY A CALLER THAT THE LAST THREAD IS GOING, BUT SOME MAY BE STILL THERE
        tsv::incr [tsv::get shared context] working -1
        
        if [tsv::get [tsv::get shared context] finish_notification] {
          log "thread::cond notify finish"
          thread::cond notify [tsv::get [tsv::get shared context] pcc]
        }
      } finally {
        thread::mutex unlock [tsv::get [tsv::get shared context] pcm]
      }
    }

    handle_input
    }]
  }

  log "Worker threads were started."
  thread::mutex lock [tsv::get [tsv::get shared context] pcm]

  try {
    tsv::set [tsv::get shared context] finish_notification 1

    ## **** HERE IS A RACE CONDITION: IF working IS 0 (YOU DON'T ENTER WAIT AT ALL)
    while {[set w [tsv::get [tsv::get shared context] working]] > 0} {
      log "cond wait finish for $w threads"
      thread::cond wait [tsv::get [tsv::get shared context] pcc] [tsv::get [tsv::get shared context] pcm]

      ## **** HERE IS A RACE CONDITION: MUTEX LOCKED AND IF WORKING WILL GET 0, 
      ## **** WE WILL TRY TO EXIT (BUT THERE ARE POSSIBLY SOME WORKERS STILL AT STEP 1, SO STILL DON'T INCREMENTED IT)
    }

  } finally {
    thread::mutex unlock [tsv::get [tsv::get shared context] pcm]
  }

  if 1 {
    ## **** IMPORTANT =========================================================
    ## HERE WE ARE WAITING FOR ALL THREADS GOING DOWN (BEFORE REPEAT THE NEXT RUN AND SWITCH CONTEXT):
    for {set x 1} {$x <= 6} {incr x} {
      catch { thread::release -wait $t($x) }
    }
    log "Pool is down."
    ## **** /IMPORTANT ========================================================
  }

  global results
  lappend results [llength [thread::names]]
  # thread::cond destroy [tsv::get [tsv::get shared context] pcc]
  # thread::mutex destroy [tsv::get [tsv::get shared context] pcm]
}

## ------------------------------------------------------------------------

proc repeat_command {} {
  for {set ctx 1} {$ctx <= 12} {incr ctx} {
    log "$ctx. run ============="
    perform_command $ctx 1
  }
}

repeat_command
thread::send [tsv::get logger tid] {}; # wait for last event is done (everything is logged).
thread::release -wait [tsv::get logger tid]