Browse Source

stdin_race.tcl etc in scriptlib (test for redirected stdin issues with subprocesses)

master
Julian Noble 1 month ago
parent
commit
eb2a1037e4
  1. 137
      scriptlib/stdin_race.pl
  2. 125
      scriptlib/stdin_race.py
  3. 115
      scriptlib/stdin_race.tcl
  4. 9
      src/modules/shellfilter-0.2.tm

137
scriptlib/stdin_race.pl

@ -0,0 +1,137 @@
#!/usr/bin/perl
use strict;
use warnings;
use Time::HiRes qw(usleep);
use IPC::Open3;
use IO::Handle;
# ANSI color codes
my $RESET = "\e[0m";
my $GREEN = "\e[32m"; # Child color
my $YELLOW = "\e[33m"; # Parent color
# Usage function
sub usage {
my ($args) = @_;
print STDERR "rcvd : $0 $args\n";
print STDERR "usage:\n";
print STDERR " $0 pump <persecond> <maxcount>\n";
print STDERR " $0 parent\n";
print STDERR " $0 child <delay_ms>\n";
print STDERR "\n";
print STDERR "e.g:\n";
print STDERR " perl $0 pump 35 50 | perl $0 parent\n";
exit 1;
}
# Pump role
sub pump {
my ($persecond, $maxcount) = @_;
if ($persecond > 1000) {
print STDERR "WARNING: (>1000) sub millisecond scheduling not available - will go full speed\n";
usleep(500_000);
}
STDOUT->autoflush(1);
STDERR->autoflush(1);
my $counter = -1;
my $ms = int(1000 / $persecond);
while ($maxcount <= 0 || $counter < $maxcount - 1) {
$counter++;
print ".${counter}";
usleep($ms * 1000);
}
print STDERR "pump-done\n";
}
# Parent role
sub parent {
print STDERR "${YELLOW}parent${RESET}\n";
usleep(250_000);
# Read the first chunk from stdin
my $parent_chunk1;
read(STDIN, $parent_chunk1, 8);
print STDERR "${YELLOW}${parent_chunk1}${RESET}\n";
# Launch the child process
#my $child_pid = open3("<&STDIN", my $child_out, ">&STDERR", "perl", $0, "child", "150");
open(local *CHILD_STDIN, "<&", \*STDIN) or die "Can't dup STDIN: $!";
open(local *CHILD_STDERR, "<&", \*STDERR) or die "Can't dup STDERR: $!";
my $child_pid = open3("<&CHILD_STDIN", my $child_out, ">&CHILD_STDERR", "perl", $0, "child", "150");
binmode($child_out, ":utf8");
# Handle output from the child process asynchronously
while (my $line = <$child_out>) {
print STDERR $line;
}
#close $child_out;
waitpid($child_pid, 0);
print STDERR "parent-tail-read\n";
while (my $chunk = <STDIN>) {
print STDOUT $chunk;
}
print STDERR "\n${YELLOW}parent-done${RESET}\n";
}
# Child role
sub child {
my ($delay_ms) = @_;
print STDERR "\n${GREEN}child${RESET}\n";
usleep($delay_ms * 1000);
# Read exactly 16 characters from stdin
my $chunk;
my $bytes_read = read(STDIN, $chunk, 16);
if (defined $bytes_read) {
if ($bytes_read > 0) {
print STDERR "${GREEN}child-read: $bytes_read bytes${RESET}\n";
} else {
print STDERR "${GREEN}child-read: no data read${RESET}\n";
exit 0;
}
}
print STDOUT "${GREEN}${chunk}${RESET}\n";
print STDERR "child-done\n";
exit 0;
}
# Main function
sub main {
my @args = @ARGV;
if (@args < 1) {
usage("");
}
my $role = shift @args;
if ($role eq "pump") {
if (@args != 2) {
usage("pump");
}
my ($persecond, $maxcount) = @args;
pump($persecond, $maxcount);
} elsif ($role eq "parent") {
if (@args != 0) {
usage("parent");
}
parent();
} elsif ($role eq "child") {
if (@args != 1) {
usage("child");
}
my ($delay_ms) = @args;
child($delay_ms);
} else {
usage($role);
}
}
main();

125
scriptlib/stdin_race.py

@ -0,0 +1,125 @@
import sys
import time
import asyncio
from subprocess import Popen, PIPE
RESET = "\033[0m"
C = "\033[32m" # Child color green
P = "\033[33m" # Parent color yellow
def usage():
print("Usage:")
print(" stdin_race.py pump <persecond> <maxcount>")
print(" stdin_race.py parent")
print(" stdin_race.py child <delay_ms>")
print("\nExample:")
print(" python stdin_race.py pump 35 50 | python stdin_race.py parent")
sys.exit(1)
async def pump(persecond, maxcount):
if persecond > 1000:
print("WARNING: (>1000) sub-millisecond scheduling not available - will go full speed", file=sys.stderr)
await asyncio.sleep(0.5)
counter = -1
ms = 1000 / persecond
async def pump_emit():
nonlocal counter
try:
counter += 1
print(f".{counter}", end="", flush=True)
except BrokenPipeError:
return False
return True
async def pump_schedule():
nonlocal counter
while maxcount <= 0 or counter < maxcount - 1:
if not await pump_emit():
break
await asyncio.sleep(ms / 1000)
print("pump-done", file=sys.stderr, flush=True)
await pump_schedule()
async def parent():
print(f"{P}parent{RESET}", file=sys.stderr, flush=True)
await asyncio.sleep(0.25)
# Read the first chunk from stdin
parent_chunk1 = sys.stdin.read(8)
print(f"{P}{parent_chunk1}{RESET}", file=sys.stderr, flush=True)
# Launch the child process
child_proc = Popen(
[sys.executable, __file__, "child", "150"],
stdin=sys.stdin, stdout=PIPE, stderr=sys.stderr, text=True
)
# Forward stdin to the child process
#while True:
# chunk = sys.stdin.read(1)
# if not chunk:
# break
# try:
# child_proc.stdin.write(chunk)
# #child_proc.stdin.flush()
# except BrokenPipeError:
# print("Broken pipe error, child process may have exited.", file=sys.stderr)
# break
child_proc.wait()
#child_proc.wait()
print("parent-tail-read", file=sys.stderr, flush=True)
while True:
chunk = sys.stdin.read(1)
if not chunk:
break
print(chunk, end="", file=sys.stderr, flush=True)
print(f"\n{P}parent-done{RESET}", flush=True)
async def child(delay_ms):
print(f"\n{C}child{RESET}", file=sys.stderr, flush=True)
await asyncio.sleep(delay_ms / 1000)
# Read exactly 16 characters from stdin
chunk = sys.stdin.read(16)
print(f"{C}{chunk}{RESET}", file=sys.stderr, flush=True)
print("child-done", file=sys.stderr, flush=True)
sys.exit(0)
def main():
if len(sys.argv) < 2:
usage()
role = sys.argv[1]
if role == "pump":
if len(sys.argv) != 4:
usage()
persecond = int(sys.argv[2])
maxcount = int(sys.argv[3])
asyncio.run(pump(persecond, maxcount))
elif role == "parent":
if len(sys.argv) != 2:
usage()
asyncio.run(parent())
elif role == "child":
if len(sys.argv) != 3:
usage()
delay_ms = int(sys.argv[2])
asyncio.run(child(delay_ms))
else:
usage()
if __name__ == "__main__":
main()

115
scriptlib/stdin_race.tcl

@ -0,0 +1,115 @@
chan configure stdin -blocking 0 -buffering none
#puts stderr "stdinconf: [chan configure stdin]"
set RST \x1b\[0m
set C \x1b\[32m ;#child colour green
set P \x1b\[33m ;#parent colour yellow
proc usage {args} {
puts stderr "rcvd : [info script] $args"
puts stderr "usage:"
puts stderr " [info script] pump <persecond> <maxcount>"
puts stderr " [info script] parent"
puts stderr " [info script] child <delay_ms>"
puts stderr \n
puts stderr "e.g:"
puts stderr " >tclsh"
puts stderr " %chan configure stdin -blocking 0"
puts stderr " %tclsh [info script] pump 35 50 | tclsh [info script] parent"
exit 0
}
proc read_child {chan} {
if {![eof $chan]} {
puts stdout [read $chan]
flush stdout
} else {
set ::done 1
}
}
proc pump_schedule {} {
upvar ::counter c
upvar ::maxcount maxcount
if {$::forever_pump} {
if {$maxcount > 0 && $c >= $maxcount} {
set ::forever_pump 0
} else {
after idle [list after 0 ::pump_emit]
}
tailcall after $::ms ::pump_schedule
} else {
after idle [list ::pump_end]
}
}
proc pump_emit {} {
upvar ::counter c
if {[catch {
puts -nonewline stdout .[incr c]
}]} {
set ::forever_pump 0
}
flush stdout
}
proc pump_end {} {
puts stderr "pump-done"
flush stderr
flush stdout
}
switch -- [lindex $::argv 0] {
pump {
if {$::argc != 3} {usage {*}$::argv}
set persec [lindex $::argv 1]
set maxcount [lindex $::argv 2]
if {$persec > 1000} {
puts stderr "WARNING: (>1000) sub millisecond scheduling not available - will go full speed"
flush stderr
after 500
}
chan configure stdout -blocking 1 -buffering none
set counter -1
set ms [expr {1000 / $persec}]
set ::forever_pump 1
pump_schedule
vwait ::forever_pump
}
parent {
if {$::argc != 1} {usage {*}$::argv}
puts stderr "${::P}parent$RST"
after 250
set parent_chunk1 [read stdin 8]
#set rdout [open |[concat tclsh [info script] child 150 2>@stdout <@stdin] RDONLY]
set rdout [open |[concat tclsh [info script] child 150 2>@stdout <@stdin] RDONLY]
chan conf $rdout -blocking 0 -buffersize 1
chan event $rdout readable [list ::read_child $rdout]
puts -nonewline stderr $::P$parent_chunk1$::RST
flush stderr
after 10000 {set ::done 1}
vwait ::done
puts stdout parent-tail-read
while {![eof stdin]} {
puts -nonewline stderr [read stdin]
flush stderr
}
puts stdout \n${::P}parent-done$::RST
flush stdout
}
child {
if {$::argc != 2} {usage $::argv}
set delay_ms [lindex $::argv 1]
puts stdout "\n${::C}child$::RST"
after $delay_ms
puts stdout ${::C}[read stdin 16]$::RST
#puts stderr ${::C}[read stdin]$::RST
puts stdout "child-done"
flush stderr
exit 0
}
default {usage $::argv}
}
exit 0

9
src/modules/shellfilter-0.2.tm

@ -2792,7 +2792,7 @@ namespace eval shellfilter {
#chan configure $inchan -buffering none -blocking 1 ;test
#chan configure $inchan -buffering none -blocking 1 ;#test
chan configure $inchan -buffering $inbuffering -blocking 0 ;#we are setting up a readable handler for this - so non-blocking ok
@ -2831,10 +2831,15 @@ namespace eval shellfilter {
# Ideally we need something like exec,open in tcl that interacts with transformed channels directly and emits as it runs, not only at termination
# - and that at least appears like a terminal to the called command.
#set rdout [open |[concat $commandlist [list 2>@stderr <@$inchan]] [list RDONLY]]
#REVIEW!
#if the child process takes a while to begin reading stdin - the data on stdin between when we stopped the parent chan event handler and when the child gets data,
#seems to stay buffered somewhere. It is then read by the parent, after the child returns. (ie not lost, but out-of-order)
#This can be apparent sometimes even with fast typing upon calling an executable. (e.g occasionally even vim - but seems to be timing based so might only happen first time if at all)
# see scriptlib/stdin_race.tcl etc test files.
#similar problem with python & perl - issue seems to be in libc or OS buffering behaviour for standard channels.
#note that zig (repo/jn/zig/stdin_race) seems to avoid this issue - todo - make zig based binary extension for open/exec?
set rdout [open |[concat $commandlist [list 2>@$wrerr <@$inchan]] [list RDONLY]]
chan configure $rderr -buffering $errbuffering -blocking 0

Loading…
Cancel
Save