diff --git a/streamdata.go b/streamdata.go new file mode 100644 index 0000000..a34f864 --- /dev/null +++ b/streamdata.go @@ -0,0 +1,199 @@ +// Package streamdata provides a simple API for downloading VODs and maintaining +// basic metadata alongside them. +package streamdata + +import ( + "encoding/json" + "errors" + "io" + "os" + "path" + "strconv" + "syscall" + "time" +) + +// Channel represents a Twitch channel. +type Channel struct { + // Numerical identifier specific to the channel, returned by Twitch. + Identifier uint64 `json:"id"` + // Unique, mutable channel name. + Name string `json:"name"` + + // Directory to place persistent data in. + root *os.Root +} + +// Is returns whether target is equivalent to c. +func (c *Channel) Is(target *Channel) bool { + return (c == nil && target == nil) || (c != nil && target != nil && + c.Identifier == target.Identifier && + c.Name == target.Name) +} + +// Close closes the underlying on-disk representation. +func (c *Channel) Close() error { + if c == nil || c.root == nil { + return syscall.EINVAL + } + return c.root.Close() +} + +const ( + // channelPathMetadata points to the metadata file relative to Channel.root. + channelPathMetadata = "channel" + // channelPathVOD points to the vod directory relative to Channel.root. + channelPathVOD = "vod" + // ChannelVODSuffix is the (apparently) hardcoded asset file name suffix. + ChannelVODSuffix = ".mp4" + // channelPathPending points to the transaction backing file relative to Channel.root. + channelPathPending = "pending" +) + +// Create initialises the on-disk representation of a [Channel]. +func (c *Channel) Create(pathname string) error { + if c == nil || c.root != nil || c.Identifier == 0 || c.Name == "" { + return syscall.EINVAL + } + + if err := os.MkdirAll(pathname, 0755); err != nil { + return err + } + if root, err := os.OpenRoot(pathname); err != nil { + return err + } else { + c.root = root + } + + if w, err := c.root.OpenFile( + channelPathMetadata, + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + 0444, + ); err != nil { + _ = c.root.Close() + return err + } else if err = json.NewEncoder(w).Encode(c); err != nil { + _ = c.root.Close() + return err + } + + if err := c.root.Mkdir(channelPathVOD, 0755); err != nil { + _ = c.root.Close() + return err + } + + return nil +} + +// ErrDanglingTransaction is returned when attempting to [Open] the on-disk +// representation of a [Channel] with a dangling transaction backing file. +var ErrDanglingTransaction = errors.New("dangling transaction backing file") + +// Open opens the on-disk representation of [Channel] and returns its address. +func Open(pathname string) (*Channel, error) { + var c Channel + if root, err := os.OpenRoot(pathname); err != nil { + return nil, err + } else { + c.root = root + } + + if _, err := c.root.Lstat(channelPathPending); err != nil { + if !errors.Is(err, os.ErrNotExist) { + _ = c.root.Close() + return nil, err + } + } else { + _ = c.root.Close() + return nil, ErrDanglingTransaction + } + + if f, err := c.root.Open(channelPathMetadata); err != nil { + _ = c.root.Close() + return nil, err + } else if err = json.NewDecoder(f).Decode(&c); err != nil { + _ = c.root.Close() + return nil, err + } + return &c, nil +} + +// VOD holds additional metadata associated with a vod. +type VOD struct { + // Stream title. + Title string `json:"title"` + // Day of stream start. + Date time.Time `json:"date"` + // Free-form category string. + Category string `json:"category"` +} + +// ChannelMismatchError describes a mismatching [Ident.Channel] passed to [Channel.Add]. +type ChannelMismatchError struct { + Got, Want uint64 +} + +func (c *ChannelMismatchError) Error() string { + return "attempting to add VOD from channel " + + strconv.FormatUint(c.Got, 10) + " to channel " + + strconv.FormatUint(c.Want, 10) +} + +// Add adds a [VOD] and its corresponding asset to the on-disk representation. +func (c *Channel) Add(ident *Ident, f func(v *VOD, w io.Writer) error) error { + if ident == nil || f == nil { + return syscall.EINVAL + } + if ident.Channel != c.Identifier { + return &ChannelMismatchError{ident.Channel, c.Identifier} + } + + w, err := c.root.OpenFile( + channelPathPending, + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + 0, + ) + if err != nil { + return err + } + + var v VOD + err = f(&v, w) + if closeErr := w.Close(); err == nil { + err = closeErr + } + if err != nil { + _ = c.root.Remove(channelPathPending) + return err + } + + if err = c.root.Chmod(channelPathPending, 0444); err != nil { + return err + } + pathname := path.Join(channelPathVOD, ident.String()) + + if w, err = c.root.OpenFile( + pathname, + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + 0444, + ); err != nil { + return err + } else if err = json.NewEncoder(w).Encode(&v); err != nil { + _ = w.Close() + _ = c.root.Remove(pathname) + return err + } else if err = w.Close(); err != nil { + _ = c.root.Remove(pathname) + return err + } + + if err = c.root.Rename( + channelPathPending, + pathname+ChannelVODSuffix, + ); err != nil { + _ = c.root.Remove(pathname) + return err + } + + return nil +} diff --git a/streamdata_test.go b/streamdata_test.go index 11a1ff5..60ab268 100644 --- a/streamdata_test.go +++ b/streamdata_test.go @@ -1,11 +1,312 @@ package streamdata_test import ( + "encoding/json" + "errors" + "io" + "os" + "path" + "reflect" + "syscall" "testing" + "time" "git.gensokyo.uk/yonah/streamdata" ) +func TestChannelInvalid(t *testing.T) { + t.Parallel() + + if err := (*streamdata.Channel)(nil).Create(""); !reflect.DeepEqual(err, syscall.EINVAL) { + t.Fatalf("Create: error = %v", err) + } + if err := (*streamdata.Channel)(nil).Close(); !reflect.DeepEqual(err, syscall.EINVAL) { + t.Fatalf("Close: error = %v", err) + } +} + +func TestChannelOpenPerm(t *testing.T) { + t.Parallel() + + d := t.TempDir() + if err := os.Chmod(d, 0); err != nil { + t.Fatal(err) + } + + wantErr := &os.PathError{ + Op: "open", + Path: d, + Err: syscall.EACCES, + } + + t.Run("open", func(t *testing.T) { + t.Parallel() + + if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) { + t.Fatalf("Open: error = %#v, want %#v", err, wantErr) + } + }) + + t.Run("create", func(t *testing.T) { + t.Parallel() + + t.Run("root", func(t *testing.T) { + t.Parallel() + + if err := (&streamdata.Channel{ + Identifier: 0xcafe, + Name: ":3", + }).Create(d); !reflect.DeepEqual(err, wantErr) { + t.Fatalf("Create: error = %#v, want %#v", err, wantErr) + } + }) + + t.Run("mkdir", func(t *testing.T) { + t.Parallel() + + wantErrMkdir := &os.PathError{ + Op: "mkdir", + Path: path.Join(d, "stream"), + Err: syscall.EACCES, + } + if err := (&streamdata.Channel{ + Identifier: 0xcafe, + Name: ":3", + }).Create(path.Join(d, "stream")); !reflect.DeepEqual(err, wantErrMkdir) { + t.Fatalf("Create: error = %#v, want %#v", err, wantErrMkdir) + } + }) + + t.Run("metadata", func(t *testing.T) { + t.Parallel() + + nx := t.TempDir() + if err := os.Chmod(nx, 0400); err != nil { + t.Fatal(err) + } + + wantErrMetadata := &os.PathError{ + Op: "openat", + Path: "channel", + Err: syscall.EACCES, + } + if err := (&streamdata.Channel{ + Identifier: 0xcafe, + Name: ":3", + }).Create(nx); !reflect.DeepEqual(err, wantErrMetadata) { + t.Fatalf("Create: error = %#v, want %#v", err, wantErrMetadata) + } + }) + }) +} + +func TestChannelCreate(t *testing.T) { + t.Parallel() + + d := t.TempDir() + c := streamdata.Channel{ + Identifier: 0xcafe, + Name: ":3", + } + + if err := c.Create(d); err != nil { + t.Fatalf("Create: error = %v", err) + } + if err := c.Close(); err != nil { + t.Fatalf("Close: error = %v", err) + } + + if got, err := streamdata.Open(d); err != nil { + t.Fatalf("Open: error = %v", err) + } else if err = got.Close(); err != nil { + t.Fatalf("Close: error = %v", err) + } else if !got.Is(&c) { + t.Errorf("Open: %#v, want %#v", got, c) + } +} + +func TestChannelDangling(t *testing.T) { + t.Parallel() + + t.Run("dangling", func(t *testing.T) { + t.Parallel() + + d := t.TempDir() + if err := os.WriteFile(path.Join(d, "pending"), nil, 0); err != nil { + t.Fatal(err) + } + if _, err := streamdata.Open(d); !reflect.DeepEqual(err, streamdata.ErrDanglingTransaction) { + t.Errorf("Open: error = %v", err) + } + }) + + t.Run("perm", func(t *testing.T) { + t.Parallel() + + d := t.TempDir() + if err := os.Chmod(d, 0400); err != nil { + t.Fatal(err) + } + + wantErr := &os.PathError{ + Op: "statat", + Path: "pending", + Err: syscall.EACCES, + } + if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) { + t.Errorf("Open: error = %#v, want %#v", err, wantErr) + } + }) +} + +func TestChannelBadMetadata(t *testing.T) { + t.Run("perm", func(t *testing.T) { + t.Parallel() + + d := t.TempDir() + if err := os.WriteFile(path.Join(d, "channel"), nil, 0); err != nil { + t.Fatal(err) + } + + wantErr := &os.PathError{ + Op: "openat", + Path: "channel", + Err: syscall.EACCES, + } + if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) { + t.Errorf("Open: error = %#v, want %#v", err, wantErr) + } + }) + + t.Run("invalid", func(t *testing.T) { + t.Parallel() + + d := t.TempDir() + if err := os.WriteFile(path.Join(d, "channel"), nil, 0400); err != nil { + t.Fatal(err) + } + + wantErr := io.EOF + if _, err := streamdata.Open(d); !reflect.DeepEqual(err, wantErr) { + t.Errorf("Open: error = %#v, want %#v", err, wantErr) + } + }) +} + +func TestChannelAdd(t *testing.T) { + t.Parallel() + + d := t.TempDir() + c := streamdata.Channel{ + Identifier: 0xcafe, + Name: ":3", + } + + if err := c.Create(d); err != nil { + t.Fatalf("Create: error = %v", err) + } + t.Cleanup(func() { + if err := c.Close(); err != nil { + t.Fatalf("Close: error = %v", err) + } + }) + var wantErr error + + if err := c.Add(nil, nil); !reflect.DeepEqual(err, syscall.EINVAL) { + t.Errorf("(invalid) Add: error = %#v", err) + } + + wantErr = &streamdata.ChannelMismatchError{ + Got: 0xbabe, + Want: 0xcafe, + } + if err := c.Add(&streamdata.Ident{ + Channel: 0xbabe, + }, func(*streamdata.VOD, io.Writer) error { + panic("unreachable") + }); !reflect.DeepEqual(err, wantErr) { + t.Fatalf("(channel) Add: error = %#v, want %#v", err, wantErr) + } + + wantErr = &os.PathError{ + Op: "openat", + Path: "pending", + Err: syscall.EEXIST, + } + if err := os.WriteFile(path.Join(d, "pending"), nil, 0); err != nil { + t.Fatal(err) + } + if err := c.Add(&streamdata.Ident{ + Channel: 0xcafe, + }, func(*streamdata.VOD, io.Writer) error { + panic("unreachable") + }); !reflect.DeepEqual(err, wantErr) { + t.Fatalf("(create) Add: error = %#v, want %#v", err, wantErr) + } + if err := os.Remove(path.Join(d, "pending")); err != nil { + t.Fatal(err) + } + + wantErr = errors.New("unique error") + if err := c.Add(&streamdata.Ident{ + Channel: 0xcafe, + }, func(*streamdata.VOD, io.Writer) error { + return wantErr + }); !reflect.DeepEqual(err, wantErr) { + t.Fatalf("(callback) Add: error = %#v, want %#v", err, wantErr) + } + if _, err := os.Stat(path.Join(d, "pending")); !errors.Is(err, os.ErrNotExist) { + t.Fatalf("Stat: error = %v", err) + } + + wantData := []byte{0xde, 0xad, 0xbe, 0xef} + wantVOD := streamdata.VOD{ + Title: "\x00", + Date: time.Unix(0, 0).UTC(), + Category: "\t", + } + if err := c.Add(&streamdata.Ident{ + Serial: 0xfdfdfdfd, + Channel: 0xcafe, + Data: [streamdata.IdentFFLen]byte{ + 0xfe, 0xe1, 0xde, 0xad, + 0xfe, 0xed, + 0x0b, 0xad, + 0xf0, 0x0d, + 0, 0, 0, 0, 0, 0, + }, + }, func(v *streamdata.VOD, w io.Writer) error { + *v = wantVOD + _, err := w.Write(wantData) + return err + }); err != nil { + t.Fatalf("Add: error = %v", err) + } + + if dents, err := os.ReadDir(path.Join(d, "vod")); err != nil { + t.Fatal(err) + } else if len(dents) != 2 { + t.Fatalf("ReadDir: %#v", dents) + } + + const wantIdent = "4261281277-51966-fee1dead-feed-0bad-f00d-000000000000" + var got streamdata.VOD + if r, err := os.Open(path.Join(d, "vod", wantIdent)); err != nil { + t.Fatal(err) + } else if err = json.NewDecoder(r).Decode(&got); err != nil { + _ = r.Close() + t.Fatalf("Decode: error = %v", err) + } else if got != wantVOD { + t.Errorf("Add: %#v, want %#v", got, wantVOD) + } + + if gotData, err := os.ReadFile(path.Join(d, "vod", wantIdent+streamdata.ChannelVODSuffix)); err != nil { + t.Fatal(err) + } else if string(gotData) != string(wantData) { + t.Errorf("Add: data = %#v, want %#v", gotData, wantData) + } +} + func TestErrors(t *testing.T) { t.Parallel() @@ -18,6 +319,8 @@ func TestErrors(t *testing.T) { "got 51966 field(s) instead of 7"}, {"IdentFFError", &streamdata.IdentFFError{Got: 0xcafe, Want: 0xbabe}, "got 51966 bytes for a 47806-byte long free-form field"}, + {"ChannelMismatchError", &streamdata.ChannelMismatchError{Got: 0xcafe, Want: 0xbabe}, + "attempting to add VOD from channel 51966 to channel 47806"}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) {