Skip to content

Commit 69961eb

Browse files
authored
Merge pull request #447 from Unpackerr/unstable
fix retry bug
2 parents 6f4978b + 4e2ff69 commit 69961eb

16 files changed

Lines changed: 286 additions & 294 deletions

.golangci.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ run:
2121

2222
issues:
2323
max-issues-per-linter: 0
24-
max-same-issues: 0
24+
max-same-issues: 0
25+
output:
26+
sort-results: true

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ require (
1919
golift.io/rotatorr v0.0.0-20230911015553-cd2abbd726c7
2020
golift.io/starr v1.0.0
2121
golift.io/version v0.0.2
22-
golift.io/xtractr v0.2.3-0.20240705214848-788bbcf7087a
22+
golift.io/xtractr v0.2.3-0.20240710043203-2d7c8a38d931
23+
2324
)
2425

2526
require (

go.sum

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
4040
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4141
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4242
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
43+
github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q=
44+
github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo=
4345
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
4446
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
4547
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
@@ -259,8 +261,6 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
259261
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
260262
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
261263
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
262-
golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
263-
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
264264
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
265265
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
266266
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -367,12 +367,6 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
367367
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
368368
golift.io/cnfg v0.2.3 h1:cQsC4JS20njJyu5drtGefNmgN7M4HrLaRDNBPLit3pQ=
369369
golift.io/cnfg v0.2.3/go.mod h1:T4t8MFa8aZilCdIk1qQrN4mOGaFVPZ/qHQBBMbCIZJ0=
370-
golift.io/cnfgfile v0.0.0-20240703071809-367eaee314e8 h1:wWcFD3JG0cjKQOL5R2ey8vcMyzpeP5ScD62KZjnpcpo=
371-
golift.io/cnfgfile v0.0.0-20240703071809-367eaee314e8/go.mod h1:zHm9o8SkZ6Mm5DfGahsrEJPsogyR0qItP59s5lJ98/I=
372-
golift.io/cnfgfile v0.0.0-20240703080440-8fa7fd722f78 h1:RDcPb8J3X1UxJC6z4K0YPWc5OW0DIAiZdMM7uJQakEE=
373-
golift.io/cnfgfile v0.0.0-20240703080440-8fa7fd722f78/go.mod h1:zHm9o8SkZ6Mm5DfGahsrEJPsogyR0qItP59s5lJ98/I=
374-
golift.io/cnfgfile v0.0.0-20240704052514-df2becf136db h1:OQU9DVd8i3yuqyJax1Pi063iDephQwEapZhhF3ztdc8=
375-
golift.io/cnfgfile v0.0.0-20240704052514-df2becf136db/go.mod h1:zHm9o8SkZ6Mm5DfGahsrEJPsogyR0qItP59s5lJ98/I=
376370
golift.io/cnfgfile v0.0.0-20240704165116-48378d0c6c38 h1:euXQUUWtsi2M+Bf6Do4+yG3YrVj88WyN0WJdv/abeW0=
377371
golift.io/cnfgfile v0.0.0-20240704165116-48378d0c6c38/go.mod h1:zHm9o8SkZ6Mm5DfGahsrEJPsogyR0qItP59s5lJ98/I=
378372
golift.io/rotatorr v0.0.0-20230911015553-cd2abbd726c7 h1:8reg8mRdLxCz168FaGPf/kVxmDRDc92/Dhub54trdOc=
@@ -381,10 +375,8 @@ golift.io/starr v1.0.0 h1:IDSaSL+ZYxdLT/Lg//dg/iwZ39LHO3D5CmbLCOgSXbI=
381375
golift.io/starr v1.0.0/go.mod h1:xnUwp4vK62bDvozW9QHUYc08m6kjwaZnGw3Db65fQHw=
382376
golift.io/version v0.0.2 h1:i0gXRuSDHKs4O0sVDUg4+vNIuOxYoXhaxspftu2FRTE=
383377
golift.io/version v0.0.2/go.mod h1:76aHNz8/Pm7CbuxIsDi97jABL5Zui3f2uZxDm4vB6hU=
384-
golift.io/xtractr v0.2.3-0.20240701231217-9baa707ddfd1 h1:zHuHMtUVpezHQxbRlFTP2HSGcXL1+c5dn/2SrEIoppo=
385-
golift.io/xtractr v0.2.3-0.20240701231217-9baa707ddfd1/go.mod h1:ykPX5R7pubYwM30nD9ERBo9/6mvOzmRxulfa0Axw+AU=
386-
golift.io/xtractr v0.2.3-0.20240705214848-788bbcf7087a h1:bcRW4Ze2droo7tg14clsvRqtJHs27gi+LRyJzFFxllo=
387-
golift.io/xtractr v0.2.3-0.20240705214848-788bbcf7087a/go.mod h1:TrvZlQlpYaZC8kgu/vjvynnNW4rtRqKPbhKF74WoBGk=
378+
golift.io/xtractr v0.2.3-0.20240710043203-2d7c8a38d931 h1:3VYISfgN0VCI6qsW/131UaqYuT6glyk1/Eu9Y6ndPMU=
379+
golift.io/xtractr v0.2.3-0.20240710043203-2d7c8a38d931/go.mod h1:TrvZlQlpYaZC8kgu/vjvynnNW4rtRqKPbhKF74WoBGk=
388380
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
389381
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
390382
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=

pkg/unpackerr/apps.go

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strings"
77
"sync"
8+
"time"
89

910
"golift.io/cnfg"
1011
"golift.io/starr"
@@ -77,53 +78,52 @@ type FoldersConfig struct {
7778
Interval cnfg.Duration `json:"interval" toml:"interval" xml:"interval" yaml:"interval"` // undocumented.
7879
}
7980

80-
type workThread struct {
81-
Funcs []func()
82-
}
83-
8481
func (u *Unpackerr) watchWorkThread() {
85-
const maxWorkers = 5 // 5 starr apps.
86-
87-
workers := u.Parallel
88-
if workers > maxWorkers {
89-
workers = maxWorkers
90-
}
91-
92-
for range workers {
82+
// 1 worker for each app, so they poll quickly.
83+
for range len(u.Lidarr) + len(u.Radarr) + len(u.Readarr) + len(u.Sonarr) + len(u.Whisparr) {
9384
go func() {
94-
for w := range u.workChan {
95-
for _, f := range w.Funcs {
96-
f()
85+
for funcs := range u.workChan {
86+
for _, fn := range funcs {
87+
fn()
9788
}
9889
}
9990
}()
10091
}
10192
}
10293

103-
// retrieveAppQueues polls Sonarr, Lidarr and Radarr. At the same time.
104-
// Then calls the check methods to scan their queues for changes.
105-
func (u *Unpackerr) retrieveAppQueues() {
106-
var wg sync.WaitGroup
107-
108-
// Run each method in a go routine as a waitgroup.
109-
for _, app := range []func(){
110-
u.getLidarrQueue,
111-
u.getRadarrQueue,
112-
u.getReadarrQueue,
113-
u.getSonarrQueue,
114-
u.getWhisparrQueue,
115-
} {
116-
wg.Add(1)
117-
u.workChan <- &workThread{[]func(){app, wg.Done}}
94+
// retrieveAppQueues polls all the starr app queues. At the same time.
95+
// Then calls the check methods to scan their queue contents for changes.
96+
func (u *Unpackerr) retrieveAppQueues(now time.Time) {
97+
wg := sync.WaitGroup{}
98+
wg.Add(len(u.Lidarr) + len(u.Radarr) + len(u.Readarr) + len(u.Sonarr) + len(u.Whisparr))
99+
// Run each app's getQueue method in a go routine as a waitgroup.
100+
for _, server := range u.Lidarr {
101+
u.workChan <- []func(){func() { u.getLidarrQueue(server, now) }, wg.Done}
102+
}
103+
104+
for _, server := range u.Radarr {
105+
u.workChan <- []func(){func() { u.getRadarrQueue(server, now) }, wg.Done}
106+
}
107+
108+
for _, server := range u.Readarr {
109+
u.workChan <- []func(){func() { u.getReadarrQueue(server, now) }, wg.Done}
110+
}
111+
112+
for _, server := range u.Sonarr {
113+
u.workChan <- []func(){func() { u.getSonarrQueue(server, now) }, wg.Done}
114+
}
115+
116+
for _, server := range u.Whisparr {
117+
u.workChan <- []func(){func() { u.getWhisparrQueue(server, now) }, wg.Done}
118118
}
119119

120120
wg.Wait()
121121
// These are not thread safe because they call saveCompletedDownload.
122-
u.checkLidarrQueue()
123-
u.checkRadarrQueue()
124-
u.checkReadarrQueue()
125-
u.checkSonarrQueue()
126-
u.checkWhisparrQueue()
122+
u.checkLidarrQueue(now)
123+
u.checkRadarrQueue(now)
124+
u.checkReadarrQueue(now)
125+
u.checkSonarrQueue(now)
126+
u.checkWhisparrQueue(now)
127127
}
128128

129129
// validateApps is broken-out into this file to make adding new apps easier.

pkg/unpackerr/build_other.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,13 @@
22

33
package unpackerr
44

5+
import "syscall"
6+
57
const defaultSavePath = "/downloads"
8+
9+
func getUmask() int {
10+
umask := syscall.Umask(0)
11+
syscall.Umask(umask)
12+
13+
return umask
14+
}

pkg/unpackerr/build_windows.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,7 @@
33
package unpackerr
44

55
const defaultSavePath = `C:\downloads`
6+
7+
func getUmask() int {
8+
return -1
9+
}

pkg/unpackerr/folder.go

Lines changed: 30 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type Folder struct {
6464
config *FolderConfig
6565
files []string
6666
retries uint
67-
archives []string
67+
archives xtractr.ArchiveList
6868
}
6969

7070
type eventData struct {
@@ -239,10 +239,10 @@ func (f *Folders) Remove(folder string) {
239239
}
240240

241241
// extractTrackedItem starts an archive or folder's extraction after it hasn't been written to in a while.
242-
func (u *Unpackerr) extractTrackedItem(name string, folder *Folder) {
242+
func (u *Unpackerr) extractTrackedItem(name string, folder *Folder, now time.Time) {
243243
u.folders.Remove(name) // stop the fs watcher(s).
244244
// update status.
245-
u.folders.Folders[name].updated = time.Now()
245+
u.folders.Folders[name].updated = now
246246
u.folders.Folders[name].status = QUEUED
247247

248248
// Do not extract r00 file if rar file with same name exists.
@@ -255,7 +255,7 @@ func (u *Unpackerr) extractTrackedItem(name string, folder *Folder) {
255255
}
256256

257257
// create a queue counter in the main history; add to u.Map and send webhook for a new folder.
258-
u.updateQueueStatus(&newStatus{Name: name, Status: QUEUED}, true)
258+
u.updateQueueStatus(&newStatus{Name: name, Status: QUEUED}, u.folders.Folders[name].updated, true)
259259
u.updateHistory(FolderString + ": " + name)
260260

261261
var exclude []string
@@ -320,39 +320,24 @@ func (u *Unpackerr) folderXtractrCallback(resp *xtractr.Response) {
320320
folder.status = EXTRACTEDNOTHING
321321
u.Printf("[Folder] %s: %s: %v", folder.status.Desc(), resp.X.Name, resp.Error)
322322
case resp.Error != nil:
323+
folder.archives = resp.Archives
323324
folder.status = EXTRACTFAILED
324325
u.Errorf("[Folder] %s: %s: %v", folder.status.Desc(), resp.X.Name, resp.Error)
325326
u.updateMetrics(resp, FolderString, folder.config.Path)
326-
327-
for _, v := range resp.Archives {
328-
folder.archives = append(folder.archives, v...)
329-
}
330327
default: // this runs in a go routine
331-
for _, v := range resp.Archives {
332-
folder.archives = append(folder.archives, v...)
333-
}
334-
335328
u.updateMetrics(resp, FolderString, folder.config.Path)
336329
u.Printf("[Folder] Extraction Finished: %s => elapsed: %v, archives: %d, "+
337330
"extra archives: %d, files extracted: %d, written: %dMiB",
338-
resp.X.Name, resp.Elapsed.Round(time.Second), len(folder.archives),
339-
mapLen(resp.Extras), len(resp.NewFiles), resp.Size/mebiByte)
331+
resp.X.Name, resp.Elapsed.Round(time.Second), resp.Archives.Count(),
332+
resp.Extras.Count(), len(resp.NewFiles), resp.Size/mebiByte)
340333

334+
folder.archives = resp.Archives
341335
folder.status = EXTRACTED
342336
folder.files = resp.NewFiles
343337
}
344338

345-
folder.updated = time.Now()
346-
347-
u.updateQueueStatus(&newStatus{Name: resp.X.Name, Resp: resp, Status: folder.status}, true)
348-
}
349-
350-
func mapLen(in map[string][]string) (out int) {
351-
for _, v := range in {
352-
out += len(v)
353-
}
354-
355-
return out
339+
folder.updated = resp.Started.Add(resp.Elapsed)
340+
u.updateQueueStatus(&newStatus{Name: resp.X.Name, Resp: resp, Status: folder.status}, folder.updated, true)
356341
}
357342

358343
// watchFSNotify reads file system events from a channel and processes them.
@@ -415,17 +400,17 @@ func (f *Folders) handleFileEvent(name, operation string) {
415400
}
416401

417402
// processEvent is here to process the event in the `*Unpackerr` scope before sending it back to the `*Folders` scope.
418-
func (u *Unpackerr) processEvent(event *eventData) {
403+
func (u *Unpackerr) processEvent(event *eventData, now time.Time) {
419404
// Do not watch our own log file.
420405
if event.file == u.Config.LogFile || event.file == u.Config.Webserver.LogFile {
421406
return
422407
}
423408

424-
u.folders.processEvent(event)
409+
u.folders.processEvent(event, now)
425410
}
426411

427412
// processEvent processes the event that was received.
428-
func (f *Folders) processEvent(event *eventData) {
413+
func (f *Folders) processEvent(event *eventData, now time.Time) {
429414
dirPath := filepath.Join(event.cnfg.Path, event.name)
430415

431416
stat, err := os.Stat(dirPath)
@@ -447,13 +432,13 @@ func (f *Folders) processEvent(event *eventData) {
447432
return
448433
}
449434

450-
f.saveEvent(event, dirPath)
435+
f.saveEvent(event, dirPath, now)
451436
}
452437

453-
func (f *Folders) saveEvent(event *eventData, dirPath string) {
438+
func (f *Folders) saveEvent(event *eventData, dirPath string, now time.Time) {
454439
if _, ok := f.Folders[dirPath]; ok {
455440
// f.Debugf("Item Updated: %v", event.file)
456-
f.Folders[dirPath].updated = time.Now()
441+
f.Folders[dirPath].updated = now
457442
return
458443
}
459444

@@ -468,23 +453,23 @@ func (f *Folders) saveEvent(event *eventData, dirPath string) {
468453
f.Printf("[Folder] Tracking New Item: %v (event: %s)", dirPath, event.op)
469454

470455
f.Folders[dirPath] = &Folder{
471-
updated: time.Now(),
456+
updated: now,
472457
status: WAITING,
473458
config: event.cnfg,
474459
}
475460
}
476461

477462
// checkFolderStats runs at an interval to see if any folders need work done on them.
478463
// This runs on an interval ticker in the main go routine.
479-
func (u *Unpackerr) checkFolderStats() {
464+
func (u *Unpackerr) checkFolderStats(now time.Time) {
480465
for name, folder := range u.folders.Folders {
481-
switch elapsed := time.Since(folder.updated); {
466+
switch elapsed := now.Sub(folder.updated); {
482467
case WAITING == folder.status && elapsed >= u.StartDelay.Duration:
483468
// The folder hasn't been written to in a while, extract it.
484-
u.extractTrackedItem(name, folder)
469+
u.extractTrackedItem(name, folder, now)
485470
case EXTRACTEDNOTHING == folder.status:
486471
// Wait until this item hasn't been touched for a while, so it doesn't re-queue.
487-
if time.Since(folder.updated) > u.StartDelay.Duration {
472+
if now.Sub(folder.updated) > u.StartDelay.Duration {
488473
// Ignore "no compressed files" errors for folders.
489474
delete(u.Map, name)
490475
delete(u.folders.Folders, name)
@@ -493,24 +478,24 @@ func (u *Unpackerr) checkFolderStats() {
493478
(u.MaxRetries == 0 || folder.retries < u.MaxRetries):
494479
u.Retries++
495480
folder.retries++
496-
folder.updated = time.Now()
481+
folder.updated = now
497482
folder.status = WAITING
498483
u.Printf("[Folder] Re-starting Failed Extraction: %s (%d/%d, failed %v ago)",
499484
folder.config.Path, folder.retries, u.MaxRetries, elapsed.Round(time.Second))
500485
case EXTRACTFAILED == folder.status && folder.retries < u.MaxRetries:
501486
// This empty block is to avoid deleting an item that needs more retries.
502487
case folder.status > EXTRACTING && folder.config.DeleteAfter.Duration <= 0:
503488
// if DeleteAfter is 0 we don't delete anything. we are done.
504-
u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: nil}, false)
489+
u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: nil}, now, false)
505490
delete(u.folders.Folders, name)
506491
case EXTRACTED == folder.status && elapsed >= folder.config.DeleteAfter.Duration:
507-
u.deleteAfterReached(name, folder)
492+
u.deleteAfterReached(name, now, folder)
508493
}
509494
}
510495
}
511496

512497
//nolint:wsl
513-
func (u *Unpackerr) deleteAfterReached(name string, folder *Folder) {
498+
func (u *Unpackerr) deleteAfterReached(name string, now time.Time, folder *Folder) {
514499
var webhook bool
515500

516501
// Folder reached delete delay (after extraction), nuke it.
@@ -526,11 +511,11 @@ func (u *Unpackerr) deleteAfterReached(name string, folder *Folder) {
526511
u.delChan <- &fileDeleteReq{Paths: []string{name}}
527512
webhook = true
528513
} else if folder.config.DeleteOrig && len(folder.archives) > 0 {
529-
u.delChan <- &fileDeleteReq{Paths: folder.archives}
514+
u.delChan <- &fileDeleteReq{Paths: folder.archives.List()}
530515
webhook = true
531516
}
532517

533-
u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: nil}, webhook)
518+
u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: nil}, now, webhook)
534519
// Folder reached delete delay (after extraction), nuke it.
535520
delete(u.folders.Folders, name)
536521
}
@@ -544,15 +529,15 @@ type newStatus struct {
544529
// updateQueueStatus for an on-going tracked extraction.
545530
// This is called from a channel callback to update status in a single go routine.
546531
// This is used by apps and Folders in a few other places as well.
547-
func (u *Unpackerr) updateQueueStatus(data *newStatus, sendHook bool) {
532+
func (u *Unpackerr) updateQueueStatus(data *newStatus, now time.Time, sendHook bool) {
548533
if _, ok := u.Map[data.Name]; !ok {
549534
// This is a new Folder being queued for extraction.
550535
// Arr apps do not land here. They create their own queued items in u.Map.
551536
u.Map[data.Name] = &Extract{
552537
Path: data.Name,
553538
App: FolderString,
554539
Status: QUEUED,
555-
Updated: time.Now(),
540+
Updated: now,
556541
IDs: map[string]interface{}{"title": data.Name}, // required or webhook may break.
557542
}
558543

@@ -568,7 +553,7 @@ func (u *Unpackerr) updateQueueStatus(data *newStatus, sendHook bool) {
568553
}
569554

570555
u.Map[data.Name].Status = data.Status
571-
u.Map[data.Name].Updated = time.Now()
556+
u.Map[data.Name].Updated = now
572557

573558
if sendHook {
574559
u.runAllHooks(u.Map[data.Name])

0 commit comments

Comments
 (0)