File size: 3,141 Bytes
c336648 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
import functools
import io
import sys
import threading
import traceback
# Module-level in-memory buffers that accumulate captured output.
# They are shared by every function in this module: capture_snapshot()
# reads them and capture_clear() empties them.
stdout_stream, stderr_stream = io.StringIO(), io.StringIO()
class DualStream:
    """Tee-style text stream that mirrors every write to two streams.

    The object is meant to stand in for ``sys.stdout`` / ``sys.stderr``:
    ``stream1`` is normally the real stream and ``stream2`` an in-memory
    buffer that records a copy of everything written.

    Attributes that this class does not define itself (``isatty``,
    ``encoding``, ``fileno``, ...) are looked up on ``stream1`` so that code
    probing the replaced standard stream keeps working.
    """

    def __init__(self, stream1, stream2):
        """Store the two target streams.

        Args:
            stream1: Primary stream; also serves attribute lookups that this
                class does not implement.
            stream2: Secondary stream receiving a copy of every write.
        """
        self.stream1 = stream1
        self.stream2 = stream2

    def write(self, text):
        """Write *text* to both streams.

        Returns:
            The number of characters written, as the text-stream ``write``
            contract requires.
        """
        self.stream1.write(text)
        self.stream2.write(text)
        return len(text)

    def flush(self):
        """Flush both underlying streams."""
        self.stream1.flush()
        self.stream2.flush()

    def __getattr__(self, name):
        """Delegate attributes not defined here to ``stream1``."""
        # __getattr__ only runs when normal lookup failed. If stream1/stream2
        # themselves are missing (e.g. a half-initialised instance), fail
        # plainly instead of recursing forever.
        if name in ("stream1", "stream2"):
            raise AttributeError(name)
        return getattr(self.stream1, name)
def capture_init():
    """Redirect ``sys.stdout`` and ``sys.stderr`` through capturing tees.

    After the call, text written to ``sys.stdout`` still reaches the previous
    stdout and is additionally recorded in ``stdout_stream``; likewise
    ``sys.stderr`` output still reaches the previous stderr and is recorded
    in ``stderr_stream``.

    The function is idempotent: a stream that is already teed into its
    capture buffer is left alone, so repeated calls do not stack tees.
    """
    # Wrapping an already wrapped stream would make every write land in the
    # buffer once per nesting level, duplicating the captured text.
    stdout_teed = (
        isinstance(sys.stdout, DualStream) and sys.stdout.stream2 is stdout_stream
    )
    if not stdout_teed:
        sys.stdout = DualStream(sys.stdout, stdout_stream)

    # stderr gets its own tee and its own buffer; routing it through the
    # stdout tee would leave stderr_stream permanently empty.
    stderr_teed = (
        isinstance(sys.stderr, DualStream) and sys.stderr.stream2 is stderr_stream
    )
    if not stderr_teed:
        sys.stderr = DualStream(sys.stderr, stderr_stream)
def capture_snapshot():
    """Return everything captured so far as a ``(stdout, stderr)`` pair.

    The buffers are only read, not emptied.
    """
    return stdout_stream.getvalue(), stderr_stream.getvalue()
def capture_clear():
    """Empty both capture buffers and rewind them to position zero."""
    for buffer in (stdout_stream, stderr_stream):
        # Drop the content, then reset the cursor so the next write starts
        # at the beginning of the buffer.
        buffer.truncate(0)
        buffer.seek(0)
def _split_capture_result(results):
    """Split a wrapped function's return value into (text, remaining outputs)."""
    if results is None:
        return "", ()
    if isinstance(results, str):
        return results, ()
    if isinstance(results, tuple):
        if not results:
            return "", ()
        return str(results[0]), results[1:]
    # Any other single value is rendered as text instead of being dropped.
    return str(results), ()


def _format_captured_streams(out, err):
    """Render the non-empty captured streams as ``[stdout]``/``[stderr]`` sections."""
    sections = ""
    if out:
        sections += f"[stdout]\n{out}\n\n"
    if err:
        sections += f"[stderr]\n{err}\n\n"
    return sections


def capture_wrap(func):
    """Decorate *func* so its result and console output come back as text.

    The wrapper installs the output capture, calls *func*, and returns a
    tuple whose first element is a report string:

    * On success the report holds a ``[result]:`` section followed by
      ``[stdout]`` / ``[stderr]`` sections for any captured output. If *func*
      returned a tuple, its first element becomes the result text and the
      remaining elements are appended to the returned tuple unchanged.
    * If *func* raises an ``Exception``, the report holds the captured
      output followed by a ``[trace]`` section with the formatted traceback,
      and the returned tuple is ``(report, None)``. The exception is not
      re-raised.

    In both cases the previous ``sys.stdout`` / ``sys.stderr`` are restored
    and the capture buffers are cleared before the wrapper returns.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapping callable; it accepts the same arguments as *func*.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_stdout, saved_stderr = sys.stdout, sys.stderr
        # Install the redirection synchronously: doing it on a separate
        # thread races with func and can miss its first writes.
        capture_init()
        try:
            results = func(*args, **kwargs)
            result_text, result_remain = _split_capture_result(results)
            out, err = capture_snapshot()
            message = f"[result]:\n{result_text}\n\n"
            message += _format_captured_streams(out, err)
            return (message, *result_remain)
        except Exception:
            # Deliberate boundary: report the failure as text instead of
            # propagating it to the caller.
            out, err = capture_snapshot()
            stack_trace = traceback.format_exc()
            message = _format_captured_streams(out, err)
            message += f"[trace]\n{stack_trace}"
            return message, None
        finally:
            # Undo the redirection so later, unrelated output neither piles
            # up in the buffers nor leaks into the next call's report.
            sys.stdout, sys.stderr = saved_stdout, saved_stderr
            capture_clear()

    return wrapper
"""
def update_output():
while True:
stdout_value, stderr_value = get_output()
if stdout_value or stderr_value: # Check if there is new content
if text_output.value is None:
text_output.value = ''
if stdout_value:
text_output.value += stdout_value
if stderr_value:
text_output.value += stderr_value
# Clear the captured output
stdout_stream.truncate(0)
stderr_stream.truncate(0)
time.sleep(0.5) # Add a small delay before checking again
output_thread = threading.Thread(target=update_output)
output_thread.start()
"""
|