
syslog and logfile format changes in shellthread logging

master
Julian Noble 2 weeks ago
parent
commit 11fd906cb4
  1. src/bootsupport/modules/punk/args-0.2.1.tm (3 changed lines)
  2. src/bootsupport/modules/punk/args/moduledoc/tclcore-0.1.0.tm (18 changed lines)
  3. src/bootsupport/modules/shellthread-1.6.2.tm (120 changed lines)
  4. src/bootsupport/modules/tomlish-1.1.7.tm (9470 changed lines)
  5. src/modules/shellthread-999999.0a1.0.tm (853 changed lines)
  6. src/modules/shellthread-buildversion.txt (3 changed lines)
  7. src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/punk/args-0.2.1.tm (3 changed lines)
  8. src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/punk/args/moduledoc/tclcore-0.1.0.tm (18 changed lines)
  9. src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/shellthread-1.6.2.tm (853 changed lines)
  10. src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/tomlish-1.1.7.tm (9470 changed lines)
  11. src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/punk/args-0.2.1.tm (3 changed lines)
  12. src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/punk/args/moduledoc/tclcore-0.1.0.tm (18 changed lines)
  13. src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/shellthread-1.6.2.tm (853 changed lines)
  14. src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/tomlish-1.1.7.tm (9470 changed lines)
  15. src/vfs/_vfscommon.vfs/modules/shellthread-1.6.2.tm (853 changed lines)

src/bootsupport/modules/punk/args-0.2.1.tm (3 changed lines)

@@ -6345,7 +6345,8 @@ tcl::namespace::eval punk::args {
                 }
             }
             indexexpression {
-                if {[catch {lindex {} $e_check}]} {
+                #tcl 9.1+? tip 615 'string is index'
+                if {$e_check eq "" || [catch {lindex {} $e_check}]} {
                     set msg "$argclass $argname for %caller% requires type indexexpression. An index as used in Tcl list commands. Received: '$e_check'"
                     #return -options [list -code error -errorcode [list PUNKARGS VALIDATION [list typemismatch $type] -badarg $argname -argspecs $argspecs]] $msg
                     lset clause_results $c_idx $a_idx [list errorcode [list PUNKARGS VALIDATION [list typemismatch $type] -badarg $argname -argspecs $argspecs] msg $msg]
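
For reference, the tightened check above now rejects an empty string as well as anything the Tcl list commands cannot parse as an index. A minimal standalone sketch of that test (the proc name is hypothetical, not part of the commit):

    # hypothetical helper illustrating the indexexpression validation used above
    proc is_indexexpression {e_check} {
        # valid: 0, 2, end, end-1 ... ; invalid: empty string, foo
        if {$e_check eq "" || [catch {lindex {} $e_check}]} {
            return 0
        }
        return 1
    }
    # is_indexexpression end-1  -> 1
    # is_indexexpression ""     -> 0
    # is_indexexpression foo    -> 0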

src/bootsupport/modules/punk/args/moduledoc/tclcore-0.1.0.tm (18 changed lines)

@@ -6020,6 +6020,13 @@ tcl::namespace::eval punk::args::moduledoc::tclcore {
     @values -min 3 -max -1
     listVar -type string -help\
         "Existing list variable name"
+    #note if tip 615 implemented for 9.1 'first' and 'last' need to accept empty string too
+    #same for lrange, lreplace, string range, string replace
+    #if {[package vsatisfies [package provide Tcl] 9.1-]} {
+    #    first -type {indexexpression|literal()}
+    #} else {
+    #    first -type indexexpression
+    #}
     first -type indexexpression
     last -type indexexpression
     value -type any -optional 1 -multiple 1
@@ -6086,10 +6093,21 @@ tcl::namespace::eval punk::args::moduledoc::tclcore {
     If additional index arguments are supplied, then each argument is used in turn
     to select an element from the previous indexing operation, allowing the script
     to select elements from sublists."
+    @form -form separate
     @values -min 1 -max -1
     list -type list -help\
         "tcl list as a value"
     index -type indexexpression -multiple 1 -optional 1
+    @form -form combined
+    @values -min 2 -max 2
+    list -type list -help\
+        "tcl list as a value"
+    #list of indexexpression
+    indexlist -type list -optional 0 -help\
+        "list of indexexpressions"
 } "@doc -name Manpage: -url [manpage_tcl lindex]"\
 {
     @examples -help {
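
The new 'combined' form documented above mirrors the two ways Tcl's lindex itself accepts indices: as separate arguments or as a single list of index expressions. For example (standard Tcl behaviour, not part of the commit):

    set nested {{a b} {c d e}}
    lindex $nested 1 2     ;# separate index arguments -> e
    lindex $nested {1 2}   ;# combined: one list of indexexpressions -> e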

src/modules/shellthread-1.6.1.tm → src/bootsupport/modules/shellthread-1.6.2.tm (120 changed lines)

@@ -236,12 +236,10 @@ namespace eval shellthread::worker {
         variable logfile
         variable settings
-        set logchunk $msg
         if {![dict get $settings -raw]} {
-            set tail_crlf 0
-            set tail_lf 0
-            set tail_cr 0
+            set logchunk $msg
+            set le "none"
             #for cooked - always remove the trailing newline before splitting..
             #
             #note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
@@ -251,20 +249,29 @@ namespace eval shellthread::worker {
             #we can always split on \n - and any adjacent \r will be preserved in the rejoin
             set lastchar [string range $logchunk end end]
             if {[string range $logchunk end-1 end] eq "\r\n"} {
-                set tail_crlf 1
-                set logchunk [string range $logchunk 0 end-2]
+                set le "crlf"
+                #set logchunk [string range $logchunk 0 end-2]
             } else {
                 if {$lastchar eq "\n"} {
-                    set tail_lf 1
-                    set logchunk [string range $logchunk 0 end-1]
+                    set le "lf"
+                    #set logchunk [string range $logchunk 0 end-1]
                 } elseif {$lastchar eq "\r"} {
-                    #\r line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console. but we'll pass through anyway.
-                    set tail_cr 1
-                    set logchunk [string range $logchunk 0 end-1]
+                    #\r as line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console.
+                    #If we're writing log lines to a file, we'll end up appending a \n to a trailing \r
+                    #For writing to a syslog target - we'll pass it through as is for the syslog target to display as it wills
+                    set le "cr"
+                    #set logchunk [string range $logchunk 0 end-1]
                 } else {
                     #possibly a single line with no linefeed.. or has linefeeds only in the middle
+                    #when writing to syslog we'll pass it through without a trailing linefeed.
+                    #when writing to a file we'll append \n
                 }
             }
+            #split on \n no matter the actual line-ending in use
+            #shouldn't matter as long as we don't add anything at the end of the line other than the raw data
+            #ie - don't quote or add spaces
+            set lines [split $logchunk \n]
+            set lcount [llength $lines]
             if {$ts_sent != 0} {
                 set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
@@ -279,19 +286,12 @@ namespace eval shellthread::worker {
             set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway
-            #set col0 [string repeat " " 9]
-            #set col1 [string repeat " " 27]
-            #set col2 [string repeat " " 11]
-            #set col3 [string repeat " " 22]
-            ##do not columnize the final data column or append to tail - or we could muck up the crlf integrity
-            #lassign [list [overtype::left $col0 $idtail] [overtype::left $col1 $time_info] [overtype::left $col2 $lagfp] [overtype::left $col3 $source]] c0 c1 c2 c3
             set w0 9
             set w1 27
             set w2 11
             set w3 22 ;#review - this can truncate source name without indication tail is missing
-            #do not columnize the final data column or append to tail - or we could muck up the crlf integrity
+            set w4 [expr {1 + ([::tcl::string::length $lcount] *2)}] ;#eg 999/999
+            #do not columnize the final data column or append anything to end - or we could muck up the crlf integrity
             lassign [list \
                 [format %-${w0}s $idtail]\
                 [format %-${w1}s $time_info]\
@@ -301,53 +301,77 @@ namespace eval shellthread::worker {
             set c2_blank [string repeat " " $w2]
-            #split on \n no matter the actual line-ending in use
-            #shouldn't matter as long as we don't add anything at the end of the line other than the raw data
-            #ie - don't quote or add spaces
-            set lines [split $logchunk \n]
-            set i 1
+            if {[::tcl::string::length $sysloghost_port]} {
+                _initsock
+            }
             set outlines [list]
+            set lnum 0
             foreach ln $lines {
-                if {$i == 1} {
-                    lappend outlines "$c0 $c1 $c2 $c3 $ln"
+                incr lnum
+                set c4 [format %-${w4}s $lnum/$lcount]
+                if {$lnum == 1} {
+                    lappend outlines "$c0 $c1 $c2 $c3 $c4 $ln"
                 } else {
-                    lappend outlines "$c0 $c1 $c2_blank $c3 $ln"
+                    lappend outlines "$c0 $c1 $c2_blank $c3 $c4 $ln"
                 }
-                incr i
+                if {[::tcl::string::length $sysloghost_port]} {
+                    #send each line as a separate syslog message
+                    #even if they arrive out of order or interleaved with records from other sources -
+                    #they can be tied together and ordered using id,source, timestamp, n/numlines fields
+                    #we lose information about the line-endings though
+                    catch {puts -nonewline $sock [lindex $outlines end]}
+                }
             }
-            if {$tail_lf} {
-                set logchunk "[join $outlines \n]\n"
-            } elseif {$tail_crlf} {
-                set logchunk "[join $outlines \r\n]\r\n"
-            } elseif {$tail_cr} {
-                set logchunk "[join $outlines \r]\r"
-            } else {
-                #no trailing linefeed
-                set logchunk [join $outlines \n]
-            }
-            #set logchunk "[overtype::left $col0 $idtail] [overtype::left $col1 $time_info] [overtype::left $col2 "+$lagfp"] [overtype::left $col3 $source] $msg"
-        }
-        if {[string length $sysloghost_port]} {
-            _initsock
-            catch {puts -nonewline $sock $logchunk}
-        }
-        #todo - sockets etc?
-        if {[string length $logfile]} {
-            #todo - setting to maintain open filehandle and reduce io.
-            # possible settings for buffersize - and maybe logrotation, although this could be left to client
-            #for now - default to safe option of open/close each write despite the overhead.
-            set fd [open $logfile a]
-            chan configure $fd -translation auto -buffering $writebuffering
-            #whether line buffered or not - by now our logchunk includes newlines
-            puts -nonewline $fd $logchunk
-            close $fd
-        }
-    }
+            #todo - setting to maintain open filehandle and reduce io.
+            # possible settings for buffersize - and maybe logrotation, although this could be left to client
+            #for now - default to safe option of open/close each write despite the overhead.
+            if {[string length $logfile]} {
+                switch -- $le {
+                    lf {
+                        set logchunk "[join $outlines \n]\n"
+                    }
+                    crlf {
+                        #join with \n because we still did split on \n
+                        set logchunk "[join $outlines \n]\r\n"
+                    }
+                    cr {
+                        set logchunk "[join $outlines \n]\r"
+                    }
+                    none {
+                        set logchunk [join $outlines \n]
+                    }
+                }
+                set fd [open $logfile a]
+                if {$le in {cr none}} {
+                    append logchunk \n
+                }
+                puts -nonewline $fd $logchunk
+                close $fd
+            }
+        } else {
+            #raw
+            if {[string length $sysloghost_port]} {
+                _initsock
+                catch {puts -nonewline $sock $msg}
+            }
+            if {[string length $logfile]} {
+                set fd [open $logfile a]
+                puts -nonewline $fd $msg
+                close $fd
+            }
+        }
+        #todo - sockets etc?
+    }
     # - withdraw just this client
     proc finish {tidclient} {
         variable client_ids
@@ -816,7 +840,7 @@ namespace eval shellthread::manager {
 package provide shellthread [namespace eval shellthread {
     variable version
-    set version 1.6.1
+    set version 1.6.2
 }]
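
As a rough illustration of the new cooked layout (made-up values; widths follow the w0..w4 settings in the diff), each line of a multi-line message now carries a trailing n/numlines column before the data, so per-line syslog records can be reassembled and ordered:

    # hypothetical sketch of the per-line formatting loop in the diff above
    set lines  [list "first line" "second line"]
    set lcount [llength $lines]
    set w4 [expr {1 + ([string length $lcount] * 2)}]    ;# eg "1/2"
    set c0 [format %-9s  000000012]                      ;# client thread id tail
    set c1 [format %-27s 2025-07-01_10-20-30.123456]     ;# timestamp
    set c2 [format %-11s +0.000042]                      ;# lag
    set c2_blank [string repeat " " 11]
    set c3 [format %-22s mysource]                       ;# source tag
    set lnum 0
    foreach ln $lines {
        incr lnum
        set c4 [format %-${w4}s $lnum/$lcount]
        if {$lnum == 1} {
            puts "$c0 $c1 $c2 $c3 $c4 $ln"
        } else {
            puts "$c0 $c1 $c2_blank $c3 $c4 $ln"
        }
    }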

src/bootsupport/modules/tomlish-1.1.7.tm (9470 changed lines)

File diff suppressed because it is too large

src/modules/shellthread-999999.0a1.0.tm (853 changed lines)

@@ -0,0 +1,853 @@
#package require logger
package require Thread
namespace eval shellthread {
proc iso8601 {{tsmicros ""}} {
if {$tsmicros eq ""} {
set tsmicros [tcl::clock::microseconds]
} else {
set microsnow [tcl::clock::microseconds]
if {[tcl::string::length $tsmicros] != [tcl::string::length $microsnow]} {
error "iso8601 requires 'clock micros' or empty string to create timestamp"
}
}
set seconds [expr {$tsmicros / 1000000}]
return [tcl::clock::format $seconds -format "%Y-%m-%d_%H-%M-%S"]
}
}
namespace eval shellthread::worker {
variable settings
variable sysloghost_port
variable sock
variable logfile ""
variable fd
variable client_ids [list]
variable ts_start_micros
variable errorlist [list]
variable inpipe ""
proc bgerror {args} {
variable errorlist
lappend errorlist $args
}
proc send_errors_now {tidcli} {
variable errorlist
thread::send -async $tidcli [list shellthread::manager::report_worker_errors [list worker_tid [thread::id] errors $errorlist]]
}
proc add_client_tid {tidcli} {
variable client_ids
if {$tidcli ni $client_ids} {
lappend client_ids $tidcli
}
}
proc init {tidclient start_m settingsdict} {
variable sysloghost_port
variable logfile
variable settings
interp bgerror {} shellthread::worker::bgerror
#package require overtype ;#overtype uses tcllib textutil, punk::char etc - currently too heavyweight in terms of loading time for use in threads.
variable client_ids
variable ts_start_micros
lappend client_ids $tidclient
set ts_start_micros $start_m
set defaults [list -raw 0 -file "" -syslog "" -direction out]
set settings [dict merge $defaults $settingsdict]
set syslog [dict get $settings -syslog]
if {[string length $syslog]} {
lassign [split $syslog :] s_host s_port
set sysloghost_port [list $s_host $s_port]
if {[catch {package require udp} errm]} {
#disable rather than bomb and interfere with any -file being written
#review - log/notify?
set sysloghost_port ""
}
} else {
set sysloghost_port ""
}
set logfile [dict get $settings -file]
}
proc start_pipe_read {source readchan args} {
#assume 1 inpipe for now
variable inpipe
variable sysloghost_port
variable logfile
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#get buffering setting from the channel as it was set prior to thread::transfer
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set writebuffering line
#set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
#chan configure $writechan -buffering $writebuffering
}
}
chan configure $readchan -translation lf
if {$readchan ni [chan names]} {
error "shellthread::worker::start_pipe_read - inpipe not configured. Use shellthread::manager::set_pipe_read_from_client to thread::transfer the pipe end"
}
set inpipe $readchan
chan configure $readchan -blocking 0
set waitvar ::shellthread::worker::wait($inpipe,[clock micros])
#tcl::chan::fifo2 based pipe seems slower to establish events upon than Memchan
chan event $readchan readable [list ::shellthread::worker::pipe_read $readchan $source $waitvar $readbuffering $writebuffering]
vwait $waitvar
}
proc pipe_read {chan source waitfor readbuffering writebuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
::shellthread::worker::log pipe 0 - $source - info $chunk\n $writebuffering
} else {
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
}
} else {
set chunk [chan read $chan]
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $chan
}
}
proc start_pipe_write {source writechan args} {
variable outpipe
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
#todo!
set readchan stdin
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#nothing explicitly set - take from transferred channel
set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
chan configure $writechan -buffering $writebuffering
}
}
if {$writechan ni [chan names]} {
error "shellthread::worker::start_pipe_write - outpipe not configured. Use shellthread::manager::set_pipe_write_to_client to thread::transfer the pipe end"
}
set outpipe $writechan
chan configure $readchan -blocking 0
chan configure $writechan -blocking 0
set waitvar ::shellthread::worker::wait($outpipe,[clock micros])
chan event $readchan readable [list apply {{chan writechan source waitfor readbuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
puts $writechan $chunk
} else {
puts -nonewline $writechan $chunk
}
}
} else {
set chunk [chan read $chan]
puts -nonewline $writechan $chunk
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $writechan
if {$chan ne "stdin"} {
chan close $chan
}
}
}} $readchan $writechan $source $waitvar $readbuffering]
vwait $waitvar
}
proc _initsock {} {
variable sysloghost_port
variable sock
if {[string length $sysloghost_port]} {
if {[catch {chan configure $sock} state]} {
set sock [udp_open]
chan configure $sock -buffering none -translation binary
chan configure $sock -remote $sysloghost_port
}
}
}
proc _reconnect {} {
variable sock
catch {close $sock}
_initsock
return [chan configure $sock]
}
proc send_info {client_tid ts_sent source msg} {
set ts_received [clock micros]
set lag_micros [expr {$ts_received - $ts_sent}]
set lag [expr {$lag_micros / 1000000.0}] ;#lag as x.xxxxxx seconds
log $client_tid $ts_sent $lag $source - info $msg line 1
}
proc log {client_tid ts_sent lag source service level msg writebuffering {islog 0}} {
variable sock
variable fd
variable sysloghost_port
variable logfile
variable settings
if {![dict get $settings -raw]} {
set logchunk $msg
set le "none"
#for cooked - always remove the trailing newline before splitting..
#
#note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
#
#Possibly not critical as cooked is for logging and we are still preserving all \r and \n chars - but review and consider implementing a better split
#but add it back exactly as it was afterwards
#we can always split on \n - and any adjacent \r will be preserved in the rejoin
set lastchar [string range $logchunk end end]
if {[string range $logchunk end-1 end] eq "\r\n"} {
set le "crlf"
#set logchunk [string range $logchunk 0 end-2]
} else {
if {$lastchar eq "\n"} {
set le "lf"
#set logchunk [string range $logchunk 0 end-1]
} elseif {$lastchar eq "\r"} {
#\r as line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console.
#If we're writing log lines to a file, we'll end up appending a \n to a trailing \r
#For writing to a syslog target - we'll pass it through as is for the syslog target to display as it wills
set le "cr"
#set logchunk [string range $logchunk 0 end-1]
} else {
#possibly a single line with no linefeed.. or has linefeeds only in the middle
#when writing to syslog we'll pass it through without a trailing linefeed.
#when writing to a file we'll append \n
}
}
#split on \n no matter the actual line-ending in use
#shouldn't matter as long as we don't add anything at the end of the line other than the raw data
#ie - don't quote or add spaces
set lines [split $logchunk \n]
set lcount [llength $lines]
if {$ts_sent != 0} {
set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
set time_info [::shellthread::iso8601 $ts_sent].$micros
#set time_info "${time_info}+$lag"
set lagfp "+[format %f $lag]"
} else {
#from pipe - no ts_sent/lag info available
set time_info ""
set lagfp ""
}
set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway
set w0 9
set w1 27
set w2 11
set w3 22 ;#review - this can truncate source name without indication tail is missing
set w4 [expr {1 + ([::tcl::string::length $lcount] *2)}] ;#eg 999/999
#do not columnize the final data column or append anything to end - or we could muck up the crlf integrity
lassign [list \
[format %-${w0}s $idtail]\
[format %-${w1}s $time_info]\
[format %-${w2}s $lagfp]\
[format %-${w3}s $source]\
] c0 c1 c2 c3
set c2_blank [string repeat " " $w2]
if {[::tcl::string::length $sysloghost_port]} {
_initsock
}
set outlines [list]
set lnum 0
foreach ln $lines {
incr lnum
set c4 [format %-${w4}s $lnum/$lcount]
if {$lnum == 1} {
lappend outlines "$c0 $c1 $c2 $c3 $c4 $ln"
} else {
lappend outlines "$c0 $c1 $c2_blank $c3 $c4 $ln"
}
if {[::tcl::string::length $sysloghost_port]} {
#send each line as a separate syslog message
#even if they arrive out of order or interleaved with records from other sources -
#they can be tied together and ordered using id,source, timestamp, n/numlines fields
#we lose information about the line-endings though
catch {puts -nonewline $sock [lindex $outlines end]}
}
}
#todo - setting to maintain open filehandle and reduce io.
# possible settings for buffersize - and maybe logrotation, although this could be left to client
#for now - default to safe option of open/close each write despite the overhead.
if {[string length $logfile]} {
switch -- $le {
lf {
set logchunk "[join $outlines \n]\n"
}
crlf {
#join with \n because we still did split on \n
set logchunk "[join $outlines \n]\r\n"
}
cr {
set logchunk "[join $outlines \n]\r"
}
none {
set logchunk [join $outlines \n]
}
}
set fd [open $logfile a]
if {$le in {cr none}} {
append logchunk \n
}
puts -nonewline $fd $logchunk
close $fd
}
} else {
#raw
if {[string length $sysloghost_port]} {
_initsock
catch {puts -nonewline $sock $msg}
}
if {[string length $logfile]} {
set fd [open $logfile a]
puts -nonewline $fd $msg
close $fd
}
}
#todo - sockets etc?
}
# - withdraw just this client
proc finish {tidclient} {
variable client_ids
if {($tidclient in $client_ids) && ([llength $client_ids] == 1)} {
terminate $tidclient
} else {
set posn [lsearch $client_ids $tidclient]
set client_ids [lreplace $client_ids $posn $posn]
}
}
#allow any client to terminate
proc terminate {tidclient} {
variable sock
variable fd
variable client_ids
if {$tidclient in $client_ids} {
catch {close $sock}
catch {close $fd}
set client_ids [list]
#review use of thread::release -wait
#docs indicate deprecated for regular use, and that we should use thread::join
#however.. how can we set a timeout on a thread::join ?
#by telling the thread to release itself - we can wait on the thread::send variable
# This needs review - because it's unclear that -wait even works on self
# (what does it mean to wait for the target thread to exit if the target is self??)
thread::release -wait
return [thread::id]
} else {
return ""
}
}
}
namespace eval shellthread::manager {
variable workers [dict create]
variable worker_errors [list]
variable timeouts
variable free_threads [list]
#variable log_threads
proc dict_getdef {dictValue args} {
if {[llength $args] < 2} {
error {wrong # args: should be "dict_getdef dictValue ?key ...? key default"}
}
set keys [lrange $args 0 end-1]
if {[tcl::dict::exists $dictValue {*}$keys]} {
return [tcl::dict::get $dictValue {*}$keys]
} else {
return [lindex $args end]
}
}
#new datastructure regarding workers and sourcetags required.
#one worker can service multiple sourcetags - but each sourcetag may be used by multiple threads too.
#generally each thread will use a specific sourcetag - but we may have pools doing similar things which log to same destination.
#
#As a convention we may use a sourcetag for the thread which started the worker that isn't actually used for logging - but as a common target for joins
#If the thread which started the worker calls leave_worker with that 'primary' sourcetag it means others won't be able to use that target - which seems reasonable.
#If another thread wants to maintain joinability beyond the span provided by the starting client,
#it can join with both the primary tag and a tag it will actually use for logging.
#A thread can join the logger with any existingtag - not just the 'primary'
#(which is arbitrary anyway. It will usually be the first in the list - but may be unsubscribed by clients and disappear)
proc join_worker {existingtag sourcetaglist} {
set client_tid [thread::id]
#todo - allow a source to piggyback on existing worker by referencing one of the sourcetags already using the worker
}
proc new_pipe_worker {sourcetaglist {settingsdict {}}} {
if {[dict exists $settingsdict -workertype]} {
if {[string tolower [dict get $settingsdict -workertype]] ne "pipe"} {
error "new_pipe_worker error: -workertype ne 'pipe'. Set to 'pipe' or leave empty"
}
}
dict set settingsdict -workertype pipe
new_worker $sourcetaglist $settingsdict
}
#it is up to caller to use a unique sourcetag (e.g by prefixing with own thread::id etc)
# This allows multiple threads to more easily write to the same named sourcetag if necessary
# todo - change sourcetag for a list of tags which will be handled by the same thread. e.g for multiple threads logging to same file
#
# todo - some protection mechanism for case where target is a file to stop creation of multiple worker threads writing to same file.
# Even if we use open fd,close fd wrapped around writes.. it is probably undesirable to have multiple threads with same target
# On the other hand socket targets such as UDP can happily be written to by multiple threads.
# For now the mechanism is that a call to new_worker (rename to open_worker?) will join the same thread if a sourcetag matches.
# but, as sourcetags can get removed(unsubbed via leave_worker) this doesn't guarantee two threads with same -file settings won't fight.
# Also.. the settingsdict is ignored when joining with a tag that exists.. this is problematic.. e.g logrotation where previous file still being written by existing worker
# todo - rename 'sourcetag' concept to 'targettag' ?? the concept is a mixture of both.. it is somewhat analogous to a syslog 'facility'
# probably new_worker should disallow auto-joining and we allow different workers to handle same tags simultaneously to support overlap during logrotation etc.
proc new_worker {sourcetaglist {settingsdict {}}} {
variable workers
set ts_start [clock micros]
set tidclient [thread::id]
set sourcetag [lindex $sourcetaglist 0] ;#todo - use all
set defaults [dict create\
-workertype message\
]
set settingsdict [dict merge $defaults $settingsdict]
set workertype [string tolower [dict get $settingsdict -workertype]]
set known_workertypes [list pipe message]
if {$workertype ni $known_workertypes} {
error "new_worker - unknown -workertype $workertype. Expected one of '$known_workertypes'"
}
if {[dict exists $workers $sourcetag]} {
set winfo [dict get $workers $sourcetag]
if {[dict get $winfo tid] ne "noop" && [thread::exists [dict get $winfo tid]]} {
#add our client-info to existing worker thread
dict lappend winfo list_client_tids $tidclient
dict set workers $sourcetag $winfo ;#writeback
return [dict get $winfo tid]
}
}
#noop fake worker for empty syslog and empty file
if {$workertype eq "message"} {
if {[dict_getdef $settingsdict -syslog ""] eq "" && [dict_getdef $settingsdict -file ""] eq ""} {
set winfo [dict create tid noop list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype "message"]
dict set workers $sourcetag $winfo
return noop
}
}
#check if there is an existing unsubscribed thread first
#don't use free_threads for pipe workertype for now..
variable free_threads
if {$workertype ne "pipe"} {
if {[llength $free_threads]} {
#todo - re-use from tail - as most likely to have been doing similar work?? review
set free_threads [lassign $free_threads tidworker]
#todo - keep track of real ts_start of free threads... kill when too old
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype [dict get $settingsdict -workertype]]
#puts stderr "shellfilter::new_worker Re-using free worker thread: $tidworker with tag $sourcetag"
dict set workers $sourcetag $winfo
return $tidworker
}
}
#set ts_start [::shellthread::iso8601]
set tidworker [thread::create -preserved]
set init_script [string map [list %ts_start% $ts_start %mp% [tcl::tm::list] %ap% $::auto_path %tidcli% $tidclient %sd% $settingsdict] {
#set tclbase [file dirname [file dirname [info nameofexecutable]]]
#set tcllib $tclbase/lib
#if {$tcllib ni $::auto_path} {
# lappend ::auto_path $tcllib
#}
set ::settingsinfo [dict create %sd%]
#if the executable running things is something like a tclkit,
# then it's likely we will need to use the caller's auto_path and tcl::tm::list to find things
#The caller can tune the thread's package search by providing a settingsdict
#tcl::tm::add * must add in reverse order to get resulting list in same order as original
if {![dict exists $::settingsinfo tcl_tm_list]} {
#JMN2
::tcl::tm::add {*}[lreverse [list %mp%]]
} else {
tcl::tm::remove {*}[tcl::tm::list]
::tcl::tm::add {*}[lreverse [dict get $::settingsinfo tcl_tm_list]]
}
if {![dict exists $::settingsinfo auto_path]} {
set ::auto_path [list %ap%]
} else {
set ::auto_path [dict get $::settingsinfo auto_path]
}
package require punk::packagepreference
punk::packagepreference::install
package require Thread
package require shellthread
if {![catch {::shellthread::worker::init %tidcli% %ts_start% $::settingsinfo} errmsg]} {
unset ::settingsinfo
set ::shellthread_init "ok"
} else {
unset ::settingsinfo
set ::shellthread_init "err $errmsg"
}
}]
thread::send -async $tidworker $init_script
#thread::send $tidworker $init_script
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list]]
dict set workers $sourcetag $winfo
return $tidworker
}
proc set_pipe_read_from_client {tag_pipename worker_tid rchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename worker_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $rchan
#start_pipe_read will vwait - so we have to send async
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_read $tag_pipename $rchan]
#client may start writing immediately - but presumably it will buffer in fifo2
}
proc set_pipe_write_to_client {tag_pipename worker_tid wchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename worker_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $wchan
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_write $tag_pipename $wchan]
}
proc write_log {source msg args} {
variable workers
set ts_micros_sent [clock micros]
set defaults [list -async 1 -level info]
set opts [dict merge $defaults $args]
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker eq "noop"} {
return
}
if {![thread::exists $tidworker]} {
# -syslog -file ?
set tidworker [new_worker $source]
}
} else {
#auto create with no requirement to call new_worker.. warn?
# -syslog -file ?
error "write_log no log opened for source: $source"
set tidworker [new_worker $source]
}
set client_tid [thread::id]
if {[dict get $opts -async]} {
thread::send -async $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
} else {
thread::send $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
}
}
proc report_worker_errors {errdict} {
variable workers
set reporting_tid [dict get $errdict worker_tid]
dict for {src srcinfo} $workers {
if {[dict get $srcinfo tid] eq $reporting_tid} {
dict set srcinfo errors [dict get $errdict errors]
dict set workers $src $srcinfo ;#writeback updated
break
}
}
}
#aka leave_worker
#Note that the tags may be on separate workertids, or some tags may share workertids
proc unsubscribe {sourcetaglist} {
variable workers
#workers structure example:
#[list sourcetag1 [list tid <tidworker> list_client_tids <clients>] ts_start <ts_start> ts_end_list {}]
variable free_threads
set mytid [thread::id] ;#caller of shellthread::manager::xxx is the client thread
set subscriberless_tags [list]
foreach source $sourcetaglist {
if {[dict exists $workers $source]} {
set list_client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $list_client_tids $mytid]] >= 0} {
set list_client_tids [lreplace $list_client_tids $posn $posn]
dict set workers $source list_client_tids $list_client_tids
}
if {![llength $list_client_tids]} {
lappend subscriberless_tags $source
}
}
}
#we've removed our own tid from all the tags - possibly across multiple workertids, and possibly leaving some workertids with no subscribers for a particular tag - or no subscribers at all.
set subscriberless_workers [list]
set shuttingdown_workers [list]
foreach deadtag $subscriberless_tags {
set workertid [dict get $workers $deadtag tid]
set worker_tags [get_worker_tagstate $workertid]
set subscriber_count 0
set kill_count 0 ;#number of ts_end_list entries - even one indicates thread is doomed
foreach taginfo $worker_tags {
incr subscriber_count [llength [dict get $taginfo list_client_tids]]
incr kill_count [llength [dict get $taginfo ts_end_list]]
}
if {$subscriber_count == 0} {
lappend subscriberless_workers $workertid
}
if {$kill_count > 0} {
lappend shuttingdown_workers $workertid
}
}
#if worker isn't shutting down - add it to free_threads list
foreach workertid $subscriberless_workers {
if {$workertid ni $shuttingdown_workers} {
if {$workertid ni $free_threads && $workertid ne "noop"} {
lappend free_threads $workertid
}
}
}
#todo
#unsub this client_tid from the sourcetags in the sourcetaglist. if no more client_tids exist for sourcetag, remove sourcetag,
#if no more sourcetags - add worker to free_threads
}
proc get_worker_tagstate {workertid} {
variable workers
set taginfo_list [list]
dict for {source sourceinfo} $workers {
if {[dict get $sourceinfo tid] eq $workertid} {
lappend taginfo_list $sourceinfo
}
}
return $taginfo_list
}
#finalisation
proc shutdown_free_threads {{timeout 2500}} {
variable free_threads
if {![llength $free_threads]} {
return
}
upvar ::shellthread::manager::timeouts timeoutarr
if {[info exists timeoutarr(shutdown_free_threads)]} {
#already called
return false
}
#set timeoutarr(shutdown_free_threads) waiting
#after $timeout [list set timeoutarr(shutdown_free_threads) timed-out]
set ::shellthread::waitfor waiting
#after $timeout [list set ::shellthread::waitfor]
#2025-07 timed-out untested review
set cancelid [after $timeout [list set ::shellthread::waitfor timed-out]]
set waiting_for [list]
set ended [list]
set timedout 0
foreach tid $free_threads {
if {[thread::exists $tid]} {
lappend waiting_for $tid
#thread::send -async $tid [list shellthread::worker::terminate [thread::id]] timeoutarr(shutdown_free_threads)
thread::send -async $tid [list shellthread::worker::terminate [thread::id]] ::shellthread::waitfor
}
}
if {[llength $waiting_for]} {
for {set i 0} {$i < [llength $waiting_for]} {incr i} {
vwait ::shellthread::waitfor
if {$::shellthread::waitfor eq "timed-out"} {
set timedout 1
break
} else {
after cancel $cancelid
lappend ended $::shellthread::waitfor
}
}
}
set free_threads [list]
return [dict create existed $waiting_for ended $ended timedout $timedout]
}
#TODO - important.
#REVIEW!
#since moving to the unsubscribe mechanism - close_worker $source isn't being called
# - we need to set a limit to the number of free threads and shut down excess when detected during unsubscription
#instruction to shut-down the thread that has this source.
proc close_worker {source {timeout 2500}} {
variable workers
variable worker_errors
variable free_threads
upvar ::shellthread::manager::timeouts timeoutarr
set ts_now [clock micros]
#puts stderr "close_worker $source"
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker in $free_threads} {
#make sure a thread that is being closed is removed from the free_threads list
set posn [lsearch $free_threads $tidworker]
set free_threads [lreplace $free_threads $posn $posn]
}
set mytid [thread::id]
set client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $client_tids $mytid]] >= 0} {
set client_tids [lreplace $client_tids $posn $posn]
#remove self from list of clients
dict set workers $source list_client_tids $client_tids
}
set ts_end_list [dict get $workers $source ts_end_list] ;#ts_end_list is just a list of timestamps of closing calls for this source - only one is needed to close, but they may all come in a flurry.
if {[llength $ts_end_list]} {
set last_end_ts [lindex $ts_end_list end]
if {(($ts_now - $last_end_ts) / 1000) >= $timeout} {
lappend ts_end_list $ts_now
dict set workers $source ts_end_list $ts_end_list
} else {
#existing close in progress.. assume it will work
return
}
}
if {[thread::exists $tidworker]} {
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source still running.. terminating"
#review - timeoutarr is local var (?)
set timeoutarr($source) 0
after $timeout [list set timeoutarr($source) 2]
thread::send -async $tidworker [list shellthread::worker::send_errors_now [thread::id]]
thread::send -async $tidworker [list shellthread::worker::terminate [thread::id]] timeoutarr($source)
#thread::send -async $tidworker [string map [list %tidclient% [thread::id]] {
# shellthread::worker::terminate %tidclient%
#}] timeoutarr($source)
vwait timeoutarr($source)
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE1"
thread::release $tidworker
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE2"
if {[dict exists $workers $source errors]} {
set errlist [dict get $workers $source errors]
if {[llength $errlist]} {
lappend worker_errors [list $source [dict get $workers $source]]
}
}
dict unset workers $source
} else {
#thread may have been closed by call to close_worker with another source with same worker
#clear workers record for this source
#REVIEW - race condition for re-creation of source with new workerid?
#check that record is subscriberless to avoid this
if {[llength [dict get $workers $source list_client_tids]] == 0} {
dict unset workers $source
}
}
}
#puts stdout "close_worker $source - end"
}
#worker errors only available for a source after close_worker called on that source
#It is possible for there to be multiple entries for a source because new_worker can be called multiple times with same sourcetag,
proc get_and_clear_errors {source} {
variable worker_errors
set source_errors [lsearch -all -inline -index 0 $worker_errors $source]
set worker_errors [lsearch -all -inline -index 0 -not $worker_errors $source]
return $source_errors
}
}
package provide shellthread [namespace eval shellthread {
variable version
set version 999999.0a1.0
}]
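
A minimal usage sketch for the module added above, assuming a log file target (the tag and path are made-up values; the calls are the shellthread::manager procs defined in this file):

    package require shellthread

    # start (or join) a worker thread that logs the 'mytag' source to a file
    set worker [shellthread::manager::new_worker [list mytag] [dict create -file /tmp/mytag.log]]

    # messages are timestamped and written by the worker in the cooked column format
    shellthread::manager::write_log mytag "something happened"
    shellthread::manager::write_log mytag "a multi-line\nmessage is written as one record per line"

    # tear the worker down; errors collected in the worker become available afterwards
    shellthread::manager::close_worker mytag
    puts [shellthread::manager::get_and_clear_errors mytag]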

src/modules/shellthread-buildversion.txt (3 changed lines)

@@ -0,0 +1,3 @@
1.6.2
#First line must be a semantic version number
#all other lines are ignored.

src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/punk/args-0.2.1.tm (3 changed lines)

@@ -6345,7 +6345,8 @@ tcl::namespace::eval punk::args {
                 }
             }
             indexexpression {
-                if {[catch {lindex {} $e_check}]} {
+                #tcl 9.1+? tip 615 'string is index'
+                if {$e_check eq "" || [catch {lindex {} $e_check}]} {
                     set msg "$argclass $argname for %caller% requires type indexexpression. An index as used in Tcl list commands. Received: '$e_check'"
                     #return -options [list -code error -errorcode [list PUNKARGS VALIDATION [list typemismatch $type] -badarg $argname -argspecs $argspecs]] $msg
                     lset clause_results $c_idx $a_idx [list errorcode [list PUNKARGS VALIDATION [list typemismatch $type] -badarg $argname -argspecs $argspecs] msg $msg]

src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/punk/args/moduledoc/tclcore-0.1.0.tm (18 changed lines)

@@ -6020,6 +6020,13 @@ tcl::namespace::eval punk::args::moduledoc::tclcore {
     @values -min 3 -max -1
     listVar -type string -help\
         "Existing list variable name"
+    #note if tip 615 implemented for 9.1 'first' and 'last' need to accept empty string too
+    #same for lrange, lreplace, string range, string replace
+    #if {[package vsatisfies [package provide Tcl] 9.1-]} {
+    #    first -type {indexexpression|literal()}
+    #} else {
+    #    first -type indexexpression
+    #}
     first -type indexexpression
     last -type indexexpression
     value -type any -optional 1 -multiple 1
@@ -6086,10 +6093,21 @@ tcl::namespace::eval punk::args::moduledoc::tclcore {
     If additional index arguments are supplied, then each argument is used in turn
     to select an element from the previous indexing operation, allowing the script
     to select elements from sublists."
+    @form -form separate
     @values -min 1 -max -1
     list -type list -help\
         "tcl list as a value"
     index -type indexexpression -multiple 1 -optional 1
+    @form -form combined
+    @values -min 2 -max 2
+    list -type list -help\
+        "tcl list as a value"
+    #list of indexexpression
+    indexlist -type list -optional 0 -help\
+        "list of indexexpressions"
 } "@doc -name Manpage: -url [manpage_tcl lindex]"\
 {
     @examples -help {

src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/shellthread-1.6.2.tm (853 changed lines)

@@ -0,0 +1,853 @@
#package require logger
package require Thread
namespace eval shellthread {
proc iso8601 {{tsmicros ""}} {
if {$tsmicros eq ""} {
set tsmicros [tcl::clock::microseconds]
} else {
set microsnow [tcl::clock::microseconds]
if {[tcl::string::length $tsmicros] != [tcl::string::length $microsnow]} {
error "iso8601 requires 'clock micros' or empty string to create timestamp"
}
}
set seconds [expr {$tsmicros / 1000000}]
return [tcl::clock::format $seconds -format "%Y-%m-%d_%H-%M-%S"]
}
}
namespace eval shellthread::worker {
variable settings
variable sysloghost_port
variable sock
variable logfile ""
variable fd
variable client_ids [list]
variable ts_start_micros
variable errorlist [list]
variable inpipe ""
proc bgerror {args} {
variable errorlist
lappend errorlist $args
}
proc send_errors_now {tidcli} {
variable errorlist
thread::send -async $tidcli [list shellthread::manager::report_worker_errors [list worker_tid [thread::id] errors $errorlist]]
}
proc add_client_tid {tidcli} {
variable client_ids
if {$tidcli ni $client_ids} {
lappend client_ids $tidcli
}
}
proc init {tidclient start_m settingsdict} {
variable sysloghost_port
variable logfile
variable settings
interp bgerror {} shellthread::worker::bgerror
#package require overtype ;#overtype uses tcllib textutil, punk::char etc - currently too heavyweight in terms of loading time for use in threads.
variable client_ids
variable ts_start_micros
lappend client_ids $tidclient
set ts_start_micros $start_m
set defaults [list -raw 0 -file "" -syslog "" -direction out]
set settings [dict merge $defaults $settingsdict]
set syslog [dict get $settings -syslog]
if {[string length $syslog]} {
lassign [split $syslog :] s_host s_port
set sysloghost_port [list $s_host $s_port]
if {[catch {package require udp} errm]} {
#disable rather than bomb and interfere with any -file being written
#review - log/notify?
set sysloghost_port ""
}
} else {
set sysloghost_port ""
}
set logfile [dict get $settings -file]
}
proc start_pipe_read {source readchan args} {
#assume 1 inpipe for now
variable inpipe
variable sysloghost_port
variable logfile
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#get buffering setting from the channel as it was set prior to thread::transfer
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set writebuffering line
#set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
#chan configure $writechan -buffering $writebuffering
}
}
chan configure $readchan -translation lf
if {$readchan ni [chan names]} {
error "shellthread::worker::start_pipe_read - inpipe not configured. Use shellthread::manager::set_pipe_read_from_client to thread::transfer the pipe end"
}
set inpipe $readchan
chan configure $readchan -blocking 0
set waitvar ::shellthread::worker::wait($inpipe,[clock micros])
#tcl::chan::fifo2 based pipe seems slower to establish events upon than Memchan
chan event $readchan readable [list ::shellthread::worker::pipe_read $readchan $source $waitvar $readbuffering $writebuffering]
vwait $waitvar
}
proc pipe_read {chan source waitfor readbuffering writebuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
::shellthread::worker::log pipe 0 - $source - info $chunk\n $writebuffering
} else {
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
}
} else {
set chunk [chan read $chan]
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $chan
}
}
proc start_pipe_write {source writechan args} {
variable outpipe
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
#todo!
set readchan stdin
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#nothing explicitly set - take from transferred channel
set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
chan configure $writechan -buffering $writebuffering
}
}
if {$writechan ni [chan names]} {
error "shellthread::worker::start_pipe_write - outpipe not configured. Use shellthread::manager::set_pipe_write_to_client to thread::transfer the pipe end"
}
set outpipe $writechan
chan configure $readchan -blocking 0
chan configure $writechan -blocking 0
set waitvar ::shellthread::worker::wait($outpipe,[clock micros])
chan event $readchan readable [list apply {{chan writechan source waitfor readbuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
puts $writechan $chunk
} else {
puts -nonewline $writechan $chunk
}
}
} else {
set chunk [chan read $chan]
puts -nonewline $writechan $chunk
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $writechan
if {$chan ne "stdin"} {
chan close $chan
}
}
}} $readchan $writechan $source $waitvar $readbuffering]
vwait $waitvar
}
proc _initsock {} {
variable sysloghost_port
variable sock
if {[string length $sysloghost_port]} {
if {[catch {chan configure $sock} state]} {
set sock [udp_open]
chan configure $sock -buffering none -translation binary
chan configure $sock -remote $sysloghost_port
}
}
}
proc _reconnect {} {
variable sock
catch {close $sock}
_initsock
return [chan configure $sock]
}
proc send_info {client_tid ts_sent source msg} {
set ts_received [clock micros]
set lag_micros [expr {$ts_received - $ts_sent}]
set lag [expr {$lag_micros / 1000000.0}] ;#lag as x.xxxxxx seconds
log $client_tid $ts_sent $lag $source - info $msg line 1
}
proc log {client_tid ts_sent lag source service level msg writebuffering {islog 0}} {
variable sock
variable fd
variable sysloghost_port
variable logfile
variable settings
if {![dict get $settings -raw]} {
set logchunk $msg
set le "none"
#for cooked - always remove the trailing newline before splitting..
#
#note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
#
#Possibly not critical as cooked is for logging and we are still preserving all \r and \n chars - but review and consider implementing a better split
#but add it back exactly as it was afterwards
#we can always split on \n - and any adjacent \r will be preserved in the rejoin
set lastchar [string range $logchunk end end]
if {[string range $logchunk end-1 end] eq "\r\n"} {
set le "crlf"
#set logchunk [string range $logchunk 0 end-2]
} else {
if {$lastchar eq "\n"} {
set le "lf"
#set logchunk [string range $logchunk 0 end-1]
} elseif {$lastchar eq "\r"} {
#\r as line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console.
#If we're writing log lines to a file, we'll end up appending a \n to a trailing \r
#For writing to a syslog target - we'll pass it through as is for the syslog target to display as it wills
set le "cr"
#set logchunk [string range $logchunk 0 end-1]
} else {
#possibly a single line with no linefeed.. or has linefeeds only in the middle
#when writing to syslog we'll pass it through without a trailing linefeed.
#when writing to a file we'll append \n
}
}
#split on \n no matter the actual line-ending in use
#shouldn't matter as long as we don't add anything at the end of the line other than the raw data
#ie - don't quote or add spaces
set lines [split $logchunk \n]
set lcount [llength $lines]
if {$ts_sent != 0} {
set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
set time_info [::shellthread::iso8601 $ts_sent].$micros
#set time_info "${time_info}+$lag"
set lagfp "+[format %f $lag]"
} else {
#from pipe - no ts_sent/lag info available
set time_info ""
set lagfp ""
}
set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway
set w0 9
set w1 27
set w2 11
set w3 22 ;#review - this can truncate source name without indication tail is missing
set w4 [expr {1 + ([::tcl::string::length $lcount] *2)}] ;#eg 999/999
#do not columnize the final data column or append anything to end - or we could muck up the crlf integrity
lassign [list \
[format %-${w0}s $idtail]\
[format %-${w1}s $time_info]\
[format %-${w2}s $lagfp]\
[format %-${w3}s $source]\
] c0 c1 c2 c3
set c2_blank [string repeat " " $w2]
if {[::tcl::string::length $sysloghost_port]} {
_initsock
}
set outlines [list]
set lnum 0
foreach ln $lines {
incr lnum
set c4 [format %-${w4}s $lnum/$lcount]
if {$lnum == 1} {
lappend outlines "$c0 $c1 $c2 $c3 $c4 $ln"
} else {
lappend outlines "$c0 $c1 $c2_blank $c3 $c4 $ln"
}
if {[::tcl::string::length $sysloghost_port]} {
#send each line as a separate syslog message
#even if they arrive out of order or interleaved with records from other sources -
#they can be tied together and ordered using id,source, timestamp, n/numlines fields
#we lose information about the line-endings though
catch {puts -nonewline $sock [lindex $outlines end]}
}
}
#todo - setting to maintain open filehandle and reduce io.
# possible settings for buffersize - and maybe logrotation, although this could be left to client
#for now - default to safe option of open/close each write despite the overhead.
if {[string length $logfile]} {
switch -- $le {
lf {
set logchunk "[join $outlines \n]\n"
}
crlf {
#join with \n because we still did split on \n
set logchunk "[join $outlines \n]\r\n"
}
cr {
set logchunk "[join $outlines \n]\r"
}
none {
set logchunk [join $outlines \n]
}
}
set fd [open $logfile a]
if {$le in {cr none}} {
append logchunk \n
}
puts -nonewline $fd $logchunk
close $fd
}
} else {
#raw
if {[string length $sysloghost_port]} {
_initsock
catch {puts -nonewline $sock $msg}
}
if {[string length $logfile]} {
set fd [open $logfile a]
puts -nonewline $fd $msg
close $fd
}
}
#todo - sockets etc?
}
# - withdraw just this client
proc finish {tidclient} {
variable client_ids
if {($tidclient in $client_ids) && ([llength $client_ids] == 1)} {
terminate $tidclient
} else {
set posn [lsearch $client_ids $tidclient]
set client_ids [lreplace $client_ids $posn $posn]
}
}
#allow any client to terminate
proc terminate {tidclient} {
variable sock
variable fd
variable client_ids
if {$tidclient in $client_ids} {
catch {close $sock}
catch {close $fd}
set client_ids [list]
#review use of thread::release -wait
#docs indicate deprecated for regular use, and that we should use thread::join
#however.. how can we set a timeout on a thread::join ?
#by telling the thread to release itself - we can wait on the thread::send variable
# This needs review - because it's unclear that -wait even works on self
# (what does it mean to wait for the target thread to exit if the target is self??)
thread::release -wait
return [thread::id]
} else {
return ""
}
}
}
namespace eval shellthread::manager {
variable workers [dict create]
variable worker_errors [list]
variable timeouts
variable free_threads [list]
#variable log_threads
proc dict_getdef {dictValue args} {
if {[llength $args] < 2} {
error {wrong # args: should be "dict_getdef dictValue ?key ...? key default"}
}
set keys [lrange $args 0 end-1]
if {[tcl::dict::exists $dictValue {*}$keys]} {
return [tcl::dict::get $dictValue {*}$keys]
} else {
return [lindex $args end]
}
}
#new datastructure regarding workers and sourcetags required.
#one worker can service multiple sourcetags - but each sourcetag may be used by multiple threads too.
#generally each thread will use a specific sourcetag - but we may have pools doing similar things which log to same destination.
#
#As a convention we may use a sourcetag for the thread which started the worker that isn't actually used for logging - but as a common target for joins
#If the thread which started the worker calls leave_worker with that 'primary' sourcetag it means others won't be able to use that target - which seems reasonable.
#If another thread wants to maintain joinability beyond the span provided by the starting client,
#it can join with both the primary tag and a tag it will actually use for logging.
#A thread can join the logger with any existingtag - not just the 'primary'
#(which is arbitrary anyway. It will usually be the first in the list - but may be unsubscribed by clients and disappear)
proc join_worker {existingtag sourcetaglist} {
set client_tid [thread::id]
#todo - allow a source to piggyback on existing worker by referencing one of the sourcetags already using the worker
}
proc new_pipe_worker {sourcetaglist {settingsdict {}}} {
if {[dict exists $settingsdict -workertype]} {
if {[string tolower [dict get $settingsdict -workertype]] ne "pipe"} {
error "new_pipe_worker error: -workertype ne 'pipe'. Set to 'pipe' or leave empty"
}
}
dict set settingsdict -workertype pipe
new_worker $sourcetaglist $settingsdict
}
#it is up to caller to use a unique sourcetag (e.g by prefixing with own thread::id etc)
# This allows multiple threads to more easily write to the same named sourcetag if necessary
# todo - change sourcetag for a list of tags which will be handled by the same thread. e.g for multiple threads logging to same file
#
# todo - some protection mechanism for case where target is a file to stop creation of multiple worker threads writing to same file.
# Even if we use open fd,close fd wrapped around writes.. it is probably undesirable to have multiple threads with same target
# On the other hand socket targets such as UDP can happily be written to by multiple threads.
# For now the mechanism is that a call to new_worker (rename to open_worker?) will join the same thread if a sourcetag matches.
# but, as sourcetags can get removed(unsubbed via leave_worker) this doesn't guarantee two threads with same -file settings won't fight.
# Also.. the settingsdict is ignored when joining with a tag that exists.. this is problematic.. e.g logrotation where previous file still being written by existing worker
# todo - rename 'sourcetag' concept to 'targettag' ?? the concept is a mixture of both.. it is somewhat analogous to a syslog 'facility'
# probably new_worker should disallow auto-joining and we allow different workers to handle same tags simultaneously to support overlap during logrotation etc.
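#hypothetical naming sketch for the convention above (tag and file are placeholders):
# set tag "[thread::id]-applog"
# shellthread::manager::new_worker [list $tag] {-file ./logs/app.log}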
proc new_worker {sourcetaglist {settingsdict {}}} {
variable workers
set ts_start [clock micros]
set tidclient [thread::id]
set sourcetag [lindex $sourcetaglist 0] ;#todo - use all
set defaults [dict create\
-workertype message\
]
set settingsdict [dict merge $defaults $settingsdict]
set workertype [string tolower [dict get $settingsdict -workertype]]
set known_workertypes [list pipe message]
if {$workertype ni $known_workertypes} {
error "new_worker - unknown -workertype $workertype. Expected one of '$known_workertypes'"
}
if {[dict exists $workers $sourcetag]} {
set winfo [dict get $workers $sourcetag]
if {[dict get $winfo tid] ne "noop" && [thread::exists [dict get $winfo tid]]} {
#add our client-info to existing worker thread
dict lappend winfo list_client_tids $tidclient
dict set workers $sourcetag $winfo ;#writeback
return [dict get $winfo tid]
}
}
#noop fake worker for empty syslog and empty file
if {$workertype eq "message"} {
if {[dict_getdef $settingsdict -syslog ""] eq "" && [dict_getdef $settingsdict -file ""] eq ""} {
set winfo [dict create tid noop list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype "message"]
dict set workers $sourcetag $winfo
return noop
}
}
#check if there is an existing unsubscribed thread first
#don't use free_threads for pipe workertype for now..
variable free_threads
if {$workertype ne "pipe"} {
if {[llength $free_threads]} {
#todo - re-use from tail - as most likely to have been doing similar work?? review
set free_threads [lassign $free_threads tidworker]
#todo - keep track of real ts_start of free threads... kill when too old
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype [dict get $settingsdict -workertype]]
#puts stderr "shellfilter::new_worker Re-using free worker thread: $tidworker with tag $sourcetag"
dict set workers $sourcetag $winfo
return $tidworker
}
}
#set ts_start [::shellthread::iso8601]
set tidworker [thread::create -preserved]
set init_script [string map [list %ts_start% $ts_start %mp% [tcl::tm::list] %ap% $::auto_path %tidcli% $tidclient %sd% $settingsdict] {
#set tclbase [file dirname [file dirname [info nameofexecutable]]]
#set tcllib $tclbase/lib
#if {$tcllib ni $::auto_path} {
# lappend ::auto_path $tcllib
#}
set ::settingsinfo [dict create %sd%]
#if the executable running things is something like a tclkit,
# then it's likely we will need to use the caller's auto_path and tcl::tm::list to find things
#The caller can tune the thread's package search by providing a settingsdict
#tcl::tm::add * must add in reverse order to get resulting list in same order as original
if {![dict exists $::settingsinfo tcl_tm_list]} {
#JMN2
::tcl::tm::add {*}[lreverse [list %mp%]]
} else {
tcl::tm::remove {*}[tcl::tm::list]
::tcl::tm::add {*}[lreverse [dict get $::settingsinfo tcl_tm_list]]
}
if {![dict exists $::settingsinfo auto_path]} {
set ::auto_path [list %ap%]
} else {
set ::auto_path [dict get $::settingsinfo auto_path]
}
package require punk::packagepreference
punk::packagepreference::install
package require Thread
package require shellthread
if {![catch {::shellthread::worker::init %tidcli% %ts_start% $::settingsinfo} errmsg]} {
unset ::settingsinfo
set ::shellthread_init "ok"
} else {
unset ::settingsinfo
set ::shellthread_init "err $errmsg"
}
}]
thread::send -async $tidworker $init_script
#thread::send $tidworker $init_script
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list]]
dict set workers $sourcetag $winfo
return $tidworker
}
proc set_pipe_read_from_client {tag_pipename worker_tid rchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $rchan
#start_pipe_read will vwait - so we have to send async
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_read $tag_pipename $rchan]
#client may start writing immediately - but presumably it will buffer in fifo2
}
proc set_pipe_write_to_client {tag_pipename worker_tid wchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $wchan
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_write $tag_pipename $wchan]
}
proc write_log {source msg args} {
variable workers
set ts_micros_sent [clock micros]
set defaults [list -async 1 -level info]
set opts [dict merge $defaults $args]
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker eq "noop"} {
return
}
if {![thread::exists $tidworker]} {
# -syslog -file ?
set tidworker [new_worker $source]
}
} else {
#auto create with no requirement to call new_worker.. warn?
# -syslog -file ?
error "write_log no log opened for source: $source"
set tidworker [new_worker $source]
}
set client_tid [thread::id]
if {[dict get $opts -async]} {
thread::send -async $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
} else {
thread::send $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
}
}
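#typical client-side sequence (sketch only - host/port are placeholders, error handling omitted):
# set tag "[thread::id]-mylog"
# shellthread::manager::new_worker [list $tag] {-syslog 127.0.0.1:514}
# shellthread::manager::write_log $tag "hello from [thread::id]"  ;# -async 1 by default
# shellthread::manager::close_worker $tag
# set errs [shellthread::manager::get_and_clear_errors $tag]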
proc report_worker_errors {errdict} {
variable workers
set reporting_tid [dict get $errdict worker_tid]
dict for {src srcinfo} $workers {
if {[dict get $srcinfo tid] eq $reporting_tid} {
dict set srcinfo errors [dict get $errdict errors]
dict set workers $src $srcinfo ;#writeback updated
break
}
}
}
#aka leave_worker
#Note that the tags may be on separate workertids, or some tags may share workertids
proc unsubscribe {sourcetaglist} {
variable workers
#workers structure example:
#[list sourcetag1 [list tid <tidworker> list_client_tids <clients>] ts_start <ts_start> ts_end_list {}]
variable free_threads
set mytid [thread::id] ;#caller of shellthread::manager::xxx is the client thread
set subscriberless_tags [list]
foreach source $sourcetaglist {
if {[dict exists $workers $source]} {
set list_client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $list_client_tids $mytid]] >= 0} {
set list_client_tids [lreplace $list_client_tids $posn $posn]
dict set workers $source list_client_tids $list_client_tids
}
if {![llength $list_client_tids]} {
lappend subscriberless_tags $source
}
}
}
#we've removed our own tid from all the tags - possibly across multiple workertids, and possibly leaving some workertids with no subscribers for a particular tag - or no subscribers at all.
set subscriberless_workers [list]
set shuttingdown_workers [list]
foreach deadtag $subscriberless_tags {
set workertid [dict get $workers $deadtag tid]
set worker_tags [get_worker_tagstate $workertid]
set subscriber_count 0
set kill_count 0 ;#number of ts_end_list entries - even one indicates thread is doomed
foreach taginfo $worker_tags {
incr subscriber_count [llength [dict get $taginfo list_client_tids]]
incr kill_count [llength [dict get $taginfo ts_end_list]]
}
if {$subscriber_count == 0} {
lappend subscriberless_workers $workertid
}
if {$kill_count > 0} {
lappend shuttingdown_workers $workertid
}
}
#if worker isn't shutting down - add it to free_threads list
foreach workertid $subscriberless_workers {
if {$workertid ni $shuttingdown_workers} {
if {$workertid ni $free_threads && $workertid ne "noop"} {
lappend free_threads $workertid
}
}
}
#todo
#unsub this client_tid from the sourcetags in the sourcetaglist. if no more client_tids exist for sourcetag, remove sourcetag,
#if no more sourcetags - add worker to free_threads
}
proc get_worker_tagstate {workertid} {
variable workers
set taginfo_list [list]
dict for {source sourceinfo} $workers {
if {[dict get $sourceinfo tid] eq $workertid} {
lappend taginfo_list $sourceinfo
}
}
return $taginfo_list
}
#finalisation
proc shutdown_free_threads {{timeout 2500}} {
variable free_threads
if {![llength $free_threads]} {
return
}
upvar ::shellthread::manager::timeouts timeoutarr
if {[info exists timeoutarr(shutdown_free_threads)]} {
#already called
return false
}
#set timeoutarr(shutdown_free_threads) waiting
#after $timeout [list set timeoutarr(shutdown_free_threads) timed-out]
set ::shellthread::waitfor waiting
#after $timeout [list set ::shellthread::waitfor]
#2025-07 timed-out untested review
set cancelid [after $timeout [list set ::shellthread::waitfor timed-out]]
set waiting_for [list]
set ended [list]
set timedout 0
foreach tid $free_threads {
if {[thread::exists $tid]} {
lappend waiting_for $tid
#thread::send -async $tid [list shellthread::worker::terminate [thread::id]] timeoutarr(shutdown_free_threads)
thread::send -async $tid [list shellthread::worker::terminate [thread::id]] ::shellthread::waitfor
}
}
if {[llength $waiting_for]} {
for {set i 0} {$i < [llength $waiting_for]} {incr i} {
vwait ::shellthread::waitfor
if {$::shellthread::waitfor eq "timed-out"} {
set timedout 1
break
} else {
after cancel $cancelid
lappend ended $::shellthread::waitfor
}
}
}
set free_threads [list]
return [dict create existed $waiting_for ended $ended timedout $timedout]
}
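#result shape sketch (based on the dict create above - only meaningful when free threads existed):
# set res [shellthread::manager::shutdown_free_threads 1000]
# dict get $res timedout  ;# 1 if the vwait hit the timeout, else 0
# dict get $res ended     ;# values returned by each worker's terminate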
#TODO - important.
#REVIEW!
#since moving to the unsubscribe mechanism - close_worker $source isn't being called
# - we need to set a limit to the number of free threads and shut down excess when detected during unsubscription
#instruction to shut-down the thread that has this source.
proc close_worker {source {timeout 2500}} {
variable workers
variable worker_errors
variable free_threads
upvar ::shellthread::manager::timeouts timeoutarr
set ts_now [clock micros]
#puts stderr "close_worker $source"
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker in $free_threads} {
#make sure a thread that is being closed is removed from the free_threads list
set posn [lsearch $free_threads $tidworker]
set free_threads [lreplace $free_threads $posn $posn]
}
set mytid [thread::id]
set client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $client_tids $mytid]] >= 0} {
set client_tids [lreplace $client_tids $posn $posn]
#remove self from list of clients
dict set workers $source list_client_tids $client_tids
}
set ts_end_list [dict get $workers $source ts_end_list] ;#ts_end_list is just a list of timestamps of closing calls for this source - only one is needed to close, but they may all come in a flurry.
if {[llength $ts_end_list]} {
set last_end_ts [lindex $ts_end_list end]
if {(($ts_now - $last_end_ts) / 1000) >= $timeout} {
lappend ts_end_list $ts_now
dict set workers $source ts_end_list $ts_end_list
} else {
#existing close in progress.. assume it will work
return
}
}
if {[thread::exists $tidworker]} {
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source still running.. terminating"
#review - timeoutarr is local var (?)
set timeoutarr($source) 0
after $timeout [list set timeoutarr($source) 2]
thread::send -async $tidworker [list shellthread::worker::send_errors_now [thread::id]]
thread::send -async $tidworker [list shellthread::worker::terminate [thread::id]] timeoutarr($source)
#thread::send -async $tidworker [string map [list %tidclient% [thread::id]] {
# shellthread::worker::terminate %tidclient%
#}] timeoutarr($source)
vwait timeoutarr($source)
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE1"
thread::release $tidworker
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE2"
if {[dict exists $workers $source errors]} {
set errlist [dict get $workers $source errors]
if {[llength $errlist]} {
lappend worker_errors [list $source [dict get $workers $source]]
}
}
dict unset workers $source
} else {
#thread may have been closed by call to close_worker with another source with same worker
#clear workers record for this source
#REVIEW - race condition for re-creation of source with new workerid?
#check that record is subscriberless to avoid this
if {[llength [dict get $workers $source list_client_tids]] == 0} {
dict unset workers $source
}
}
}
#puts stdout "close_worker $source - end"
}
#worker errors only available for a source after close_worker called on that source
#It is possible for there to be multiple entries for a source because new_worker can be called multiple times with the same sourcetag.
proc get_and_clear_errors {source} {
variable worker_errors
set source_errors [lsearch -all -inline -index 0 $worker_errors $source]
set worker_errors [lsearch -all -inline -index 0 -not $worker_errors $source]
return $source_errors
}
}
package provide shellthread [namespace eval shellthread {
variable version
set version 1.6.2
}]

9470
src/project_layouts/custom/_project/punk.project-0.1/src/bootsupport/modules/tomlish-1.1.7.tm

File diff suppressed because it is too large

3
src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/punk/args-0.2.1.tm

@ -6345,7 +6345,8 @@ tcl::namespace::eval punk::args {
     }
 }
 indexexpression {
-    if {[catch {lindex {} $e_check}]} {
+    #tcl 9.1+? tip 615 'string is index'
+    if {$echeck eq "" || [catch {lindex {} $e_check}]} {
     set msg "$argclass $argname for %caller% requires type indexexpression. An index as used in Tcl list commands. Received: '$e_check'"
     #return -options [list -code error -errorcode [list PUNKARGS VALIDATION [list typemismatch $type] -badarg $argname -argspecs $argspecs]] $msg
     lset clause_results $c_idx $a_idx [list errorcode [list PUNKARGS VALIDATION [list typemismatch $type] -badarg $argname -argspecs $argspecs] msg $msg]

18
src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/punk/args/moduledoc/tclcore-0.1.0.tm

@ -6020,6 +6020,13 @@ tcl::namespace::eval punk::args::moduledoc::tclcore {
 @values -min 3 -max -1
 listVar -type string -help\
 "Existing list variable name"
+#note if tip 615 implemented for 9.1 'first' and 'last' need to accept empty string too
+#same for lrange, lreplace, string range, string replace
+#if {[package vsatisfies [package provide Tcl] 9.1-]} {
+# first -type {indexexpression|literal()}
+#} else {
+# first -type indexexpression
+#}
 first -type indexexpression
 last -type indexexpression
 value -type any -optional 1 -multiple 1
@ -6086,10 +6093,21 @@ tcl::namespace::eval punk::args::moduledoc::tclcore {
 If additional index arguments are supplied, then each argument is used in turn
 to select an element from the previous indexing operation, allowing the script
 to select elements from sublists."
+@form -form separate
 @values -min 1 -max -1
 list -type list -help\
 "tcl list as a value"
 index -type indexexpression -multiple 1 -optional 1
+@form -form combined
+@values -min 2 -max 2
+list -type list -help\
+"tcl list as a value"
+#list of indexexpression
+indexlist -type list -optional 0 -help\
+"list of indexexpressions"
 } "@doc -name Manpage: -url [manpage_tcl lindex]"\
 {
 @examples -help {

853
src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/shellthread-1.6.2.tm

@ -0,0 +1,853 @@
#package require logger
package require Thread
namespace eval shellthread {
proc iso8601 {{tsmicros ""}} {
if {$tsmicros eq ""} {
set tsmicros [tcl::clock::microseconds]
} else {
set microsnow [tcl::clock::microseconds]
if {[tcl::string::length $tsmicros] != [tcl::string::length $microsnow]} {
error "iso8601 requires 'clock micros' or empty string to create timestamp"
}
}
set seconds [expr {$tsmicros / 1000000}]
return [tcl::clock::format $seconds -format "%Y-%m-%d_%H-%M-%S"]
}
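#usage sketch (illustrative only):
# shellthread::iso8601                       ;# e.g. 2025-07-01_10-15-30 for 'now'
# shellthread::iso8601 [clock microseconds]  ;# same format from an explicit micros value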
}
namespace eval shellthread::worker {
variable settings
variable sysloghost_port
variable sock
variable logfile ""
variable fd
variable client_ids [list]
variable ts_start_micros
variable errorlist [list]
variable inpipe ""
proc bgerror {args} {
variable errorlist
lappend errorlist $args
}
proc send_errors_now {tidcli} {
variable errorlist
thread::send -async $tidcli [list shellthread::manager::report_worker_errors [list worker_tid [thread::id] errors $errorlist]]
}
proc add_client_tid {tidcli} {
variable client_ids
if {$tidcli ni $client_ids} {
lappend client_ids $tidcli
}
}
proc init {tidclient start_m settingsdict} {
variable sysloghost_port
variable logfile
variable settings
interp bgerror {} shellthread::worker::bgerror
#package require overtype ;#overtype uses tcllib textutil, punk::char etc - currently too heavyweight in terms of loading time for use in threads.
variable client_ids
variable ts_start_micros
lappend client_ids $tidclient
set ts_start_micros $start_m
set defaults [list -raw 0 -file "" -syslog "" -direction out]
set settings [dict merge $defaults $settingsdict]
set syslog [dict get $settings -syslog]
if {[string length $syslog]} {
lassign [split $syslog :] s_host s_port
set sysloghost_port [list $s_host $s_port]
if {[catch {package require udp} errm]} {
#disable rather than bomb and interfere with any -file being written
#review - log/notify?
set sysloghost_port ""
}
} else {
set sysloghost_port ""
}
set logfile [dict get $settings -file]
}
proc start_pipe_read {source readchan args} {
#assume 1 inpipe for now
variable inpipe
variable sysloghost_port
variable logfile
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#get buffering setting from the channel as it was set prior to thread::transfer
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set writebuffering line
#set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
#can configure $writechan -buffering $writebuffering
}
}
chan configure $readchan -translation lf
if {$readchan ni [chan names]} {
error "shellthread::worker::start_pipe_read - inpipe not configured. Use shellthread::manager::set_pipe_read_from_client to thread::transfer the pipe end"
}
set inpipe $readchan
chan configure $readchan -blocking 0
set waitvar ::shellthread::worker::wait($inpipe,[clock micros])
#tcl::chan::fifo2 based pipe seems slower to establish events upon than Memchan
chan event $readchan readable [list ::shellthread::worker::pipe_read $readchan $source $waitvar $readbuffering $writebuffering]
vwait $waitvar
}
proc pipe_read {chan source waitfor readbuffering writebuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
::shellthread::worker::log pipe 0 - $source - info $chunk\n $writebuffering
} else {
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
}
} else {
set chunk [chan read $chan]
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $chan
}
}
proc start_pipe_write {source writechan args} {
variable outpipe
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
#todo!
set readchan stdin
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#nothing explicitly set - take from transferred channel
set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
chan configure $writechan -buffering $writebuffering
}
}
if {$writechan ni [chan names]} {
error "shellthread::worker::start_pipe_write - outpipe not configured. Use shellthread::manager::set_pipe_write_to_client to thread::transfer the pipe end"
}
set outpipe $writechan
chan configure $readchan -blocking 0
chan configure $writechan -blocking 0
set waitvar ::shellthread::worker::wait($outpipe,[clock micros])
chan event $readchan readable [list apply {{chan writechan source waitfor readbuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
puts $writechan $chunk
} else {
puts -nonewline $writechan $chunk
}
}
} else {
set chunk [chan read $chan]
puts -nonewline $writechan $chunk
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $writechan
if {$chan ne "stdin"} {
chan close $chan
}
}
}} $readchan $writechan $source $waitvar $readbuffering]
vwait $waitvar
}
proc _initsock {} {
variable sysloghost_port
variable sock
if {[string length $sysloghost_port]} {
if {[catch {chan configure $sock} state]} {
set sock [udp_open]
chan configure $sock -buffering none -translation binary
chan configure $sock -remote $sysloghost_port
}
}
}
proc _reconnect {} {
variable sock
catch {close $sock}
_initsock
return [chan configure $sock]
}
proc send_info {client_tid ts_sent source msg} {
set ts_received [clock micros]
set lag_micros [expr {$ts_received - $ts_sent}]
set lag [expr {$lag_micros / 1000000.0}] ;#lag as x.xxxxxx seconds
log $client_tid $ts_sent $lag $source - info $msg line 1
}
proc log {client_tid ts_sent lag source service level msg writebuffering {islog 0}} {
variable sock
variable fd
variable sysloghost_port
variable logfile
variable settings
if {![dict get $settings -raw]} {
set logchunk $msg
set le "none"
#for cooked - always remove the trailing newline before splitting..
#but add it back exactly as it was afterwards
#we can always split on \n - and any adjacent \r will be preserved in the rejoin
#
#note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
#
#Possibly not critical as cooked is for logging and we are still preserving all \r and \n chars - but review and consider implementing a better split
set lastchar [string range $logchunk end end]
if {[string range $logchunk end-1 end] eq "\r\n"} {
set le "crlf"
#set logchunk [string range $logchunk 0 end-2]
} else {
if {$lastchar eq "\n"} {
set le "lf"
#set logchunk [string range $logchunk 0 end-1]
} elseif {$lastchar eq "\r"} {
#\r as line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console.
#If we're writing log lines to a file, we'll end up appending a \n to a trailing \r
#For writing to a syslog target - we'll pass it through as is for the syslog target to display as it wills
set le "cr"
#set logchunk [string range $logchunk 0 end-1]
} else {
#possibly a single line with no linefeed.. or has linefeeds only in the middle
#when writing to syslog we'll pass it through without a trailing linefeed.
#when writing to a file we'll append \n
}
}
#split on \n no matter the actual line-ending in use
#shouldn't matter as long as we don't add anything at the end of the line other than the raw data
#ie - don't quote or add spaces
set lines [split $logchunk \n]
set lcount [llength $lines]
if {$ts_sent != 0} {
set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
set time_info [::shellthread::iso8601 $ts_sent].$micros
#set time_info "${time_info}+$lag"
set lagfp "+[format %f $lag]"
} else {
#from pipe - no ts_sent/lag info available
set time_info ""
set lagfp ""
}
set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway
set w0 9
set w1 27
set w2 11
set w3 22 ;#review - this can truncate source name without indication tail is missing
set w4 [expr {1 + ([::tcl::string::length $lcount] *2)}] ;#eg 999/999
#do not columnize the final data column or append anything to end - or we could muck up the crlf integrity
lassign [list \
[format %-${w0}s $idtail]\
[format %-${w1}s $time_info]\
[format %-${w2}s $lagfp]\
[format %-${w3}s $source]\
] c0 c1 c2 c3
set c2_blank [string repeat " " $w2]
if {[::tcl::string::length $sysloghost_port]} {
_initsock
}
set outlines [list]
set lnum 0
foreach ln $lines {
incr lnum
set c4 [format %-${w4}s $lnum/$lcount]
if {$lnum == 1} {
lappend outlines "$c0 $c1 $c2 $c3 $c4 $ln"
} else {
lappend outlines "$c0 $c1 $c2_blank $c3 $c4 $ln"
}
if {[::tcl::string::length $sysloghost_port]} {
#send each line as a separate syslog message
#even if they arrive out of order or interleaved with records from other sources -
#they can be tied together and ordered using id,source, timestamp, n/numlines fields
#we lose information about the line-endings though
catch {puts -nonewline $sock [lindex $outlines end]}
}
}
#todo - setting to maintain open filehandle and reduce io.
# possible settings for buffersize - and maybe logrotation, although this could be left to client
#for now - default to safe option of open/close each write despite the overhead.
if {[string length $logfile]} {
switch -- $le {
lf {
set logchunk "[join $outlines \n]\n"
}
crlf {
#join with \n because we still did split on \n
set logchunk "[join $outlines \n]\r\n"
}
cr {
set logchunk "[join $outlines \n]\r"
}
none {
set logchunk [join $outlines \n]
}
}
set fd [open $logfile a]
if {$le in {cr none}} {
append logchunk \n
}
puts -nonewline $fd $logchunk
close $fd
}
} else {
#raw
if {[string length $sysloghost_port]} {
_initsock
catch {puts -nonewline $sock $msg}
}
if {[string length $logfile]} {
set fd [open $logfile a]
puts -nonewline $fd $msg
close $fd
}
}
#todo - sockets etc?
}
# - withdraw just this client
proc finish {tidclient} {
variable client_ids
if {($tidclient in $client_ids) && ([llength $client_ids] == 1)} {
terminate $tidclient
} else {
set posn [lsearch $client_ids $tidclient]
set client_ids [lreplace $client_ids $posn $posn]
}
}
#allow any client to terminate
proc terminate {tidclient} {
variable sock
variable fd
variable client_ids
if {$tidclient in $client_ids} {
catch {close $sock}
catch {close $fd}
set client_ids [list]
#review use of thread::release -wait
#docs indicate deprecated for regular use, and that we should use thread::join
#however.. how can we set a timeout on a thread::join ?
#by telling the thread to release itself - we can wait on the thread::send variable
# This needs review - because it's unclear that -wait even works on self
# (what does it mean to wait for the target thread to exit if the target is self??)
thread::release -wait
return [thread::id]
} else {
return ""
}
}
}
namespace eval shellthread::manager {
variable workers [dict create]
variable worker_errors [list]
variable timeouts
variable free_threads [list]
#variable log_threads
proc dict_getdef {dictValue args} {
if {[llength $args] < 2} {
error {wrong # args: should be "dict_getdef dictValue ?key ...? key default"}
}
set keys [lrange $args 0 end-1]
if {[tcl::dict::exists $dictValue {*}$keys]} {
return [tcl::dict::get $dictValue {*}$keys]
} else {
return [lindex $args end]
}
}
#new datastructure regarding workers and sourcetags required.
#one worker can service multiple sourcetags - but each sourcetag may be used by multiple threads too.
#generally each thread will use a specific sourcetag - but we may have pools doing similar things which log to same destination.
#
#As a convention we may use a sourcetag for the thread which started the worker that isn't actually used for logging - but as a common target for joins
#If the thread which started the worker calls leave_worker with that 'primary' sourcetag it means others won't be able to use that target - which seems reasonable.
#If another thread wants to maintain joinability beyond the span provided by the starting client,
#it can join with both the primary tag and a tag it will actually use for logging.
#A thread can join the logger with any existingtag - not just the 'primary'
#(which is arbitrary anyway. It will usually be the first in the list - but may be unsubscribed by clients and disappear)
proc join_worker {existingtag sourcetaglist} {
set client_tid [thread::id]
#todo - allow a source to piggyback on existing worker by referencing one of the sourcetags already using the worker
}
proc new_pipe_worker {sourcetaglist {settingsdict {}}} {
if {[dict exists $settingsdict -workertype]} {
if {[string tolower [dict get $settingsdict -workertype]] ne "pipe"} {
error "new_pipe_worker error: -workertype ne 'pipe'. Set to 'pipe' or leave empty"
}
}
dict set settingsdict -workertype pipe
new_worker $sourcetaglist $settingsdict
}
#it is up to caller to use a unique sourcetag (e.g by prefixing with own thread::id etc)
# This allows multiple threads to more easily write to the same named sourcetag if necessary
# todo - change sourcetag for a list of tags which will be handled by the same thread. e.g for multiple threads logging to same file
#
# todo - some protection mechanism for case where target is a file to stop creation of multiple worker threads writing to same file.
# Even if we use open fd,close fd wrapped around writes.. it is probably undesirable to have multiple threads with same target
# On the other hand socket targets such as UDP can happily be written to by multiple threads.
# For now the mechanism is that a call to new_worker (rename to open_worker?) will join the same thread if a sourcetag matches.
# but, as sourcetags can get removed(unsubbed via leave_worker) this doesn't guarantee two threads with same -file settings won't fight.
# Also.. the settingsdict is ignored when joining with a tag that exists.. this is problematic.. e.g logrotation where previous file still being written by existing worker
# todo - rename 'sourcetag' concept to 'targettag' ?? the concept is a mixture of both.. it is somewhat analogous to a syslog 'facility'
# probably new_worker should disallow auto-joining and we allow different workers to handle same tags simultaneously to support overlap during logrotation etc.
proc new_worker {sourcetaglist {settingsdict {}}} {
variable workers
set ts_start [clock micros]
set tidclient [thread::id]
set sourcetag [lindex $sourcetaglist 0] ;#todo - use all
set defaults [dict create\
-workertype message\
]
set settingsdict [dict merge $defaults $settingsdict]
set workertype [string tolower [dict get $settingsdict -workertype]]
set known_workertypes [list pipe message]
if {$workertype ni $known_workertypes} {
error "new_worker - unknown -workertype $workertype. Expected one of '$known_workertypes'"
}
if {[dict exists $workers $sourcetag]} {
set winfo [dict get $workers $sourcetag]
if {[dict get $winfo tid] ne "noop" && [thread::exists [dict get $winfo tid]]} {
#add our client-info to existing worker thread
dict lappend winfo list_client_tids $tidclient
dict set workers $sourcetag $winfo ;#writeback
return [dict get $winfo tid]
}
}
#noop fake worker for empty syslog and empty file
if {$workertype eq "message"} {
if {[dict_getdef $settingsdict -syslog ""] eq "" && [dict_getdef $settingsdict -file ""] eq ""} {
set winfo [dict create tid noop list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype "message"]
dict set workers $sourcetag $winfo
return noop
}
}
#check if there is an existing unsubscribed thread first
#don't use free_threads for pipe workertype for now..
variable free_threads
if {$workertype ne "pipe"} {
if {[llength $free_threads]} {
#todo - re-use from tail - as most likely to have been doing similar work?? review
set free_threads [lassign $free_threads tidworker]
#todo - keep track of real ts_start of free threads... kill when too old
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype [dict get $settingsdict -workertype]]
#puts stderr "shellfilter::new_worker Re-using free worker thread: $tidworker with tag $sourcetag"
dict set workers $sourcetag $winfo
return $tidworker
}
}
#set ts_start [::shellthread::iso8601]
set tidworker [thread::create -preserved]
set init_script [string map [list %ts_start% $ts_start %mp% [tcl::tm::list] %ap% $::auto_path %tidcli% $tidclient %sd% $settingsdict] {
#set tclbase [file dirname [file dirname [info nameofexecutable]]]
#set tcllib $tclbase/lib
#if {$tcllib ni $::auto_path} {
# lappend ::auto_path $tcllib
#}
set ::settingsinfo [dict create %sd%]
#if the executable running things is something like a tclkit,
# then it's likely we will need to use the caller's auto_path and tcl::tm::list to find things
#The caller can tune the thread's package search by providing a settingsdict
#tcl::tm::add * must add in reverse order to get resulting list in same order as original
if {![dict exists $::settingsinfo tcl_tm_list]} {
#JMN2
::tcl::tm::add {*}[lreverse [list %mp%]]
} else {
tcl::tm::remove {*}[tcl::tm::list]
::tcl::tm::add {*}[lreverse [dict get $::settingsinfo tcl_tm_list]]
}
if {![dict exists $::settingsinfo auto_path]} {
set ::auto_path [list %ap%]
} else {
set ::auto_path [dict get $::settingsinfo auto_path]
}
package require punk::packagepreference
punk::packagepreference::install
package require Thread
package require shellthread
if {![catch {::shellthread::worker::init %tidcli% %ts_start% $::settingsinfo} errmsg]} {
unset ::settingsinfo
set ::shellthread_init "ok"
} else {
unset ::settingsinfo
set ::shellthread_init "err $errmsg"
}
}]
thread::send -async $tidworker $init_script
#thread::send $tidworker $init_script
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list]]
dict set workers $sourcetag $winfo
return $tidworker
}
proc set_pipe_read_from_client {tag_pipename worker_tid rchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $rchan
#start_pipe_read will vwait - so we have to send async
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_read $tag_pipename $rchan]
#client may start writing immediately - but presumably it will buffer in fifo2
}
proc set_pipe_write_to_client {tag_pipename worker_tid wchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $wchan
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_write $tag_pipename $wchan]
}
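#sketch of wiring a pipe worker from a client thread (channel names are placeholders;
#rchan would normally be one end of a fifo2-style channel pair owned by the client):
# set tag "[thread::id]-pipe"
# set wtid [shellthread::manager::new_pipe_worker [list $tag]]
# shellthread::manager::set_pipe_read_from_client $tag $wtid $rchan
# puts $wchan "data for the worker"  ;# worker's start_pipe_read logs whatever arrives on $rchan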
proc write_log {source msg args} {
variable workers
set ts_micros_sent [clock micros]
set defaults [list -async 1 -level info]
set opts [dict merge $defaults $args]
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker eq "noop"} {
return
}
if {![thread::exists $tidworker]} {
# -syslog -file ?
set tidworker [new_worker $source]
}
} else {
#auto create with no requirement to call new_worker.. warn?
# -syslog -file ?
error "write_log no log opened for source: $source"
set tidworker [new_worker $source]
}
set client_tid [thread::id]
if {[dict get $opts -async]} {
thread::send -async $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
} else {
thread::send $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
}
}
proc report_worker_errors {errdict} {
variable workers
set reporting_tid [dict get $errdict worker_tid]
dict for {src srcinfo} $workers {
if {[dict get $srcinfo tid] eq $reporting_tid} {
dict set srcinfo errors [dict get $errdict errors]
dict set workers $src $srcinfo ;#writeback updated
break
}
}
}
#aka leave_worker
#Note that the tags may be on separate workertids, or some tags may share workertids
proc unsubscribe {sourcetaglist} {
variable workers
#workers structure example:
#[list sourcetag1 [list tid <tidworker> list_client_tids <clients>] ts_start <ts_start> ts_end_list {}]
variable free_threads
set mytid [thread::id] ;#caller of shellthread::manager::xxx is the client thread
set subscriberless_tags [list]
foreach source $sourcetaglist {
if {[dict exists $workers $source]} {
set list_client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $list_client_tids $mytid]] >= 0} {
set list_client_tids [lreplace $list_client_tids $posn $posn]
dict set workers $source list_client_tids $list_client_tids
}
if {![llength $list_client_tids]} {
lappend subscriberless_tags $source
}
}
}
#we've removed our own tid from all the tags - possibly across multiple workertids, and possibly leaving some workertids with no subscribers for a particular tag - or no subscribers at all.
set subscriberless_workers [list]
set shuttingdown_workers [list]
foreach deadtag $subscriberless_tags {
set workertid [dict get $workers $deadtag tid]
set worker_tags [get_worker_tagstate $workertid]
set subscriber_count 0
set kill_count 0 ;#number of ts_end_list entries - even one indicates thread is doomed
foreach taginfo $worker_tags {
incr subscriber_count [llength [dict get $taginfo list_client_tids]]
incr kill_count [llength [dict get $taginfo ts_end_list]]
}
if {$subscriber_count == 0} {
lappend subscriberless_workers $workertid
}
if {$kill_count > 0} {
lappend shuttingdown_workers $workertid
}
}
#if worker isn't shutting down - add it to free_threads list
foreach workertid $subscriberless_workers {
if {$workertid ni $shuttingdown_workers} {
if {$workertid ni $free_threads && $workertid ne "noop"} {
lappend free_threads $workertid
}
}
}
#todo
#unsub this client_tid from the sourcetags in the sourcetaglist. if no more client_tids exist for sourcetag, remove sourcetag,
#if no more sourcetags - add worker to free_threads
}
proc get_worker_tagstate {workertid} {
variable workers
set taginfo_list [list]
dict for {source sourceinfo} $workers {
if {[dict get $sourceinfo tid] eq $workertid} {
lappend taginfo_list $sourceinfo
}
}
return $taginfo_list
}
#finalisation
proc shutdown_free_threads {{timeout 2500}} {
variable free_threads
if {![llength $free_threads]} {
return
}
upvar ::shellthread::manager::timeouts timeoutarr
if {[info exists timeoutarr(shutdown_free_threads)]} {
#already called
return false
}
#set timeoutarr(shutdown_free_threads) waiting
#after $timeout [list set timeoutarr(shutdown_free_threads) timed-out]
set ::shellthread::waitfor waiting
#after $timeout [list set ::shellthread::waitfor]
#2025-07 timed-out untested review
set cancelid [after $timeout [list set ::shellthread::waitfor timed-out]]
set waiting_for [list]
set ended [list]
set timedout 0
foreach tid $free_threads {
if {[thread::exists $tid]} {
lappend waiting_for $tid
#thread::send -async $tid [list shellthread::worker::terminate [thread::id]] timeoutarr(shutdown_free_threads)
thread::send -async $tid [list shellthread::worker::terminate [thread::id]] ::shellthread::waitfor
}
}
if {[llength $waiting_for]} {
for {set i 0} {$i < [llength $waiting_for]} {incr i} {
vwait ::shellthread::waitfor
if {$::shellthread::waitfor eq "timed-out"} {
set timedout 1
break
} else {
after cancel $cancelid
lappend ended $::shellthread::waitfor
}
}
}
set free_threads [list]
return [dict create existed $waiting_for ended $ended timedout $timedout]
}
#TODO - important.
#REVIEW!
#since moving to the unsubscribe mechanism - close_worker $source isn't being called
# - we need to set a limit to the number of free threads and shut down excess when detected during unsubscription
#instruction to shut-down the thread that has this source.
proc close_worker {source {timeout 2500}} {
variable workers
variable worker_errors
variable free_threads
upvar ::shellthread::manager::timeouts timeoutarr
set ts_now [clock micros]
#puts stderr "close_worker $source"
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker in $free_threads} {
#make sure a thread that is being closed is removed from the free_threads list
set posn [lsearch $free_threads $tidworker]
set free_threads [lreplace $free_threads $posn $posn]
}
set mytid [thread::id]
set client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $client_tids $mytid]] >= 0} {
set client_tids [lreplace $client_tids $posn $posn]
#remove self from list of clients
dict set workers $source list_client_tids $client_tids
}
set ts_end_list [dict get $workers $source ts_end_list] ;#ts_end_list is just a list of timestamps of closing calls for this source - only one is needed to close, but they may all come in a flurry.
if {[llength $ts_end_list]} {
set last_end_ts [lindex $ts_end_list end]
if {(($ts_now - $last_end_ts) / 1000) >= $timeout} {
lappend ts_end_list $ts_now
dict set workers $source ts_end_list $ts_end_list
} else {
#existing close in progress.. assume it will work
return
}
}
if {[thread::exists $tidworker]} {
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source still running.. terminating"
#review - timeoutarr is local var (?)
set timeoutarr($source) 0
after $timeout [list set timeoutarr($source) 2]
thread::send -async $tidworker [list shellthread::worker::send_errors_now [thread::id]]
thread::send -async $tidworker [list shellthread::worker::terminate [thread::id]] timeoutarr($source)
#thread::send -async $tidworker [string map [list %tidclient% [thread::id]] {
# shellthread::worker::terminate %tidclient%
#}] timeoutarr($source)
vwait timeoutarr($source)
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE1"
thread::release $tidworker
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE2"
if {[dict exists $workers $source errors]} {
set errlist [dict get $workers $source errors]
if {[llength $errlist]} {
lappend worker_errors [list $source [dict get $workers $source]]
}
}
dict unset workers $source
} else {
#thread may have been closed by call to close_worker with another source with same worker
#clear workers record for this source
#REVIEW - race condition for re-creation of source with new workerid?
#check that record is subscriberless to avoid this
if {[llength [dict get $workers $source list_client_tids]] == 0} {
dict unset workers $source
}
}
}
#puts stdout "close_worker $source - end"
}
#worker errors only available for a source after close_worker called on that source
#It is possible for there to be multiple entries for a source because new_worker can be called multiple times with the same sourcetag.
proc get_and_clear_errors {source} {
variable worker_errors
set source_errors [lsearch -all -inline -index 0 $worker_errors $source]
set worker_errors [lsearch -all -inline -index 0 -not $worker_errors $source]
return $source_errors
}
}
package provide shellthread [namespace eval shellthread {
variable version
set version 1.6.2
}]

9470
src/project_layouts/custom/_project/punk.shell-0.1/src/bootsupport/modules/tomlish-1.1.7.tm

File diff suppressed because it is too large

853
src/vfs/_vfscommon.vfs/modules/shellthread-1.6.2.tm

@ -0,0 +1,853 @@
#package require logger
package require Thread
namespace eval shellthread {
proc iso8601 {{tsmicros ""}} {
if {$tsmicros eq ""} {
set tsmicros [tcl::clock::microseconds]
} else {
set microsnow [tcl::clock::microseconds]
if {[tcl::string::length $tsmicros] != [tcl::string::length $microsnow]} {
error "iso8601 requires 'clock micros' or empty string to create timestamp"
}
}
set seconds [expr {$tsmicros / 1000000}]
return [tcl::clock::format $seconds -format "%Y-%m-%d_%H-%M-%S"]
}
}
namespace eval shellthread::worker {
variable settings
variable sysloghost_port
variable sock
variable logfile ""
variable fd
variable client_ids [list]
variable ts_start_micros
variable errorlist [list]
variable inpipe ""
proc bgerror {args} {
variable errorlist
lappend errorlist $args
}
proc send_errors_now {tidcli} {
variable errorlist
thread::send -async $tidcli [list shellthread::manager::report_worker_errors [list worker_tid [thread::id] errors $errorlist]]
}
proc add_client_tid {tidcli} {
variable client_ids
if {$tidcli ni $client_ids} {
lappend client_ids $tidcli
}
}
proc init {tidclient start_m settingsdict} {
variable sysloghost_port
variable logfile
variable settings
interp bgerror {} shellthread::worker::bgerror
#package require overtype ;#overtype uses tcllib textutil, punk::char etc - currently too heavyweight in terms of loading time for use in threads.
variable client_ids
variable ts_start_micros
lappend client_ids $tidclient
set ts_start_micros $start_m
set defaults [list -raw 0 -file "" -syslog "" -direction out]
set settings [dict merge $defaults $settingsdict]
set syslog [dict get $settings -syslog]
if {[string length $syslog]} {
lassign [split $syslog :] s_host s_port
set sysloghost_port [list $s_host $s_port]
if {[catch {package require udp} errm]} {
#disable rather than bomb and interfere with any -file being written
#review - log/notify?
set sysloghost_port ""
}
} else {
set sysloghost_port ""
}
set logfile [dict get $settings -file]
}
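#illustrative settingsdict as passed through from the manager (values are placeholders):
# shellthread::worker::init $client_tid [clock micros] {-raw 0 -file ./worker.log -syslog 10.0.0.5:514}
#an empty -file skips file logging; an empty -syslog skips the UDP target (which needs the udp package)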
proc start_pipe_read {source readchan args} {
#assume 1 inpipe for now
variable inpipe
variable sysloghost_port
variable logfile
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#get buffering setting from the channel as it was set prior to thread::transfer
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set writebuffering line
#set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
#can configure $writechan -buffering $writebuffering
}
}
chan configure $readchan -translation lf
if {$readchan ni [chan names]} {
error "shellthread::worker::start_pipe_read - inpipe not configured. Use shellthread::manager::set_pipe_read_from_client to thread::transfer the pipe end"
}
set inpipe $readchan
chan configure $readchan -blocking 0
set waitvar ::shellthread::worker::wait($inpipe,[clock micros])
#tcl::chan::fifo2 based pipe seems slower to establish events upon than Memchan
chan event $readchan readable [list ::shellthread::worker::pipe_read $readchan $source $waitvar $readbuffering $writebuffering]
vwait $waitvar
}
proc pipe_read {chan source waitfor readbuffering writebuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
::shellthread::worker::log pipe 0 - $source - info $chunk\n $writebuffering
} else {
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
}
} else {
set chunk [chan read $chan]
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $chan
}
}
proc start_pipe_write {source writechan args} {
variable outpipe
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
#todo!
set readchan stdin
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#nothing explicitly set - take from transferred channel
set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
chan configure $writechan -buffering $writebuffering
}
}
if {$writechan ni [chan names]} {
error "shellthread::worker::start_pipe_write - outpipe not configured. Use shellthread::manager::set_pipe_write_to_client to thread::transfer the pipe end"
}
set outpipe $writechan
chan configure $readchan -blocking 0
chan configure $writechan -blocking 0
set waitvar ::shellthread::worker::wait($outpipe,[clock micros])
chan event $readchan readable [list apply {{chan writechan source waitfor readbuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
puts $writechan $chunk
} else {
puts -nonewline $writechan $chunk
}
}
} else {
set chunk [chan read $chan]
puts -nonewline $writechan $chunk
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $writechan
if {$chan ne "stdin"} {
chan close $chan
}
}
}} $readchan $writechan $source $waitvar $readbuffering]
vwait $waitvar
}
proc _initsock {} {
variable sysloghost_port
variable sock
if {[string length $sysloghost_port]} {
if {[catch {chan configure $sock} state]} {
set sock [udp_open]
chan configure $sock -buffering none -translation binary
chan configure $sock -remote $sysloghost_port
}
}
}
proc _reconnect {} {
variable sock
catch {close $sock}
_initsock
return [chan configure $sock]
}
proc send_info {client_tid ts_sent source msg} {
set ts_received [clock micros]
set lag_micros [expr {$ts_received - $ts_sent}]
set lag [expr {$lag_micros / 1000000.0}] ;#lag as x.xxxxxx seconds
log $client_tid $ts_sent $lag $source - info $msg line 1
}
proc log {client_tid ts_sent lag source service level msg writebuffering {islog 0}} {
variable sock
variable fd
variable sysloghost_port
variable logfile
variable settings
if {![dict get $settings -raw]} {
set logchunk $msg
set le "none"
#for cooked - always remove the trailing newline before splitting..
#but add it back exactly as it was afterwards
#we can always split on \n - and any adjacent \r will be preserved in the rejoin
#
#note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
#
#Possibly not critical as cooked is for logging and we are still preserving all \r and \n chars - but review and consider implementing a better split
set lastchar [string range $logchunk end end]
if {[string range $logchunk end-1 end] eq "\r\n"} {
set le "crlf"
#set logchunk [string range $logchunk 0 end-2]
} else {
if {$lastchar eq "\n"} {
set le "lf"
#set logchunk [string range $logchunk 0 end-1]
} elseif {$lastchar eq "\r"} {
#\r as line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console.
#If we're writing log lines to a file, we'll end up appending a \n to a trailing \r
#For writing to a syslog target - we'll pass it through as is for the syslog target to display as it wills
set le "cr"
#set logchunk [string range $logchunk 0 end-1]
} else {
#possibly a single line with no linefeed.. or has linefeeds only in the middle
#when writing to syslog we'll pass it through without a trailing linefeed.
#when writing to a file we'll append \n
}
}
#split on \n no matter the actual line-ending in use
#shouldn't matter as long as we don't add anything at the end of the line other than the raw data
#ie - don't quote or add spaces
set lines [split $logchunk \n]
set lcount [llength $lines]
if {$ts_sent != 0} {
set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
set time_info [::shellthread::iso8601 $ts_sent].$micros
#set time_info "${time_info}+$lag"
set lagfp "+[format %f $lag]"
} else {
#from pipe - no ts_sent/lag info available
set time_info ""
set lagfp ""
}
set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway
set w0 9
set w1 27
set w2 11
set w3 22 ;#review - this can truncate source name without indication tail is missing
set w4 [expr {1 + ([::tcl::string::length $lcount] *2)}] ;#eg 999/999
#do not columnize the final data column or append anything to end - or we could muck up the crlf integrity
lassign [list \
[format %-${w0}s $idtail]\
[format %-${w1}s $time_info]\
[format %-${w2}s $lagfp]\
[format %-${w3}s $source]\
] c0 c1 c2 c3
set c2_blank [string repeat " " $w2]
if {[::tcl::string::length $sysloghost_port]} {
_initsock
}
set outlines [list]
set lnum 0
foreach ln $lines {
incr lnum
set c4 [format %-${w4}s $lnum/$lcount]
if {$lnum == 1} {
lappend outlines "$c0 $c1 $c2 $c3 $c4 $ln"
} else {
lappend outlines "$c0 $c1 $c2_blank $c3 $c4 $ln"
}
if {[::tcl::string::length $sysloghost_port]} {
#send each line as a separate syslog message
#even if they arrive out of order or interleaved with records from other sources -
#they can be tied together and ordered using id,source, timestamp, n/numlines fields
#we lose information about the line-endings though
catch {puts -nonewline $sock [lindex $outlines end]}
}
}
#todo - setting to maintain open filehandle and reduce io.
# possible settings for buffersize - and maybe logrotation, although this could be left to client
#for now - default to safe option of open/close each write despite the overhead.
if {[string length $logfile]} {
switch -- $le {
lf {
set logchunk "[join $outlines \n]\n"
}
crlf {
#join with \n because we still did split on \n
set logchunk "[join $outlines \n]\r\n"
}
cr {
set logchunk "[join $outlines \n]\r"
}
none {
set logchunk [join $outlines \n]
}
}
set fd [open $logfile a]
if {$le in {cr none}} {
append logchunk \n
}
puts -nonewline $fd $logchunk
close $fd
}
} else {
#raw
if {[string length $sysloghost_port]} {
_initsock
catch {puts -nonewline $sock $msg}
}
if {[string length $logfile]} {
set fd [open $logfile a]
puts -nonewline $fd $msg
close $fd
}
}
#todo - sockets etc?
}
# - withdraw just this client
proc finish {tidclient} {
variable client_ids
if {($tidclient in $client_ids) && ([llength $client_ids] == 1)} {
terminate $tidclient
} else {
set posn [lsearch $client_ids $tidclient]
set client_ids [lreplace $client_ids $posn $posn]
}
}
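#illustrative call pattern (sketch - not taken from the manager code in this file):
#  thread::send -async $workertid [list shellthread::worker::finish $client_tid]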
#allow any client to terminate
proc terminate {tidclient} {
variable sock
variable fd
variable client_ids
if {$tidclient in $client_ids} {
catch {close $sock}
catch {close $fd}
set client_ids [list]
#review use of thread::release -wait
#docs indicate deprecated for regular use, and that we should use thread::join
#however.. how can we set a timeout on a thread::join ?
#by telling the thread to release itself - we can wait on the thread::send variable
# This needs review - because it's unclear that -wait even works on self
# (what does it mean to wait for the target thread to exit if the target is self??)
thread::release -wait
return [thread::id]
} else {
return ""
}
}
}
namespace eval shellthread::manager {
variable workers [dict create]
variable worker_errors [list]
variable timeouts
variable free_threads [list]
#variable log_threads
proc dict_getdef {dictValue args} {
if {[llength $args] < 2} {
error {wrong # args: should be "dict_getdef dictValue ?key ...? key default"}
}
set keys [lrange $args 0 end-1]
if {[tcl::dict::exists $dictValue {*}$keys]} {
return [tcl::dict::get $dictValue {*}$keys]
} else {
return [lindex $args end]
}
}
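#e.g. [dict_getdef {a 1} b "fallback"] -> "fallback"   [dict_getdef {a 1} a "fallback"] -> 1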
#new datastructure regarding workers and sourcetags required.
#one worker can service multiple sourcetags - but each sourcetag may be used by multiple threads too.
#generally each thread will use a specific sourcetag - but we may have pools doing similar things which log to same destination.
#
#As a convention we may use a sourcetag for the thread which started the worker that isn't actually used for logging - but as a common target for joins
#If the thread which started the worker calls leave_worker with that 'primary' sourcetag, it means others won't be able to use that target - which seems reasonable.
#If another thread wants to maintain joinability beyond the span provided by the starting client,
#it can join with both the primary tag and a tag it will actually use for logging.
#A thread can join the logger with any existingtag - not just the 'primary'
#(which is arbitrary anyway. It will usually be the first in the list - but may be unsubscribed by clients and disappear)
proc join_worker {existingtag sourcetaglist} {
set client_tid [thread::id]
#todo - allow a source to piggyback on existing worker by referencing one of the sourcetags already using the worker
}
proc new_pipe_worker {sourcetaglist {settingsdict {}}} {
if {[dict exists $settingsdict -workertype]} {
if {[string tolower [dict get $settingsdict -workertype]] ne "pipe"} {
error "new_pipe_worker error: -workertype ne 'pipe'. Set to 'pipe' or leave empty"
}
}
dict set settingsdict -workertype pipe
new_worker $sourcetaglist $settingsdict
}
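#illustrative pipe-worker flow (sketch - tag name and use of 'chan pipe' are assumptions, untested):
#  set wtid [shellthread::manager::new_pipe_worker [list mypipetag]]
#  lassign [chan pipe] rchan wchan
#  shellthread::manager::set_pipe_read_from_client mypipetag $wtid $rchan
#  puts $wchan "data destined for the worker thread"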
#it is up to caller to use a unique sourcetag (e.g by prefixing with own thread::id etc)
# This allows multiple threads to more easily write to the same named sourcetag if necessary
# todo - change sourcetag for a list of tags which will be handled by the same thread. e.g for multiple threads logging to same file
#
# todo - some protection mechanism for case where target is a file to stop creation of multiple worker threads writing to same file.
# Even if we use open fd,close fd wrapped around writes.. it is probably undesirable to have multiple threads with same target
# On the other hand socket targets such as UDP can happily be written to by multiple threads.
# For now the mechanism is that a call to new_worker (rename to open_worker?) will join the same thread if a sourcetag matches.
# but, as sourcetags can get removed(unsubbed via leave_worker) this doesn't guarantee two threads with same -file settings won't fight.
# Also.. the settingsdict is ignored when joining with a tag that exists.. this is problematic.. e.g logrotation where previous file still being written by existing worker
# todo - rename 'sourcetag' concept to 'targettag' ?? the concept is a mixture of both.. it is somewhat analogous to a syslog 'facility'
# probably new_worker should disallow auto-joining and we allow different workers to handle same tags simultaneously to support overlap during logrotation etc.
proc new_worker {sourcetaglist {settingsdict {}}} {
variable workers
set ts_start [clock micros]
set tidclient [thread::id]
set sourcetag [lindex $sourcetaglist 0] ;#todo - use all
set defaults [dict create\
-workertype message\
]
set settingsdict [dict merge $defaults $settingsdict]
set workertype [string tolower [dict get $settingsdict -workertype]]
set known_workertypes [list pipe message]
if {$workertype ni $known_workertypes} {
error "new_worker - unknown -workertype $workertype. Expected one of '$known_workertypes'"
}
if {[dict exists $workers $sourcetag]} {
set winfo [dict get $workers $sourcetag]
if {[dict get $winfo tid] ne "noop" && [thread::exists [dict get $winfo tid]]} {
#add our client-info to existing worker thread
dict lappend winfo list_client_tids $tidclient
dict set workers $sourcetag $winfo ;#writeback
return [dict get $winfo tid]
}
}
#noop fake worker for empty syslog and empty file
if {$workertype eq "message"} {
if {[dict_getdef $settingsdict -syslog ""] eq "" && [dict_getdef $settingsdict -file ""] eq ""} {
set winfo [dict create tid noop list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype "message"]
dict set workers $sourcetag $winfo
return noop
}
}
#check if there is an existing unsubscribed thread first
#don't use free_threads for pipe workertype for now..
variable free_threads
if {$workertype ne "pipe"} {
if {[llength $free_threads]} {
#todo - re-use from tail - as most likely to have been doing similar work?? review
set free_threads [lassign $free_threads tidworker]
#todo - keep track of real ts_start of free threads... kill when too old
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype [dict get $settingsdict -workertype]]
#puts stderr "shellfilter::new_worker Re-using free worker thread: $tidworker with tag $sourcetag"
dict set workers $sourcetag $winfo
return $tidworker
}
}
#set ts_start [::shellthread::iso8601]
set tidworker [thread::create -preserved]
set init_script [string map [list %ts_start% $ts_start %mp% [tcl::tm::list] %ap% $::auto_path %tidcli% $tidclient %sd% $settingsdict] {
#set tclbase [file dirname [file dirname [info nameofexecutable]]]
#set tcllib $tclbase/lib
#if {$tcllib ni $::auto_path} {
# lappend ::auto_path $tcllib
#}
set ::settingsinfo [dict create %sd%]
#if the executable running things is something like a tclkit,
# then it's likely we will need to use the caller's auto_path and tcl::tm::list to find things
#The caller can tune the thread's package search by providing a settingsdict
#tcl::tm::add prepends each path - so we must add in reverse order to get the resulting list in the same order as the original
if {![dict exists $::settingsinfo tcl_tm_list]} {
#JMN2
::tcl::tm::add {*}[lreverse [list %mp%]]
} else {
tcl::tm::remove {*}[tcl::tm::list]
::tcl::tm::add {*}[lreverse [dict get $::settingsinfo tcl_tm_list]]
}
if {![dict exists $::settingsinfo auto_path]} {
set ::auto_path [list %ap%]
} else {
set ::auto_path [dict get $::settingsinfo auto_path]
}
package require punk::packagepreference
punk::packagepreference::install
package require Thread
package require shellthread
if {![catch {::shellthread::worker::init %tidcli% %ts_start% $::settingsinfo} errmsg]} {
unset ::settingsinfo
set ::shellthread_init "ok"
} else {
unset ::settingsinfo
set ::shellthread_init "err $errmsg"
}
}]
thread::send -async $tidworker $init_script
#thread::send $tidworker $init_script
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list]]
dict set workers $sourcetag $winfo
return $tidworker
}
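#illustrative usage (sketch - tag and file path are assumptions, not from the original source):
#  set tidworker [shellthread::manager::new_worker [list mylogtag] [dict create -file /var/log/myapp.log]]
#with both -file and -syslog empty (the default) a 'noop' fake worker record is returned for the message workertype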
proc set_pipe_read_from_client {tag_pipename worker_tid rchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $rchan
#start_pipe_read will vwait - so we have to send async
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_read $tag_pipename $rchan]
#client may start writing immediately - but presumably it will buffer in fifo2
}
proc set_pipe_write_to_client {tag_pipename worker_tid wchan args} {
variable workers
if {![dict exists $workers $tag_pipename]} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename not found"
}
set match_worker_tid [dict get $workers $tag_pipename tid]
if {$worker_tid ne $match_worker_tid} {
error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
}
#buffering set during channel creation will be preserved on thread::transfer
thread::transfer $worker_tid $wchan
thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_write $tag_pipename $wchan]
}
proc write_log {source msg args} {
variable workers
set ts_micros_sent [clock micros]
set defaults [list -async 1 -level info]
set opts [dict merge $defaults $args]
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker eq "noop"} {
return
}
if {![thread::exists $tidworker]} {
# -syslog -file ?
set tidworker [new_worker $source]
}
} else {
#auto create with no requirement to call new_worker.. warn?
# -syslog -file ?
error "write_log no log opened for source: $source"
set tidworker [new_worker $source]
}
set client_tid [thread::id]
if {[dict get $opts -async]} {
thread::send -async $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
} else {
thread::send $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
}
}
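#illustrative usage (sketch - tag assumed to have been opened via new_worker first):
#  shellthread::manager::write_log mylogtag "something happened"
#note: -level is accepted and merged into opts but not currently forwarded - send_info always logs at level info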
proc report_worker_errors {errdict} {
variable workers
set reporting_tid [dict get $errdict worker_tid]
dict for {src srcinfo} $workers {
if {[dict get $srcinfo tid] eq $reporting_tid} {
dict set srcinfo errors [dict get $errdict errors]
dict set workers $src $srcinfo ;#writeback updated
break
}
}
}
#aka leave_worker
#Note that the tags may be on separate workertids, or some tags may share workertids
proc unsubscribe {sourcetaglist} {
variable workers
#workers structure example:
#[list sourcetag1 [list tid <tidworker> list_client_tids <clients>] ts_start <ts_start> ts_end_list {}]
variable free_threads
set mytid [thread::id] ;#caller of shellthread::manager::xxx is the client thread
set subscriberless_tags [list]
foreach source $sourcetaglist {
if {[dict exists $workers $source]} {
set list_client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $list_client_tids $mytid]] >= 0} {
set list_client_tids [lreplace $list_client_tids $posn $posn]
dict set workers $source list_client_tids $list_client_tids
}
if {![llength $list_client_tids]} {
lappend subscriberless_tags $source
}
}
}
#we've removed our own tid from all the tags - possibly across multiple workertids, and possibly leaving some workertids with no subscribers for a particular tag - or no subscribers at all.
set subscriberless_workers [list]
set shuttingdown_workers [list]
foreach deadtag $subscriberless_tags {
set workertid [dict get $workers $deadtag tid]
set worker_tags [get_worker_tagstate $workertid]
set subscriber_count 0
set kill_count 0 ;#number of ts_end_list entries - even one indicates thread is doomed
foreach taginfo $worker_tags {
incr subscriber_count [llength [dict get $taginfo list_client_tids]]
incr kill_count [llength [dict get $taginfo ts_end_list]]
}
if {$subscriber_count == 0} {
lappend subscriberless_workers $workertid
}
if {$kill_count > 0} {
lappend shuttingdown_workers $workertid
}
}
#if worker isn't shutting down - add it to free_threads list
foreach workertid $subscriberless_workers {
if {$workertid ni $shuttingdown_workers} {
if {$workertid ni $free_threads && $workertid ne "noop"} {
lappend free_threads $workertid
}
}
}
#todo
#unsub this client_tid from the sourcetags in the sourcetaglist. if no more client_tids exist for sourcetag, remove sourcetag,
#if no more sourcetags - add worker to free_threads
}
proc get_worker_tagstate {workertid} {
variable workers
set taginfo_list [list]
dict for {source sourceinfo} $workers {
if {[dict get $sourceinfo tid] eq $workertid} {
lappend taginfo_list $sourceinfo
}
}
return $taginfo_list
}
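#returns the list of winfo dicts (tid list_client_tids ts_start ts_end_list ...) whose tid matches workertid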
#finalisation
proc shutdown_free_threads {{timeout 2500}} {
variable free_threads
if {![llength $free_threads]} {
return
}
upvar ::shellthread::manager::timeouts timeoutarr
if {[info exists timeoutarr(shutdown_free_threads)]} {
#already called
return false
}
#set timeoutarr(shutdown_free_threads) waiting
#after $timeout [list set timeoutarr(shutdown_free_threads) timed-out]
set ::shellthread::waitfor waiting
#after $timeout [list set ::shellthread::waitfor]
#2025-07 timed-out untested review
set cancelid [after $timeout [list set ::shellthread::waitfor timed-out]]
set waiting_for [list]
set ended [list]
set timedout 0
foreach tid $free_threads {
if {[thread::exists $tid]} {
lappend waiting_for $tid
#thread::send -async $tid [list shellthread::worker::terminate [thread::id]] timeoutarr(shutdown_free_threads)
thread::send -async $tid [list shellthread::worker::terminate [thread::id]] ::shellthread::waitfor
}
}
if {[llength $waiting_for]} {
for {set i 0} {$i < [llength $waiting_for]} {incr i} {
vwait ::shellthread::waitfor
if {$::shellthread::waitfor eq "timed-out"} {
set timedout 1
break
} else {
after cancel $cancelid
lappend ended $::shellthread::waitfor
}
}
}
set free_threads [list]
return [dict create existed $waiting_for ended $ended timedout $timedout]
}
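#illustrative usage (sketch): when free threads exist, the result is a dict e.g.
#  shellthread::manager::shutdown_free_threads 5000
#  -> existed {<tid> ...} ended {<tid> ...} timedout 0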
#TODO - important.
#REVIEW!
#since moving to the unsubscribe mechanism - close_worker $source isn't being called
# - we need to set a limit to the number of free threads and shut down excess when detected during unsubscription
#instruction to shut-down the thread that has this source.
proc close_worker {source {timeout 2500}} {
variable workers
variable worker_errors
variable free_threads
upvar ::shellthread::manager::timeouts timeoutarr
set ts_now [clock micros]
#puts stderr "close_worker $source"
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {$tidworker in $free_threads} {
#make sure a thread that is being closed is removed from the free_threads list
set posn [lsearch $free_threads $tidworker]
set free_threads [lreplace $free_threads $posn $posn]
}
set mytid [thread::id]
set client_tids [dict get $workers $source list_client_tids]
if {[set posn [lsearch $client_tids $mytid]] >= 0} {
set client_tids [lreplace $client_tids $posn $posn]
#remove self from list of clients
dict set workers $source list_client_tids $client_tids
}
set ts_end_list [dict get $workers $source ts_end_list] ;#ts_end_list is just a list of timestamps of closing calls for this source - only one is needed to close, but they may all come in a flurry.
if {[llength $ts_end_list]} {
set last_end_ts [lindex $ts_end_list end]
if {(($ts_now - $last_end_ts) / 1000) >= $timeout} {
lappend ts_end_list $ts_now
dict set workers $source ts_end_list $ts_end_list
} else {
#existing close in progress.. assume it will work
return
}
}
if {[thread::exists $tidworker]} {
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source still running.. terminating"
#review - timeoutarr is local var (?)
set timeoutarr($source) 0
after $timeout [list set timeoutarr($source) 2]
thread::send -async $tidworker [list shellthread::worker::send_errors_now [thread::id]]
thread::send -async $tidworker [list shellthread::worker::terminate [thread::id]] timeoutarr($source)
#thread::send -async $tidworker [string map [list %tidclient% [thread::id]] {
# shellthread::worker::terminate %tidclient%
#}] timeoutarr($source)
vwait timeoutarr($source)
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE1"
thread::release $tidworker
#puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE2"
if {[dict exists $workers $source errors]} {
set errlist [dict get $workers $source errors]
if {[llength $errlist]} {
lappend worker_errors [list $source [dict get $workers $source]]
}
}
dict unset workers $source
} else {
#thread may have been closed by call to close_worker with another source with same worker
#clear workers record for this source
#REVIEW - race condition for re-creation of source with new workerid?
#check that record is subscriberless to avoid this
if {[llength [dict get $workers $source list_client_tids]] == 0} {
dict unset workers $source
}
}
}
#puts stdout "close_worker $source - end"
}
#worker errors only available for a source after close_worker called on that source
#It is possible for there to be multiple entries for a source because new_worker can be called multiple times with the same sourcetag.
proc get_and_clear_errors {source} {
variable worker_errors
set source_errors [lsearch -all -inline -index 0 $worker_errors $source]
set worker_errors [lsearch -all -inline -index 0 -not $worker_errors $source]
return $source_errors
}
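#illustrative usage (sketch - tag assumed):
#  shellthread::manager::close_worker mylogtag
#  set errs [shellthread::manager::get_and_clear_errors mylogtag]   ;#list of {source winfo} entries, possibly empty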
}
package provide shellthread [namespace eval shellthread {
variable version
set version 1.6.2
}]