Files
streamdata/streamdata.go
Yonah 3389848f45 streamdata: optionally reuse buffer
This enables use of stdlib iterator helpers.

Signed-off-by: Yonah <contrib@gensokyo.uk>
2026-03-20 03:18:30 +09:00

332 lines
7.6 KiB
Go

// Package streamdata provides a simple API for downloading VODs and maintaining
// basic metadata alongside them.
package streamdata
import (
"encoding/json"
"errors"
"io"
"io/fs"
"iter"
"os"
"path"
"strconv"
"strings"
"syscall"
"time"
"unsafe"
)
// Channel represents a Twitch channel.
type Channel struct {
// Numerical identifier specific to the channel, returned by Twitch.
Identifier uint64 `json:"id"`
// Unique, mutable channel name.
Name string `json:"name"`
// Directory to place persistent data in.
root *os.Root
}
// Is returns whether target is equivalent to c.
func (c *Channel) Is(target *Channel) bool {
return (c == nil && target == nil) || (c != nil && target != nil &&
c.Identifier == target.Identifier &&
c.Name == target.Name)
}
// Close closes the underlying on-disk representation.
func (c *Channel) Close() error {
if c == nil || c.root == nil {
return syscall.EINVAL
}
return c.root.Close()
}
const (
// channelPathMetadata points to the metadata file relative to Channel.root.
channelPathMetadata = "channel"
// channelPathVOD points to the vod directory relative to Channel.root.
channelPathVOD = "vod"
// ChannelVODSuffix is the (apparently) hardcoded asset file name suffix.
ChannelVODSuffix = ".mp4"
// channelPathPending points to the transaction backing file relative to Channel.root.
channelPathPending = "pending"
)
// Create initialises the on-disk representation of a [Channel].
func (c *Channel) Create(pathname string) error {
if c == nil || c.root != nil || c.Identifier == 0 || c.Name == "" {
return syscall.EINVAL
}
if err := os.MkdirAll(pathname, 0755); err != nil {
return err
}
if root, err := os.OpenRoot(pathname); err != nil {
return err
} else {
c.root = root
}
if w, err := c.root.OpenFile(
channelPathMetadata,
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
0444,
); err != nil {
_ = c.root.Close()
return err
} else if err = json.NewEncoder(w).Encode(c); err != nil {
_ = c.root.Close()
return err
}
if err := c.root.Mkdir(channelPathVOD, 0755); err != nil {
_ = c.root.Close()
return err
}
return nil
}
// ErrDanglingTransaction is returned when attempting to [Open] the on-disk
// representation of a [Channel] with a dangling transaction backing file.
var ErrDanglingTransaction = errors.New("dangling transaction backing file")
// Open opens the on-disk representation of [Channel] and returns its address.
func Open(pathname string) (*Channel, error) {
var c Channel
if root, err := os.OpenRoot(pathname); err != nil {
return nil, err
} else {
c.root = root
}
if _, err := c.root.Lstat(channelPathPending); err != nil {
if !errors.Is(err, os.ErrNotExist) {
_ = c.root.Close()
return nil, err
}
} else {
_ = c.root.Close()
return nil, ErrDanglingTransaction
}
if f, err := c.root.Open(channelPathMetadata); err != nil {
_ = c.root.Close()
return nil, err
} else if err = json.NewDecoder(f).Decode(&c); err != nil {
_ = c.root.Close()
return nil, err
}
return &c, nil
}
// VOD holds additional metadata associated with a vod.
type VOD struct {
// Stream title.
Title string `json:"title"`
// Day of stream start.
Date time.Time `json:"date"`
// Free-form category string.
Category string `json:"category,omitempty"`
// A mirror site the asset is uploaded to, usually YouTube.
Mirror string `json:"mirror,omitempty"`
}
// ChannelMismatchError describes a mismatching [Ident.Channel] passed to [Channel.Add].
type ChannelMismatchError struct {
Got, Want uint64
}
func (c *ChannelMismatchError) Error() string {
return "attempting to add VOD from channel " +
strconv.FormatUint(c.Got, 10) + " to channel " +
strconv.FormatUint(c.Want, 10)
}
// RenameFrom is a pathname returned by a function passed to [Channel.Add] to
// rename from this pathname instead of the managed transaction file.
type RenameFrom string
func (pathname RenameFrom) Error() string {
return "requesting rename from " + strconv.Quote(string(pathname))
}
// Add adds a [VOD] and its corresponding asset to the on-disk representation.
func (c *Channel) Add(ident *Ident, f func(v *VOD, w io.Writer) error) error {
if ident == nil || f == nil {
return syscall.EINVAL
}
if ident.Channel != c.Identifier {
return &ChannelMismatchError{ident.Channel, c.Identifier}
}
w, err := c.root.OpenFile(
channelPathPending,
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
0,
)
if err != nil {
return err
}
var v VOD
err = f(&v, w)
if closeErr := w.Close(); err == nil {
err = closeErr
}
if err != nil {
var rf RenameFrom
if !errors.As(err, &rf) {
_ = c.root.Remove(channelPathPending)
return err
}
if err = c.root.Remove(channelPathPending); err != nil {
return err
}
if err = os.Rename(string(rf), path.Join(
c.root.Name(),
channelPathPending,
)); err != nil {
return err
}
}
if err = c.root.Chmod(channelPathPending, 0444); err != nil {
return err
}
pathname := path.Join(channelPathVOD, ident.String())
if w, err = c.root.OpenFile(
pathname,
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
0444,
); err != nil {
return err
} else if err = json.NewEncoder(w).Encode(&v); err != nil {
_ = w.Close()
_ = c.root.Remove(pathname)
return err
} else if err = w.Close(); err != nil {
_ = c.root.Remove(pathname)
return err
}
if err = c.root.Rename(
channelPathPending,
pathname+ChannelVODSuffix,
); err != nil {
_ = c.root.Remove(pathname)
return err
}
return nil
}
// Edit loads a [VOD] by its [Ident], and writes the modified [VOD] back to the
// on-disk representation.
func (c *Channel) Edit(ident *Ident, f func(v *VOD) error) error {
v, err := c.Load(ident)
if err != nil {
return err
}
err = f(v)
if err != nil {
return err
}
pathname := path.Join(
channelPathVOD,
"."+ident.String(),
)
var w *os.File
if w, err = c.root.OpenFile(
pathname,
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
0444,
); err != nil {
return err
} else if err = json.NewEncoder(w).Encode(v); err != nil {
_ = w.Close()
_ = c.root.Remove(pathname)
return err
} else if err = w.Close(); err != nil {
_ = c.root.Remove(pathname)
return err
} else {
return c.root.Rename(pathname, path.Join(
channelPathVOD,
ident.String(),
))
}
}
// Path returns a pathname by [Ident].
func (c *Channel) Path(ident *Ident) string {
return path.Join(
c.root.Name(),
channelPathVOD,
ident.String()+ChannelVODSuffix,
)
}
// All returns an iterator over all known [Ident] in the on-disk representation.
// Iteration stops when encountering the first non-nil error, and its value is
// saved to the value pointed to by errP.
//
// If reuse is true, the same value is updated every iteration and the same
// address is yileded as a result.
func (c *Channel) All(errP *error, reuse bool) iter.Seq[*Ident] {
return func(yield func(*Ident) bool) {
dents, err := c.root.FS().(fs.ReadDirFS).ReadDir(channelPathVOD)
if err != nil {
*errP = err
return
}
var ident Ident
for _, dent := range dents {
name := dent.Name()
if strings.HasSuffix(name, ChannelVODSuffix) {
continue
}
if err = ident.UnmarshalText(
unsafe.Slice(unsafe.StringData(name), len(name)),
); err != nil {
*errP = err
return
}
p := &ident
if !reuse {
p = new(ident)
}
if !yield(p) {
return
}
}
}
}
// Load loads the metadata of a [VOD] by [Ident] and returns its address.
func (c *Channel) Load(ident *Ident) (*VOD, error) {
var v VOD
if r, err := c.root.Open(path.Join(
channelPathVOD,
ident.String(),
)); err != nil {
return nil, err
} else if err = json.NewDecoder(r).Decode(&v); err != nil {
_ = r.Close()
return nil, err
} else if err = r.Close(); err != nil {
return nil, err
} else {
return &v, nil
}
}