internal/pkg: stream decompress artifact
All checks were successful
Test / Create distribution (push) Successful in 2m56s
Test / Sandbox (push) Successful in 6m55s
Test / Hakurei (push) Successful in 9m46s
Test / ShareFS (push) Successful in 10m21s
Test / Sandbox (race detector) (push) Successful in 10m44s
Test / Hakurei (race detector) (push) Successful in 14m34s
Test / Flake checks (push) Successful in 3m14s
All checks were successful
Test / Create distribution (push) Successful in 2m56s
Test / Sandbox (push) Successful in 6m55s
Test / Hakurei (push) Successful in 9m46s
Test / ShareFS (push) Successful in 10m21s
Test / Sandbox (race detector) (push) Successful in 10m44s
Test / Hakurei (race detector) (push) Successful in 14m34s
Test / Flake checks (push) Successful in 3m14s
The tarArtifact predates FileArtifact pipelining. This migrates decompression and buffering into a standalone artifact implementation. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
119
internal/pkg/compress.go
Normal file
119
internal/pkg/compress.go
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
package pkg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/bzip2"
|
||||||
|
"compress/gzip"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Gzip denotes a stream compressed via [gzip].
|
||||||
|
Gzip = iota
|
||||||
|
// Bzip2 denotes a stream compressed via [bzip2].
|
||||||
|
Bzip2
|
||||||
|
)
|
||||||
|
|
||||||
|
// A decompressArtifact is a [FileArtifact] decompressing a backing
|
||||||
|
// [FileArtifact] stream.
|
||||||
|
type decompressArtifact struct {
|
||||||
|
// Caller-supplied backing stream.
|
||||||
|
f Artifact
|
||||||
|
// Compression on top of the stream.
|
||||||
|
compress uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ FileArtifact = new(decompressArtifact)
|
||||||
|
|
||||||
|
// decompressArtifactNamed embeds decompressArtifact for a [fmt.Stringer] stream.
|
||||||
|
type decompressArtifactNamed struct {
|
||||||
|
decompressArtifact
|
||||||
|
// Copied from decompressArtifact.f.
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ fmt.Stringer = new(decompressArtifactNamed)
|
||||||
|
|
||||||
|
// NewDecompress returns a [FileArtifact] decompressing the supplied [Artifact].
|
||||||
|
func NewDecompress(a Artifact, compress uint32) Artifact {
|
||||||
|
da := decompressArtifact{a, compress}
|
||||||
|
if s, ok := a.(fmt.Stringer); ok {
|
||||||
|
if name := s.String(); name != "" {
|
||||||
|
return &decompressArtifactNamed{da, name}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &da
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns the name of the underlying [Artifact] suffixed with decompress.
|
||||||
|
func (a *decompressArtifactNamed) String() string { return a.name + "-decompress" }
|
||||||
|
|
||||||
|
// Kind returns the hardcoded [Kind] constant.
|
||||||
|
func (a *decompressArtifact) Kind() Kind { return KindDecompress }
|
||||||
|
|
||||||
|
// Params writes value of compression enum.
|
||||||
|
func (a *decompressArtifact) Params(ctx *IContext) { ctx.WriteUint32(a.compress) }
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
register(KindDecompress, func(r *IRReader) Artifact {
|
||||||
|
a := NewDecompress(r.Next(), r.ReadUint32())
|
||||||
|
if _, ok := r.Finalise(); ok {
|
||||||
|
panic(ErrUnexpectedChecksum)
|
||||||
|
}
|
||||||
|
return a
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dependencies returns a slice containing the backing file.
|
||||||
|
func (a *decompressArtifact) Dependencies() []Artifact {
|
||||||
|
return []Artifact{a.f}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsExclusive returns false: decompressor is fully sequential.
|
||||||
|
func (a *decompressArtifact) IsExclusive() bool { return false }
|
||||||
|
|
||||||
|
// compoundCloser is an [io.ReadCloser] with an additional [io.Closer] attached.
|
||||||
|
type compoundCloser struct {
|
||||||
|
io.ReadCloser
|
||||||
|
c io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes [io.ReadCloser] and the additional [io.Closer]. It returns the
|
||||||
|
// non-nil error returned by the underlying [io.ReadCloser], otherwise it
|
||||||
|
// returns the error returned by the additional [io.Closer].
|
||||||
|
func (c compoundCloser) Close() error {
|
||||||
|
err := c.ReadCloser.Close()
|
||||||
|
if _err := c.c.Close(); err == nil {
|
||||||
|
err = _err
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cure returns a decompressor [io.ReadCloser].
|
||||||
|
func (a *decompressArtifact) Cure(r *RContext) (io.ReadCloser, error) {
|
||||||
|
sr, err := r.Open(a.f)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
br := r.cache.getReaderRC(sr)
|
||||||
|
|
||||||
|
var dr io.ReadCloser
|
||||||
|
switch a.compress {
|
||||||
|
case Gzip:
|
||||||
|
if dr, err = gzip.NewReader(br); err != nil {
|
||||||
|
_ = br.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return compoundCloser{dr, br}, nil
|
||||||
|
|
||||||
|
case Bzip2:
|
||||||
|
return struct {
|
||||||
|
io.Reader
|
||||||
|
io.Closer
|
||||||
|
}{bzip2.NewReader(br), br}, nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, os.ErrInvalid
|
||||||
|
}
|
||||||
|
}
|
||||||
70
internal/pkg/compress_test.go
Normal file
70
internal/pkg/compress_test.go
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
package pkg_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"crypto/sha512"
|
||||||
|
"io/fs"
|
||||||
|
"net/http"
|
||||||
|
"testing"
|
||||||
|
"testing/fstest"
|
||||||
|
|
||||||
|
"hakurei.app/check"
|
||||||
|
"hakurei.app/internal/pkg"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDecompress(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
gw := gzip.NewWriter(&buf)
|
||||||
|
if _, err := gw.Write([]byte{0}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err = gw.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
testdata := buf.String()
|
||||||
|
|
||||||
|
var transport http.Transport
|
||||||
|
client := http.Client{Transport: &transport}
|
||||||
|
transport.RegisterProtocol("file", http.NewFileTransportFS(fstest.MapFS{
|
||||||
|
"testdata": {Data: []byte(testdata), Mode: 0400},
|
||||||
|
}))
|
||||||
|
testdataChecksum := func() pkg.Checksum {
|
||||||
|
h := sha512.New384()
|
||||||
|
h.Write([]byte(testdata))
|
||||||
|
return (pkg.Checksum)(h.Sum(nil))
|
||||||
|
}()
|
||||||
|
|
||||||
|
checkWithCache(t, []cacheTestCase{
|
||||||
|
{"decompress", 0, nil, func(t *testing.T, base *check.Absolute, c *pkg.Cache) {
|
||||||
|
cureMany(t, c, []cureStep{
|
||||||
|
{"close", pkg.NewDecompress(pkg.NewHTTPGet(
|
||||||
|
&client,
|
||||||
|
"file:///testdata",
|
||||||
|
pkg.Checksum{0xfd},
|
||||||
|
), pkg.Gzip), nil, nil, &pkg.ChecksumMismatchError{
|
||||||
|
Got: testdataChecksum,
|
||||||
|
Want: pkg.Checksum{0xfd},
|
||||||
|
}},
|
||||||
|
|
||||||
|
{"gzip", pkg.NewDecompress(pkg.NewHTTPGet(
|
||||||
|
&client,
|
||||||
|
"file:///testdata",
|
||||||
|
testdataChecksum,
|
||||||
|
), pkg.Gzip), ignorePathname, expectsChecksum(sha512.Sum384([]byte{0})), nil},
|
||||||
|
})
|
||||||
|
}, expectsFS{
|
||||||
|
".": {Mode: fs.ModeDir | 0700},
|
||||||
|
|
||||||
|
"checksum": {Mode: fs.ModeDir | 0700},
|
||||||
|
"checksum/" + pkg.Encode(sha512.Sum384([]byte{0})): {Mode: 0400, Data: []byte{0}},
|
||||||
|
|
||||||
|
"identifier": {Mode: fs.ModeDir | 0700},
|
||||||
|
"identifier/QpjkahDrz7pz-tv0eAGNXR6x9NAtTjWCK5Hr7G1cIZj9rT7bLYJWUQeLD4wamAlF": {Mode: fs.ModeSymlink | 0777, Data: []byte("../checksum/vsAhtPNo4waRNOASwrQwcIPTqb3SBuJOXw2G4T1mNmVZM-wrQTRllmgXqcIIoRcX")},
|
||||||
|
|
||||||
|
"substitute": {Mode: fs.ModeDir | 0700},
|
||||||
|
"work": {Mode: fs.ModeDir | 0700},
|
||||||
|
}},
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -104,6 +104,8 @@ func TestIRRoundtrip(t *testing.T) {
|
|||||||
|
|
||||||
{"file anonymous", pkg.NewFile("", []byte{0})},
|
{"file anonymous", pkg.NewFile("", []byte{0})},
|
||||||
{"file", pkg.NewFile("stub", []byte("stub"))},
|
{"file", pkg.NewFile("stub", []byte("stub"))},
|
||||||
|
|
||||||
|
{"decompress", pkg.NewDecompress(pkg.NewFile("", []byte{0}), pkg.Bzip2)},
|
||||||
}
|
}
|
||||||
testCasesCache := make([]cacheTestCase, len(testCases))
|
testCasesCache := make([]cacheTestCase, len(testCases))
|
||||||
for i, tc := range testCases {
|
for i, tc := range testCases {
|
||||||
|
|||||||
@@ -508,6 +508,8 @@ const (
|
|||||||
KindExecNet
|
KindExecNet
|
||||||
// KindFile is the kind of [Artifact] returned by [NewFile].
|
// KindFile is the kind of [Artifact] returned by [NewFile].
|
||||||
KindFile
|
KindFile
|
||||||
|
// KindDecompress is the kind of [Artifact] returned by [NewDecompress].
|
||||||
|
KindDecompress
|
||||||
|
|
||||||
// _kindEnd is the total number of kinds and does not denote a kind.
|
// _kindEnd is the total number of kinds and does not denote a kind.
|
||||||
_kindEnd
|
_kindEnd
|
||||||
@@ -795,6 +797,38 @@ func (c *Cache) getReader(r io.Reader) *bufio.Reader {
|
|||||||
// putReader adds br to brPool.
|
// putReader adds br to brPool.
|
||||||
func (c *Cache) putReader(br *bufio.Reader) { c.brPool.Put(br) }
|
func (c *Cache) putReader(br *bufio.Reader) { c.brPool.Put(br) }
|
||||||
|
|
||||||
|
// bufioReadCloser is the concrete type of value returned by Cache.getReaderRC.
|
||||||
|
type bufioReadCloser struct {
|
||||||
|
// Saved close error.
|
||||||
|
closeErr error
|
||||||
|
// Synchronises calls to Close.
|
||||||
|
closeOnce sync.Once
|
||||||
|
|
||||||
|
// For backing freelist.
|
||||||
|
c *Cache
|
||||||
|
// Underlying reader.
|
||||||
|
r io.ReadCloser
|
||||||
|
// Allocated from c.
|
||||||
|
*bufio.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying reader, saves its return value, and returns the
|
||||||
|
// [bufio.Reader] instance to the backing [Cache].
|
||||||
|
func (brc *bufioReadCloser) Close() error {
|
||||||
|
brc.closeOnce.Do(func() {
|
||||||
|
br := brc.Reader
|
||||||
|
brc.Reader = nil
|
||||||
|
brc.c.putReader(br)
|
||||||
|
brc.closeErr = brc.r.Close()
|
||||||
|
})
|
||||||
|
return brc.closeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// getReaderRC is like getReader, but returns an [io.ReadCloser].
|
||||||
|
func (c *Cache) getReaderRC(r io.ReadCloser) io.ReadCloser {
|
||||||
|
return &bufioReadCloser{c: c, r: r, Reader: c.getReader(r)}
|
||||||
|
}
|
||||||
|
|
||||||
// getWriter is like [bufio.NewWriter] but for bwPool.
|
// getWriter is like [bufio.NewWriter] but for bwPool.
|
||||||
func (c *Cache) getWriter(w io.Writer) *bufio.Writer {
|
func (c *Cache) getWriter(w io.Writer) *bufio.Writer {
|
||||||
bw := c.bwPool.Get().(*bufio.Writer)
|
bw := c.bwPool.Get().(*bufio.Writer)
|
||||||
|
|||||||
@@ -293,15 +293,15 @@ func TestIdent(t *testing.T) {
|
|||||||
a pkg.Artifact
|
a pkg.Artifact
|
||||||
want unique.Handle[pkg.ID]
|
want unique.Handle[pkg.ID]
|
||||||
}{
|
}{
|
||||||
{"tar", &stubArtifact{
|
{"decompress", &stubArtifact{
|
||||||
pkg.KindTar,
|
pkg.KindDecompress,
|
||||||
[]byte{pkg.TarGzip, 0, 0, 0, 0, 0, 0, 0},
|
[]byte{pkg.Gzip, 0, 0, 0, 0, 0, 0, 0},
|
||||||
[]pkg.Artifact{
|
[]pkg.Artifact{
|
||||||
overrideIdent{pkg.ID{}, new(stubArtifact)},
|
overrideIdent{pkg.ID{}, new(stubArtifact)},
|
||||||
},
|
},
|
||||||
nil,
|
nil,
|
||||||
}, unique.Make[pkg.ID](pkg.MustDecode(
|
}, unique.Make[pkg.ID](pkg.MustDecode(
|
||||||
"WKErnjTOVbuH2P9a0gM4OcAAO4p-CoX2HQu7CbZrg8ZOzApvWoO3-ISzPw6av_rN",
|
"97Y85QewssfPbNIN9cyNhzD4e6dLHcDTU8rb2c34k-aCrZfBNXFUc0duPiLFFcw_",
|
||||||
))},
|
))},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user