From 76c1fb84c87ba12a22b64f7c471a92603365837f Mon Sep 17 00:00:00 2001 From: Ophestra Date: Thu, 4 Jun 2026 18:12:03 +0900 Subject: [PATCH] internal/pkg: stream decompress artifact The tarArtifact predates FileArtifact pipelining. This migrates decompression and buffering into a standalone artifact implementation. Signed-off-by: Ophestra --- internal/pkg/compress.go | 119 ++++++++++++++++++++++++++++++++++ internal/pkg/compress_test.go | 70 ++++++++++++++++++++ internal/pkg/ir_test.go | 2 + internal/pkg/pkg.go | 34 ++++++++++ internal/pkg/pkg_test.go | 8 +-- 5 files changed, 229 insertions(+), 4 deletions(-) create mode 100644 internal/pkg/compress.go create mode 100644 internal/pkg/compress_test.go diff --git a/internal/pkg/compress.go b/internal/pkg/compress.go new file mode 100644 index 00000000..310fe065 --- /dev/null +++ b/internal/pkg/compress.go @@ -0,0 +1,119 @@ +package pkg + +import ( + "compress/bzip2" + "compress/gzip" + "fmt" + "io" + "os" +) + +const ( + // Gzip denotes a stream compressed via [gzip]. + Gzip = iota + // Bzip2 denotes a stream compressed via [bzip2]. + Bzip2 +) + +// A decompressArtifact is a [FileArtifact] decompressing a backing +// [FileArtifact] stream. +type decompressArtifact struct { + // Caller-supplied backing stream. + f Artifact + // Compression on top of the stream. + compress uint32 +} + +var _ FileArtifact = new(decompressArtifact) + +// decompressArtifactNamed embeds decompressArtifact for a [fmt.Stringer] stream. +type decompressArtifactNamed struct { + decompressArtifact + // Copied from decompressArtifact.f. + name string +} + +var _ fmt.Stringer = new(decompressArtifactNamed) + +// NewDecompress returns a [FileArtifact] decompressing the supplied [Artifact]. +func NewDecompress(a Artifact, compress uint32) Artifact { + da := decompressArtifact{a, compress} + if s, ok := a.(fmt.Stringer); ok { + if name := s.String(); name != "" { + return &decompressArtifactNamed{da, name} + } + } + return &da +} + +// String returns the name of the underlying [Artifact] suffixed with decompress. +func (a *decompressArtifactNamed) String() string { return a.name + "-decompress" } + +// Kind returns the hardcoded [Kind] constant. +func (a *decompressArtifact) Kind() Kind { return KindDecompress } + +// Params writes value of compression enum. +func (a *decompressArtifact) Params(ctx *IContext) { ctx.WriteUint32(a.compress) } + +func init() { + register(KindDecompress, func(r *IRReader) Artifact { + a := NewDecompress(r.Next(), r.ReadUint32()) + if _, ok := r.Finalise(); ok { + panic(ErrUnexpectedChecksum) + } + return a + }) +} + +// Dependencies returns a slice containing the backing file. +func (a *decompressArtifact) Dependencies() []Artifact { + return []Artifact{a.f} +} + +// IsExclusive returns false: decompressor is fully sequential. +func (a *decompressArtifact) IsExclusive() bool { return false } + +// compoundCloser is an [io.ReadCloser] with an additional [io.Closer] attached. +type compoundCloser struct { + io.ReadCloser + c io.Closer +} + +// Close closes [io.ReadCloser] and the additional [io.Closer]. It returns the +// non-nil error returned by the underlying [io.ReadCloser], otherwise it +// returns the error returned by the additional [io.Closer]. +func (c compoundCloser) Close() error { + err := c.ReadCloser.Close() + if _err := c.c.Close(); err == nil { + err = _err + } + return err +} + +// Cure returns a decompressor [io.ReadCloser]. +func (a *decompressArtifact) Cure(r *RContext) (io.ReadCloser, error) { + sr, err := r.Open(a.f) + if err != nil { + return nil, err + } + br := r.cache.getReaderRC(sr) + + var dr io.ReadCloser + switch a.compress { + case Gzip: + if dr, err = gzip.NewReader(br); err != nil { + _ = br.Close() + return nil, err + } + return compoundCloser{dr, br}, nil + + case Bzip2: + return struct { + io.Reader + io.Closer + }{bzip2.NewReader(br), br}, nil + + default: + return nil, os.ErrInvalid + } +} diff --git a/internal/pkg/compress_test.go b/internal/pkg/compress_test.go new file mode 100644 index 00000000..5b542dce --- /dev/null +++ b/internal/pkg/compress_test.go @@ -0,0 +1,70 @@ +package pkg_test + +import ( + "bytes" + "compress/gzip" + "crypto/sha512" + "io/fs" + "net/http" + "testing" + "testing/fstest" + + "hakurei.app/check" + "hakurei.app/internal/pkg" +) + +func TestDecompress(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + if _, err := gw.Write([]byte{0}); err != nil { + t.Fatal(err) + } else if err = gw.Close(); err != nil { + t.Fatal(err) + } + testdata := buf.String() + + var transport http.Transport + client := http.Client{Transport: &transport} + transport.RegisterProtocol("file", http.NewFileTransportFS(fstest.MapFS{ + "testdata": {Data: []byte(testdata), Mode: 0400}, + })) + testdataChecksum := func() pkg.Checksum { + h := sha512.New384() + h.Write([]byte(testdata)) + return (pkg.Checksum)(h.Sum(nil)) + }() + + checkWithCache(t, []cacheTestCase{ + {"decompress", 0, nil, func(t *testing.T, base *check.Absolute, c *pkg.Cache) { + cureMany(t, c, []cureStep{ + {"close", pkg.NewDecompress(pkg.NewHTTPGet( + &client, + "file:///testdata", + pkg.Checksum{0xfd}, + ), pkg.Gzip), nil, nil, &pkg.ChecksumMismatchError{ + Got: testdataChecksum, + Want: pkg.Checksum{0xfd}, + }}, + + {"gzip", pkg.NewDecompress(pkg.NewHTTPGet( + &client, + "file:///testdata", + testdataChecksum, + ), pkg.Gzip), ignorePathname, expectsChecksum(sha512.Sum384([]byte{0})), nil}, + }) + }, expectsFS{ + ".": {Mode: fs.ModeDir | 0700}, + + "checksum": {Mode: fs.ModeDir | 0700}, + "checksum/" + pkg.Encode(sha512.Sum384([]byte{0})): {Mode: 0400, Data: []byte{0}}, + + "identifier": {Mode: fs.ModeDir | 0700}, + "identifier/QpjkahDrz7pz-tv0eAGNXR6x9NAtTjWCK5Hr7G1cIZj9rT7bLYJWUQeLD4wamAlF": {Mode: fs.ModeSymlink | 0777, Data: []byte("../checksum/vsAhtPNo4waRNOASwrQwcIPTqb3SBuJOXw2G4T1mNmVZM-wrQTRllmgXqcIIoRcX")}, + + "substitute": {Mode: fs.ModeDir | 0700}, + "work": {Mode: fs.ModeDir | 0700}, + }}, + }) +} diff --git a/internal/pkg/ir_test.go b/internal/pkg/ir_test.go index 9a465e55..d49d2c56 100644 --- a/internal/pkg/ir_test.go +++ b/internal/pkg/ir_test.go @@ -104,6 +104,8 @@ func TestIRRoundtrip(t *testing.T) { {"file anonymous", pkg.NewFile("", []byte{0})}, {"file", pkg.NewFile("stub", []byte("stub"))}, + + {"decompress", pkg.NewDecompress(pkg.NewFile("", []byte{0}), pkg.Bzip2)}, } testCasesCache := make([]cacheTestCase, len(testCases)) for i, tc := range testCases { diff --git a/internal/pkg/pkg.go b/internal/pkg/pkg.go index 8b022d1f..09d91f73 100644 --- a/internal/pkg/pkg.go +++ b/internal/pkg/pkg.go @@ -508,6 +508,8 @@ const ( KindExecNet // KindFile is the kind of [Artifact] returned by [NewFile]. KindFile + // KindDecompress is the kind of [Artifact] returned by [NewDecompress]. + KindDecompress // _kindEnd is the total number of kinds and does not denote a kind. _kindEnd @@ -795,6 +797,38 @@ func (c *Cache) getReader(r io.Reader) *bufio.Reader { // putReader adds br to brPool. func (c *Cache) putReader(br *bufio.Reader) { c.brPool.Put(br) } +// bufioReadCloser is the concrete type of value returned by Cache.getReaderRC. +type bufioReadCloser struct { + // Saved close error. + closeErr error + // Synchronises calls to Close. + closeOnce sync.Once + + // For backing freelist. + c *Cache + // Underlying reader. + r io.ReadCloser + // Allocated from c. + *bufio.Reader +} + +// Close closes the underlying reader, saves its return value, and returns the +// [bufio.Reader] instance to the backing [Cache]. +func (brc *bufioReadCloser) Close() error { + brc.closeOnce.Do(func() { + br := brc.Reader + brc.Reader = nil + brc.c.putReader(br) + brc.closeErr = brc.r.Close() + }) + return brc.closeErr +} + +// getReaderRC is like getReader, but returns an [io.ReadCloser]. +func (c *Cache) getReaderRC(r io.ReadCloser) io.ReadCloser { + return &bufioReadCloser{c: c, r: r, Reader: c.getReader(r)} +} + // getWriter is like [bufio.NewWriter] but for bwPool. func (c *Cache) getWriter(w io.Writer) *bufio.Writer { bw := c.bwPool.Get().(*bufio.Writer) diff --git a/internal/pkg/pkg_test.go b/internal/pkg/pkg_test.go index 25deb95c..ce11fa90 100644 --- a/internal/pkg/pkg_test.go +++ b/internal/pkg/pkg_test.go @@ -293,15 +293,15 @@ func TestIdent(t *testing.T) { a pkg.Artifact want unique.Handle[pkg.ID] }{ - {"tar", &stubArtifact{ - pkg.KindTar, - []byte{pkg.TarGzip, 0, 0, 0, 0, 0, 0, 0}, + {"decompress", &stubArtifact{ + pkg.KindDecompress, + []byte{pkg.Gzip, 0, 0, 0, 0, 0, 0, 0}, []pkg.Artifact{ overrideIdent{pkg.ID{}, new(stubArtifact)}, }, nil, }, unique.Make[pkg.ID](pkg.MustDecode( - "WKErnjTOVbuH2P9a0gM4OcAAO4p-CoX2HQu7CbZrg8ZOzApvWoO3-ISzPw6av_rN", + "97Y85QewssfPbNIN9cyNhzD4e6dLHcDTU8rb2c34k-aCrZfBNXFUc0duPiLFFcw_", ))}, }