internal/pkg: streaming archive reader/writer
All checks were successful
Test / Create distribution (push) Successful in 1m5s
Test / Sandbox (push) Successful in 2m49s
Test / Hakurei (push) Successful in 3m53s
Test / ShareFS (push) Successful in 3m51s
Test / Sandbox (race detector) (push) Successful in 5m35s
Test / Hakurei (race detector) (push) Successful in 6m32s
Test / Flake checks (push) Successful in 1m15s
All checks were successful
Test / Create distribution (push) Successful in 1m5s
Test / Sandbox (push) Successful in 2m49s
Test / Hakurei (push) Successful in 3m53s
Test / ShareFS (push) Successful in 3m51s
Test / Sandbox (race detector) (push) Successful in 5m35s
Test / Hakurei (race detector) (push) Successful in 6m32s
Test / Flake checks (push) Successful in 1m15s
This is much more robust and efficient than the simple buffering implementation for larger files. Allocations happen almost exclusively in WalkDir. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
278
internal/pkg/archive.go
Normal file
278
internal/pkg/archive.go
Normal file
@@ -0,0 +1,278 @@
|
||||
package pkg
|
||||
|
||||
import (
|
||||
"crypto/sha512"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"unsafe"
|
||||
|
||||
"hakurei.app/check"
|
||||
)
|
||||
|
||||
/*
|
||||
| mode uint32 | path_sz uint32 |
|
||||
| data_sz uint64 |
|
||||
| path string |
|
||||
| data []byte |
|
||||
*/
|
||||
|
||||
// An ArchiveHeader represents a single header in an archive.
|
||||
type ArchiveHeader struct {
|
||||
Mode fs.FileMode // file mode bits
|
||||
Path string // pathname of the file
|
||||
Size uint64 // size of data segment
|
||||
}
|
||||
|
||||
// Writer implements sequential writing of an archive. [Writer.WriteHeader]
|
||||
// begins a new file with the provided [ArchiveHeader], and then Writer can be
|
||||
// treated as an [io.Writer] to supply that file's data.
|
||||
//
|
||||
// It is the caller's responsibility to write entries in lexical order.
|
||||
type Writer struct {
|
||||
// Underlying writer.
|
||||
w io.Writer
|
||||
// Current header.
|
||||
h ArchiveHeader
|
||||
// Fixed-size header segment.
|
||||
buf [wordSize * 2]byte
|
||||
// Current position in data segment.
|
||||
n uint64
|
||||
}
|
||||
|
||||
// NewWriter returns the address of a new [Writer] writing to w.
|
||||
func NewWriter(w io.Writer) *Writer { return &Writer{w: w} }
|
||||
|
||||
var zero [wordSize]byte
|
||||
|
||||
// padSize returns the padding size for aligning sz.
|
||||
func padSize[T int | uint64](sz T) T {
|
||||
return (wordSize - (sz)%wordSize) % wordSize
|
||||
}
|
||||
|
||||
// flush concludes writing to the current file and writes padding.
|
||||
func (aw *Writer) flush() error {
|
||||
if aw.h.Size > aw.n {
|
||||
return fmt.Errorf("missed writing %d bytes", aw.h.Size-aw.n)
|
||||
} else if aw.h.Size < aw.n {
|
||||
return fmt.Errorf("wrote %d bytes beyond end of file", aw.n-aw.h.Size)
|
||||
}
|
||||
|
||||
if psz := padSize(aw.h.Size); psz != 0 {
|
||||
if _, err := aw.w.Write(zero[:psz]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
aw.n = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteHeader writes h and begins accepting its corresponding file.
|
||||
func (aw *Writer) WriteHeader(h *ArchiveHeader) error {
|
||||
if err := aw.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
aw.h = *h
|
||||
binary.LittleEndian.PutUint32(aw.buf[:], uint32(aw.h.Mode))
|
||||
binary.LittleEndian.PutUint32(aw.buf[wordSize/2:], uint32(len(aw.h.Path)))
|
||||
binary.LittleEndian.PutUint64(aw.buf[wordSize:], aw.h.Size)
|
||||
if _, err := aw.w.Write(aw.buf[:]); err != nil {
|
||||
return err
|
||||
} else if _, err = aw.w.Write(
|
||||
unsafe.Slice(unsafe.StringData(aw.h.Path), len(aw.h.Path)),
|
||||
); err != nil {
|
||||
return err
|
||||
} else if psz := padSize(len(aw.h.Path)); psz != 0 {
|
||||
if _, err = aw.w.Write(zero[:psz]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write writes p to the underlying writer and records the new position. Invalid
|
||||
// positions are reported by WriteHeader and Close.
|
||||
func (aw *Writer) Write(p []byte) (n int, err error) {
|
||||
n, err = aw.w.Write(p)
|
||||
aw.n += uint64(n)
|
||||
return
|
||||
}
|
||||
|
||||
// Close concludes writing to the archive stream.
|
||||
func (aw *Writer) Close() (err error) {
|
||||
err = aw.flush()
|
||||
aw.w = nil
|
||||
return
|
||||
}
|
||||
|
||||
// ErrInsecurePath is returned by [FlatEntry.Decode] if validation is requested
|
||||
// and a nonlocal path is encountered in the stream.
|
||||
var ErrInsecurePath = errors.New("insecure file path")
|
||||
|
||||
// Reader implements sequential reading of an archive. [Reader.Next] advances to
|
||||
// the next file in the archive (including the first), and then Reader can be
|
||||
// treated as an [io.Reader] to access the file's data.
|
||||
type Reader struct {
|
||||
// Underlying reader.
|
||||
r io.Reader
|
||||
// Fixed-size header segment.
|
||||
buf [wordSize * 2]byte
|
||||
// Remaining bytes in current data segment.
|
||||
n, pad uint64
|
||||
}
|
||||
|
||||
// NewReader returns the address of a new [Reader] reading from r.
|
||||
func NewReader(r io.Reader) *Reader { return &Reader{r: r} }
|
||||
|
||||
// Next advances ar to the next entry. Remaining bytes of the current data
|
||||
// segment are discarded. Advancing beyond the final entry returns [io.EOF].
|
||||
func (ar *Reader) Next() (*ArchiveHeader, error) {
|
||||
if dsz := int64(ar.n + ar.pad); dsz > 0 {
|
||||
if n, err := io.CopyN(io.Discard, ar.r, dsz); err != nil {
|
||||
if errors.Is(err, io.EOF) && n != dsz {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if n, err := ar.r.Read(ar.buf[:]); err != nil {
|
||||
if errors.Is(err, io.EOF) && n != 0 {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h := ArchiveHeader{
|
||||
Mode: fs.FileMode(binary.LittleEndian.Uint32(ar.buf[:])),
|
||||
Size: binary.LittleEndian.Uint64(ar.buf[wordSize:]),
|
||||
}
|
||||
pathSize := int(binary.LittleEndian.Uint32(ar.buf[wordSize/2:]))
|
||||
pPathSize := alignSize(pathSize)
|
||||
|
||||
buf := make([]byte, pPathSize)
|
||||
if n, err := ar.r.Read(buf); err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if n != len(buf) {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
h.Path = unsafe.String(unsafe.SliceData(buf), pathSize)
|
||||
return &h, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h.Path = unsafe.String(unsafe.SliceData(buf), pathSize)
|
||||
if !filepath.IsLocal(h.Path) {
|
||||
return &h, ErrInsecurePath
|
||||
}
|
||||
|
||||
ar.n = h.Size
|
||||
ar.pad = padSize(h.Size)
|
||||
return &h, nil
|
||||
}
|
||||
|
||||
// Read implements [io.Reader] for the data segment of the current entry.
|
||||
func (ar *Reader) Read(p []byte) (n int, err error) {
|
||||
if uint64(len(p)) > ar.n {
|
||||
p = p[:ar.n]
|
||||
}
|
||||
|
||||
if len(p) > 0 {
|
||||
n, err = ar.r.Read(p)
|
||||
ar.n -= uint64(n)
|
||||
}
|
||||
|
||||
switch err {
|
||||
case io.EOF:
|
||||
if ar.n > 0 {
|
||||
return n, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
case nil:
|
||||
if ar.n == 0 {
|
||||
return n, io.EOF
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Write writes a deterministic representation of the contents of fsys to w.
|
||||
// The resulting data can be hashed to produce a deterministic checksum for the
|
||||
// directory.
|
||||
func Write(fsys fs.FS, root string, w io.Writer) error {
|
||||
aw := NewWriter(w)
|
||||
if err := fs.WalkDir(fsys, root, func(path string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var fi fs.FileInfo
|
||||
fi, err = d.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h := ArchiveHeader{
|
||||
Path: path,
|
||||
Mode: fi.Mode(),
|
||||
}
|
||||
if h.Mode.IsRegular() {
|
||||
h.Size = uint64(fi.Size())
|
||||
if err = aw.WriteHeader(&h); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var r fs.File
|
||||
r, err = fsys.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(aw, r)
|
||||
if _err := r.Close(); err == nil {
|
||||
err = _err
|
||||
}
|
||||
return err
|
||||
} else if h.Mode&fs.ModeSymlink != 0 {
|
||||
var newpath string
|
||||
if newpath, err = fs.ReadLink(fsys, path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.Size = uint64(len(newpath))
|
||||
if err = aw.WriteHeader(&h); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = aw.Write(unsafe.Slice(unsafe.StringData(newpath), len(newpath)))
|
||||
return err
|
||||
} else if !h.Mode.IsDir() {
|
||||
return InvalidFileModeError(h.Mode)
|
||||
}
|
||||
return aw.WriteHeader(&h)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return aw.Close()
|
||||
}
|
||||
|
||||
// HashFS returns a checksum produced by hashing the result of [Flatten].
|
||||
func HashFS(buf *Checksum, fsys fs.FS, root string) error {
|
||||
h := sha512.New384()
|
||||
if err := Write(fsys, root, h); err != nil {
|
||||
return err
|
||||
}
|
||||
h.Sum(buf[:0])
|
||||
return nil
|
||||
}
|
||||
|
||||
// HashDir returns a checksum produced by hashing the result of [Flatten].
|
||||
func HashDir(buf *Checksum, pathname *check.Absolute) error {
|
||||
return HashFS(buf, os.DirFS(pathname.String()), ".")
|
||||
}
|
||||
Reference in New Issue
Block a user