streamdata: channel I/O helpers
These takes assets as a stream alongside their metadata. Signed-off-by: Yonah <contrib@gensokyo.uk>
This commit is contained in:
199
streamdata.go
Normal file
199
streamdata.go
Normal file
@@ -0,0 +1,199 @@
|
||||
// Package streamdata provides a simple API for downloading VODs and maintaining
|
||||
// basic metadata alongside them.
|
||||
package streamdata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Channel represents a Twitch channel.
|
||||
type Channel struct {
|
||||
// Numerical identifier specific to the channel, returned by Twitch.
|
||||
Identifier uint64 `json:"id"`
|
||||
// Unique, mutable channel name.
|
||||
Name string `json:"name"`
|
||||
|
||||
// Directory to place persistent data in.
|
||||
root *os.Root
|
||||
}
|
||||
|
||||
// Is returns whether target is equivalent to c.
|
||||
func (c *Channel) Is(target *Channel) bool {
|
||||
return (c == nil && target == nil) || (c != nil && target != nil &&
|
||||
c.Identifier == target.Identifier &&
|
||||
c.Name == target.Name)
|
||||
}
|
||||
|
||||
// Close closes the underlying on-disk representation.
|
||||
func (c *Channel) Close() error {
|
||||
if c == nil || c.root == nil {
|
||||
return syscall.EINVAL
|
||||
}
|
||||
return c.root.Close()
|
||||
}
|
||||
|
||||
const (
|
||||
// channelPathMetadata points to the metadata file relative to Channel.root.
|
||||
channelPathMetadata = "channel"
|
||||
// channelPathVOD points to the vod directory relative to Channel.root.
|
||||
channelPathVOD = "vod"
|
||||
// ChannelVODSuffix is the (apparently) hardcoded asset file name suffix.
|
||||
ChannelVODSuffix = ".mp4"
|
||||
// channelPathPending points to the transaction backing file relative to Channel.root.
|
||||
channelPathPending = "pending"
|
||||
)
|
||||
|
||||
// Create initialises the on-disk representation of a [Channel].
|
||||
func (c *Channel) Create(pathname string) error {
|
||||
if c == nil || c.root != nil || c.Identifier == 0 || c.Name == "" {
|
||||
return syscall.EINVAL
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(pathname, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
if root, err := os.OpenRoot(pathname); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.root = root
|
||||
}
|
||||
|
||||
if w, err := c.root.OpenFile(
|
||||
channelPathMetadata,
|
||||
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
|
||||
0444,
|
||||
); err != nil {
|
||||
_ = c.root.Close()
|
||||
return err
|
||||
} else if err = json.NewEncoder(w).Encode(c); err != nil {
|
||||
_ = c.root.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.root.Mkdir(channelPathVOD, 0755); err != nil {
|
||||
_ = c.root.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ErrDanglingTransaction is returned when attempting to [Open] the on-disk
|
||||
// representation of a [Channel] with a dangling transaction backing file.
|
||||
var ErrDanglingTransaction = errors.New("dangling transaction backing file")
|
||||
|
||||
// Open opens the on-disk representation of [Channel] and returns its address.
|
||||
func Open(pathname string) (*Channel, error) {
|
||||
var c Channel
|
||||
if root, err := os.OpenRoot(pathname); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
c.root = root
|
||||
}
|
||||
|
||||
if _, err := c.root.Lstat(channelPathPending); err != nil {
|
||||
if !errors.Is(err, os.ErrNotExist) {
|
||||
_ = c.root.Close()
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
_ = c.root.Close()
|
||||
return nil, ErrDanglingTransaction
|
||||
}
|
||||
|
||||
if f, err := c.root.Open(channelPathMetadata); err != nil {
|
||||
_ = c.root.Close()
|
||||
return nil, err
|
||||
} else if err = json.NewDecoder(f).Decode(&c); err != nil {
|
||||
_ = c.root.Close()
|
||||
return nil, err
|
||||
}
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
// VOD holds additional metadata associated with a vod.
|
||||
type VOD struct {
|
||||
// Stream title.
|
||||
Title string `json:"title"`
|
||||
// Day of stream start.
|
||||
Date time.Time `json:"date"`
|
||||
// Free-form category string.
|
||||
Category string `json:"category"`
|
||||
}
|
||||
|
||||
// ChannelMismatchError describes a mismatching [Ident.Channel] passed to [Channel.Add].
|
||||
type ChannelMismatchError struct {
|
||||
Got, Want uint64
|
||||
}
|
||||
|
||||
func (c *ChannelMismatchError) Error() string {
|
||||
return "attempting to add VOD from channel " +
|
||||
strconv.FormatUint(c.Got, 10) + " to channel " +
|
||||
strconv.FormatUint(c.Want, 10)
|
||||
}
|
||||
|
||||
// Add adds a [VOD] and its corresponding asset to the on-disk representation.
|
||||
func (c *Channel) Add(ident *Ident, f func(v *VOD, w io.Writer) error) error {
|
||||
if ident == nil || f == nil {
|
||||
return syscall.EINVAL
|
||||
}
|
||||
if ident.Channel != c.Identifier {
|
||||
return &ChannelMismatchError{ident.Channel, c.Identifier}
|
||||
}
|
||||
|
||||
w, err := c.root.OpenFile(
|
||||
channelPathPending,
|
||||
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
|
||||
0,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var v VOD
|
||||
err = f(&v, w)
|
||||
if closeErr := w.Close(); err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
if err != nil {
|
||||
_ = c.root.Remove(channelPathPending)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.root.Chmod(channelPathPending, 0444); err != nil {
|
||||
return err
|
||||
}
|
||||
pathname := path.Join(channelPathVOD, ident.String())
|
||||
|
||||
if w, err = c.root.OpenFile(
|
||||
pathname,
|
||||
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
|
||||
0444,
|
||||
); err != nil {
|
||||
return err
|
||||
} else if err = json.NewEncoder(w).Encode(&v); err != nil {
|
||||
_ = w.Close()
|
||||
_ = c.root.Remove(pathname)
|
||||
return err
|
||||
} else if err = w.Close(); err != nil {
|
||||
_ = c.root.Remove(pathname)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.root.Rename(
|
||||
channelPathPending,
|
||||
pathname+ChannelVODSuffix,
|
||||
); err != nil {
|
||||
_ = c.root.Remove(pathname)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,11 +1,312 @@
|
||||
package streamdata_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.gensokyo.uk/yonah/streamdata"
|
||||
)
|
||||
|
||||
func TestChannelInvalid(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if err := (*streamdata.Channel)(nil).Create(""); !reflect.DeepEqual(err, syscall.EINVAL) {
|
||||
t.Fatalf("Create: error = %v", err)
|
||||
}
|
||||
if err := (*streamdata.Channel)(nil).Close(); !reflect.DeepEqual(err, syscall.EINVAL) {
|
||||
t.Fatalf("Close: error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChannelOpenPerm(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
if err := os.Chmod(d, 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantErr := &os.PathError{
|
||||
Op: "open",
|
||||
Path: d,
|
||||
Err: syscall.EACCES,
|
||||
}
|
||||
|
||||
t.Run("open", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("Open: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("create", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("root", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if err := (&streamdata.Channel{
|
||||
Identifier: 0xcafe,
|
||||
Name: ":3",
|
||||
}).Create(d); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("Create: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("mkdir", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wantErrMkdir := &os.PathError{
|
||||
Op: "mkdir",
|
||||
Path: path.Join(d, "stream"),
|
||||
Err: syscall.EACCES,
|
||||
}
|
||||
if err := (&streamdata.Channel{
|
||||
Identifier: 0xcafe,
|
||||
Name: ":3",
|
||||
}).Create(path.Join(d, "stream")); !reflect.DeepEqual(err, wantErrMkdir) {
|
||||
t.Fatalf("Create: error = %#v, want %#v", err, wantErrMkdir)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("metadata", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
nx := t.TempDir()
|
||||
if err := os.Chmod(nx, 0400); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantErrMetadata := &os.PathError{
|
||||
Op: "openat",
|
||||
Path: "channel",
|
||||
Err: syscall.EACCES,
|
||||
}
|
||||
if err := (&streamdata.Channel{
|
||||
Identifier: 0xcafe,
|
||||
Name: ":3",
|
||||
}).Create(nx); !reflect.DeepEqual(err, wantErrMetadata) {
|
||||
t.Fatalf("Create: error = %#v, want %#v", err, wantErrMetadata)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelCreate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
c := streamdata.Channel{
|
||||
Identifier: 0xcafe,
|
||||
Name: ":3",
|
||||
}
|
||||
|
||||
if err := c.Create(d); err != nil {
|
||||
t.Fatalf("Create: error = %v", err)
|
||||
}
|
||||
if err := c.Close(); err != nil {
|
||||
t.Fatalf("Close: error = %v", err)
|
||||
}
|
||||
|
||||
if got, err := streamdata.Open(d); err != nil {
|
||||
t.Fatalf("Open: error = %v", err)
|
||||
} else if err = got.Close(); err != nil {
|
||||
t.Fatalf("Close: error = %v", err)
|
||||
} else if !got.Is(&c) {
|
||||
t.Errorf("Open: %#v, want %#v", got, c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChannelDangling(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("dangling", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
if err := os.WriteFile(path.Join(d, "pending"), nil, 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := streamdata.Open(d); !reflect.DeepEqual(err, streamdata.ErrDanglingTransaction) {
|
||||
t.Errorf("Open: error = %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("perm", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
if err := os.Chmod(d, 0400); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantErr := &os.PathError{
|
||||
Op: "statat",
|
||||
Path: "pending",
|
||||
Err: syscall.EACCES,
|
||||
}
|
||||
if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("Open: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelBadMetadata(t *testing.T) {
|
||||
t.Run("perm", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
if err := os.WriteFile(path.Join(d, "channel"), nil, 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantErr := &os.PathError{
|
||||
Op: "openat",
|
||||
Path: "channel",
|
||||
Err: syscall.EACCES,
|
||||
}
|
||||
if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("Open: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("invalid", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
if err := os.WriteFile(path.Join(d, "channel"), nil, 0400); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantErr := io.EOF
|
||||
if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("Open: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestChannelAdd(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := t.TempDir()
|
||||
c := streamdata.Channel{
|
||||
Identifier: 0xcafe,
|
||||
Name: ":3",
|
||||
}
|
||||
|
||||
if err := c.Create(d); err != nil {
|
||||
t.Fatalf("Create: error = %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := c.Close(); err != nil {
|
||||
t.Fatalf("Close: error = %v", err)
|
||||
}
|
||||
})
|
||||
var wantErr error
|
||||
|
||||
if err := c.Add(nil, nil); !reflect.DeepEqual(err, syscall.EINVAL) {
|
||||
t.Errorf("(invalid) Add: error = %#v", err)
|
||||
}
|
||||
|
||||
wantErr = &streamdata.ChannelMismatchError{
|
||||
Got: 0xbabe,
|
||||
Want: 0xcafe,
|
||||
}
|
||||
if err := c.Add(&streamdata.Ident{
|
||||
Channel: 0xbabe,
|
||||
}, func(*streamdata.VOD, io.Writer) error {
|
||||
panic("unreachable")
|
||||
}); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("(channel) Add: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
|
||||
wantErr = &os.PathError{
|
||||
Op: "openat",
|
||||
Path: "pending",
|
||||
Err: syscall.EEXIST,
|
||||
}
|
||||
if err := os.WriteFile(path.Join(d, "pending"), nil, 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Add(&streamdata.Ident{
|
||||
Channel: 0xcafe,
|
||||
}, func(*streamdata.VOD, io.Writer) error {
|
||||
panic("unreachable")
|
||||
}); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("(create) Add: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
if err := os.Remove(path.Join(d, "pending")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wantErr = errors.New("unique error")
|
||||
if err := c.Add(&streamdata.Ident{
|
||||
Channel: 0xcafe,
|
||||
}, func(*streamdata.VOD, io.Writer) error {
|
||||
return wantErr
|
||||
}); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("(callback) Add: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
if _, err := os.Stat(path.Join(d, "pending")); !errors.Is(err, os.ErrNotExist) {
|
||||
t.Fatalf("Stat: error = %v", err)
|
||||
}
|
||||
|
||||
wantData := []byte{0xde, 0xad, 0xbe, 0xef}
|
||||
wantVOD := streamdata.VOD{
|
||||
Title: "\x00",
|
||||
Date: time.Unix(0, 0).UTC(),
|
||||
Category: "\t",
|
||||
}
|
||||
if err := c.Add(&streamdata.Ident{
|
||||
Serial: 0xfdfdfdfd,
|
||||
Channel: 0xcafe,
|
||||
Data: [streamdata.IdentFFLen]byte{
|
||||
0xfe, 0xe1, 0xde, 0xad,
|
||||
0xfe, 0xed,
|
||||
0x0b, 0xad,
|
||||
0xf0, 0x0d,
|
||||
0, 0, 0, 0, 0, 0,
|
||||
},
|
||||
}, func(v *streamdata.VOD, w io.Writer) error {
|
||||
*v = wantVOD
|
||||
_, err := w.Write(wantData)
|
||||
return err
|
||||
}); err != nil {
|
||||
t.Fatalf("Add: error = %v", err)
|
||||
}
|
||||
|
||||
if dents, err := os.ReadDir(path.Join(d, "vod")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(dents) != 2 {
|
||||
t.Fatalf("ReadDir: %#v", dents)
|
||||
}
|
||||
|
||||
const wantIdent = "4261281277-51966-fee1dead-feed-0bad-f00d-000000000000"
|
||||
var got streamdata.VOD
|
||||
if r, err := os.Open(path.Join(d, "vod", wantIdent)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err = json.NewDecoder(r).Decode(&got); err != nil {
|
||||
_ = r.Close()
|
||||
t.Fatalf("Decode: error = %v", err)
|
||||
} else if got != wantVOD {
|
||||
t.Errorf("Add: %#v, want %#v", got, wantVOD)
|
||||
}
|
||||
|
||||
if gotData, err := os.ReadFile(path.Join(d, "vod", wantIdent+streamdata.ChannelVODSuffix)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(gotData) != string(wantData) {
|
||||
t.Errorf("Add: data = %#v, want %#v", gotData, wantData)
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrors(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -18,6 +319,8 @@ func TestErrors(t *testing.T) {
|
||||
"got 51966 field(s) instead of 7"},
|
||||
{"IdentFFError", &streamdata.IdentFFError{Got: 0xcafe, Want: 0xbabe},
|
||||
"got 51966 bytes for a 47806-byte long free-form field"},
|
||||
{"ChannelMismatchError", &streamdata.ChannelMismatchError{Got: 0xcafe, Want: 0xbabe},
|
||||
"attempting to add VOD from channel 51966 to channel 47806"},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user