fortify/test/test.py
Ophestra e9a7cd526f
All checks were successful
Test / Create distribution (push) Successful in 27s
Test / Sandbox (push) Successful in 1m45s
Test / Fortify (push) Successful in 2m36s
Test / Sandbox (race detector) (push) Successful in 2m49s
Test / Fpkg (push) Successful in 3m33s
Test / Fortify (race detector) (push) Successful in 4m13s
Test / Flake checks (push) Successful in 1m6s
app: improve shim process management
This ensures a signal gets delivered to the process instead of relying on parent death behaviour.

SIGCONT was chosen as it is the only signal an unprivileged process is allowed to send to processes with different credentials.

A custom signal handler is installed because the Go runtime does not expose signal information other than which signal was received, and shim must check pid to ensure reasonable behaviour.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-04-07 03:55:17 +09:00

259 lines
9.8 KiB
Python

import json
import shlex
q = shlex.quote
NODE_GROUPS = ["nodes", "floating_nodes"]
def swaymsg(command: str = "", succeed=True, type="command"):
assert command != "" or type != "command", "Must specify command or type"
shell = q(f"swaymsg -t {q(type)} -- {q(command)}")
with machine.nested(
f"sending swaymsg {shell!r}" + " (allowed to fail)" * (not succeed)
):
ret = (machine.succeed if succeed else machine.execute)(
f"su - alice -c {shell}"
)
# execute also returns a status code, but disregard.
if not succeed:
_, ret = ret
if not succeed and not ret:
return None
parsed = json.loads(ret)
return parsed
def walk(tree):
yield tree
for group in NODE_GROUPS:
for node in tree.get(group, []):
yield from walk(node)
def wait_for_window(pattern):
def func(last_chance):
nodes = (node["name"] for node in walk(swaymsg(type="get_tree")))
if last_chance:
nodes = list(nodes)
machine.log(f"Last call! Current list of windows: {nodes}")
return any(pattern in name for name in nodes)
retry(func)
def collect_state_ui(name):
swaymsg(f"exec fortify ps > '/tmp/{name}.ps'")
machine.copy_from_vm(f"/tmp/{name}.ps", "")
swaymsg(f"exec fortify --json ps > '/tmp/{name}.json'")
machine.copy_from_vm(f"/tmp/{name}.json", "")
machine.screenshot(name)
def check_state(name, enablements):
instances = json.loads(machine.succeed("sudo -u alice -i XDG_RUNTIME_DIR=/run/user/1000 fortify --json ps"))
if len(instances) != 1:
raise Exception(f"unexpected state length {len(instances)}")
instance = next(iter(instances.values()))
config = instance['config']
command = f"{name}-start"
if not (config['path'].startswith("/nix/store/")) or not (config['path'].endswith(command)):
raise Exception(f"unexpected path {config['path']}")
if len(config['args']) != 1 or config['args'][0] != command:
raise Exception(f"unexpected args {config['args']}")
if config['confinement']['enablements'] != enablements:
raise Exception(f"unexpected enablements {instance['config']['confinement']['enablements']}")
def fortify(command):
swaymsg(f"exec fortify {command}")
start_all()
machine.wait_for_unit("multi-user.target")
# Run fortify Go tests outside of nix build in the background:
machine.succeed("sudo -u untrusted -i fortify-go-test &> /tmp/go-test &")
# To check fortify's version:
print(machine.succeed("sudo -u alice -i fortify version"))
# Wait for Sway to complete startup:
machine.wait_for_file("/run/user/1000/wayland-1")
machine.wait_for_file("/tmp/sway-ipc.sock")
# Deny unmapped uid:
denyOutput = machine.fail("sudo -u untrusted -i fortify run &>/dev/stdout")
print(denyOutput)
denyOutputVerbose = machine.fail("sudo -u untrusted -i fortify -v run &>/dev/stdout")
print(denyOutputVerbose)
# Fail direct fsu call:
print(machine.fail("sudo -u alice -i fsu"))
# Verify PrintBaseError behaviour:
if denyOutput != "fsu: uid 1001 is not in the fsurc file\n":
raise Exception(f"unexpected deny output:\n{denyOutput}")
if denyOutputVerbose != "fsu: uid 1001 is not in the fsurc file\nfortify: *cannot obtain uid from fsu: permission denied\n":
raise Exception(f"unexpected deny verbose output:\n{denyOutputVerbose}")
check_offset = 0
def aid(offset):
return 1+check_offset+offset
def tmpdir_path(offset, name):
return f"/tmp/fortify.1000/tmpdir/{aid(offset)}/{name}"
# Start fortify permissive defaults outside Wayland session:
print(machine.succeed("sudo -u alice -i fortify -v run -a 0 touch /tmp/pd-bare-ok"))
machine.wait_for_file("/tmp/fortify.1000/tmpdir/0/pd-bare-ok", timeout=5)
# Verify silent output permissive defaults:
output = machine.succeed("sudo -u alice -i fortify run -a 0 true &>/dev/stdout")
if output != "":
raise Exception(f"unexpected output\n{output}")
# Verify silent output permissive defaults signal:
def silent_output_interrupt(flags):
swaymsg("exec foot")
wait_for_window("alice@machine")
# aid 0 does not have home-manager
machine.send_chars(f"exec fortify run {flags}-a 0 sh -c 'export PATH=/run/current-system/sw/bin:$PATH && touch /tmp/pd-silent-ready && sleep infinity' &>/tmp/pd-silent\n")
machine.wait_for_file("/tmp/fortify.1000/tmpdir/0/pd-silent-ready", timeout=15)
machine.succeed("rm /tmp/fortify.1000/tmpdir/0/pd-silent-ready")
machine.send_key("ctrl-c")
machine.wait_until_fails("pgrep foot", timeout=5)
machine.wait_until_fails(f"pgrep -u alice -f 'fortify run {flags}-a 0 '", timeout=5)
output = machine.succeed("cat /tmp/pd-silent && rm /tmp/pd-silent")
if output != "":
raise Exception(f"unexpected output\n{output}")
silent_output_interrupt("")
silent_output_interrupt("--dbus ") # this one is especially painful as it maintains a helper
silent_output_interrupt("--wayland -X --dbus --pulse ")
# Verify graceful failure on bad Wayland display name:
print(machine.fail("sudo -u alice -i fortify -v run --wayland true"))
# Start fortify permissive defaults within Wayland session:
fortify('-v run --wayland --dbus notify-send -a "NixOS Tests" "Test notification" "Notification from within sandbox." && touch /tmp/dbus-ok')
machine.wait_for_file("/tmp/dbus-ok", timeout=15)
collect_state_ui("dbus_notify_exited")
machine.succeed("pkill -9 mako")
# Check revert type selection:
fortify("-v run --wayland -X --dbus --pulse -u p0 foot && touch /tmp/p0-exit-ok")
wait_for_window("p0@machine")
print(machine.succeed("getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep 1000000"))
fortify("-v run --wayland -X --dbus --pulse -u p1 foot && touch /tmp/p1-exit-ok")
wait_for_window("p1@machine")
print(machine.succeed("getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep 1000000"))
machine.send_chars("exit\n")
machine.wait_for_file("/tmp/p1-exit-ok", timeout=15)
# Verify acl is kept alive:
print(machine.succeed("getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep 1000000"))
machine.send_chars("exit\n")
machine.wait_for_file("/tmp/p0-exit-ok", timeout=15)
machine.fail("getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep 1000000")
# Check interrupt shim behaviour:
swaymsg("exec sh -c 'ne-foot; echo -n $? > /tmp/monitor-exit-code'")
wait_for_window(f"u0_a{aid(0)}@machine")
machine.succeed("pkill -INT -f 'fortify -v app '")
machine.wait_until_fails("pgrep foot", timeout=5)
machine.wait_for_file("/tmp/monitor-exit-code")
interrupt_exit_code = int(machine.succeed("cat /tmp/monitor-exit-code"))
if interrupt_exit_code != 254:
raise Exception(f"unexpected exit code {interrupt_exit_code}")
# Start app (foot) with Wayland enablement:
swaymsg("exec ne-foot")
wait_for_window(f"u0_a{aid(0)}@machine")
machine.send_chars("clear; wayland-info && touch /tmp/client-ok\n")
machine.wait_for_file(tmpdir_path(0, "client-ok"), timeout=15)
collect_state_ui("foot_wayland")
check_state("ne-foot", 1)
# Verify lack of acl on XDG_RUNTIME_DIR:
machine.fail(f"getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep {aid(0) + 1000000}")
machine.send_chars("exit\n")
machine.wait_until_fails("pgrep foot", timeout=5)
machine.fail(f"getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep {aid(0) + 1000000}", timeout=5)
# Test PulseAudio (fortify does not support PipeWire yet):
swaymsg("exec pa-foot")
wait_for_window(f"u0_a{aid(1)}@machine")
machine.send_chars("clear; pactl info && touch /tmp/pulse-ok\n")
machine.wait_for_file(tmpdir_path(1, "pulse-ok"), timeout=15)
collect_state_ui("pulse_wayland")
check_state("pa-foot", 9)
machine.send_chars("exit\n")
machine.wait_until_fails("pgrep foot", timeout=5)
# Test XWayland (foot does not support X):
swaymsg("exec x11-alacritty")
wait_for_window(f"u0_a{aid(2)}@machine")
machine.send_chars("clear; glinfo && touch /tmp/x11-ok\n")
machine.wait_for_file(tmpdir_path(2, "x11-ok"), timeout=15)
collect_state_ui("alacritty_x11")
check_state("x11-alacritty", 2)
machine.send_chars("exit\n")
machine.wait_until_fails("pgrep alacritty", timeout=5)
# Start app (foot) with direct Wayland access:
swaymsg("exec da-foot")
wait_for_window(f"u0_a{aid(3)}@machine")
machine.send_chars("clear; wayland-info && touch /tmp/direct-ok\n")
collect_state_ui("foot_direct")
machine.wait_for_file(tmpdir_path(3, "direct-ok"), timeout=15)
check_state("da-foot", 1)
# Verify acl on XDG_RUNTIME_DIR:
print(machine.succeed(f"getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep {aid(3) + 1000000}"))
machine.send_chars("exit\n")
machine.wait_until_fails("pgrep foot", timeout=5)
# Verify acl cleanup on XDG_RUNTIME_DIR:
machine.wait_until_fails(f"getfacl --absolute-names --omit-header --numeric /run/user/1000 | grep {aid(3) + 1000000}", timeout=5)
# Test syscall filter:
print(machine.fail("sudo -u alice -i XDG_RUNTIME_DIR=/run/user/1000 strace-failure"))
# Start app (foot) with Wayland enablement from a terminal:
swaymsg("exec foot $SHELL -c '(ne-foot) & disown && exec $SHELL'")
wait_for_window(f"u0_a{aid(0)}@machine")
machine.send_chars("clear; wayland-info && touch /tmp/term-ok\n")
machine.wait_for_file(tmpdir_path(0, "term-ok"), timeout=15)
machine.send_key("alt-h")
machine.send_chars("clear; fortify show $(fortify ps --short) && touch /tmp/ps-show-ok && exec cat\n")
machine.wait_for_file("/tmp/ps-show-ok", timeout=5)
collect_state_ui("foot_wayland_term")
check_state("ne-foot", 1)
machine.send_key("alt-l")
machine.send_chars("exit\n")
wait_for_window("alice@machine")
machine.send_key("ctrl-c")
machine.wait_until_fails("pgrep foot", timeout=5)
# Exit Sway and verify process exit status 0:
swaymsg("exit", succeed=False)
machine.wait_for_file("/tmp/sway-exit-ok")
# Print fortify runDir contents:
print(machine.succeed("find /run/user/1000/fortify"))
# Verify go test status:
machine.wait_for_file("/tmp/go-test", timeout=5)
print(machine.succeed("cat /tmp/go-test"))
machine.wait_for_file("/tmp/go-test-ok", timeout=5)