diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index dc6ff3b633..37d518aa86 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -87,6 +87,20 @@ func (p *WorkerPool) Push(data Data) {
 	}
 }
 
+// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting
+func (p *WorkerPool) HasNoWorkerScaling() bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	return p.hasNoWorkerScaling()
+}
+
+func (p *WorkerPool) hasNoWorkerScaling() bool {
+	return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
+}
+
+// zeroBoost will add a temporary boost worker for a no worker queue
+// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
+// (This is because addWorkers has to be called whilst unlocked)
 func (p *WorkerPool) zeroBoost() {
 	ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
 	mq := GetManager().GetManagedQueue(p.qid)
@@ -277,6 +291,21 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
 			p.cond.Broadcast()
 			cancel()
 		}
+
+		select {
+		case <-p.baseCtx.Done():
+			// Don't warn if the baseCtx is shutdown
+		default:
+			if p.hasNoWorkerScaling() {
+				log.Warn(
+					"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.", p.qid)
+			} else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+				// OK there are no workers but... there's still work to be done -> Reboost
+				p.zeroBoost()
+				// p.lock will be unlocked by zeroBoost
+				return
+			}
+		}
 		p.lock.Unlock()
 	}()
 }
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 2643200174..4234457b0b 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 
 	handler := func(idx int, bean interface{}, limit int) error {
 		var item SyncRequest
+		var repo *repo_model.Repository
 		if m, ok := bean.(*repo_model.Mirror); ok {
 			if m.Repo == nil {
 				log.Error("Disconnected mirror found: %d", m.ID)
 				return nil
 			}
+			repo = m.Repo
 			item = SyncRequest{
 				Type:   PullMirrorType,
 				RepoID: m.RepoID,
@@ -73,6 +75,7 @@
 				log.Error("Disconnected push-mirror found: %d", m.ID)
 				return nil
 			}
+			repo = m.Repo
 			item = SyncRequest{
 				Type:   PushMirrorType,
 				RepoID: m.RepoID,
@@ -89,17 +92,16 @@
 		default:
 		}
 
-		// Check if this request is already in the queue
-		has, err := mirrorQueue.Has(&item)
-		if err != nil {
-			return err
-		}
-		if has {
-			return nil
-		}
-
 		// Push to the Queue
 		if err := mirrorQueue.Push(&item); err != nil {
+			if err == queue.ErrAlreadyInQueue {
+				if item.Type == PushMirrorType {
+					log.Trace("PushMirrors for %-v already queued for sync", repo)
+				} else {
+					log.Trace("PullMirrors for %-v already queued for sync", repo)
+				}
+				return nil
+			}
 			return err
 		}
 
@@ -110,23 +112,29 @@
 		return nil
 	}
 
+	pullMirrorsRequested := 0
 	if pullLimit != 0 {
+		requested = 0
 		if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
 			return handler(idx, bean, pullLimit)
 		}); err != nil && err != errLimit {
 			log.Error("MirrorsIterate: %v", err)
 			return err
 		}
+		pullMirrorsRequested, requested = requested, 0
 	}
+	pushMirrorsRequested := 0
 	if pushLimit != 0 {
+		requested = 0
 		if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
 			return handler(idx, bean, pushLimit)
 		}); err != nil && err != errLimit {
 			log.Error("PushMirrorsIterate: %v", err)
 			return err
 		}
+		pushMirrorsRequested, requested = requested, 0
 	}
 
-	log.Trace("Finished: Update")
+	log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
 	return nil
 }
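
The `HasNoWorkerScaling` check added above guards against a pool that can never make progress: no workers now, and every route to adding one disabled. A minimal, runnable sketch of the predicate, using a stripped-down stand-in for `WorkerPool` (the real struct also carries the lock, contexts, and queue wiring):

```go
package main

import (
	"fmt"
	"time"
)

// pool is a hypothetical stand-in for queue.WorkerPool, reduced to the
// fields the scaling predicate reads.
type pool struct {
	numberOfWorkers    int
	boostTimeout       time.Duration // 0 disables boosting
	boostWorkers       int           // 0 disables boosting
	maxNumberOfWorkers int           // 0 disables boosting
}

// hasNoWorkerScaling mirrors the predicate in the diff: true when the pool
// has no workers and any one of the boost settings rules out adding one.
func (p *pool) hasNoWorkerScaling() bool {
	return p.numberOfWorkers == 0 &&
		(p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
}

func main() {
	stuck := pool{boostTimeout: 0, boostWorkers: 5, maxNumberOfWorkers: 10}
	boostable := pool{boostTimeout: time.Minute, boostWorkers: 5, maxNumberOfWorkers: 10}
	fmt.Println(stuck.hasNoWorkerScaling())     // true: warn, nothing will ever run
	fmt.Println(boostable.hasNoWorkerScaling()) // false: zeroBoost can add a temporary worker
}
```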
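
The comment on `zeroBoost` documents a lock hand-off: the caller enters holding `p.lock` and the callee releases it, because the work it triggers (`addWorkers`) must run unlocked. A sketch of that contract with hypothetical names; the real method also registers the boost with the queue manager and starts the worker:

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	lock            sync.Mutex
	numberOfWorkers int
}

// zeroBoost must be entered with p.lock held and returns with it released,
// so the slow worker start-up never runs under the mutex.
func (p *pool) zeroBoost() {
	p.numberOfWorkers++ // state changes happen while still locked
	p.lock.Unlock()
	// ...start the temporary boost worker here, outside the lock...
}

func main() {
	p := &pool{}
	p.lock.Lock()
	p.zeroBoost() // returns unlocked; do not call p.lock.Unlock() again
	fmt.Println("workers:", p.numberOfWorkers)
}
```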
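
On the mirror side, the separate Has-then-Push check is dropped in favour of handling `queue.ErrAlreadyInQueue` from `Push` itself: two racing callers could both pass `Has` and enqueue duplicates, whereas a sentinel error from `Push` makes the membership test and the insert a single atomic step. A minimal in-memory sketch of the pattern (the queue type, its locking, and the string key are illustrative, not Gitea's implementation):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyInQueue stands in for queue.ErrAlreadyInQueue.
var errAlreadyInQueue = errors.New("already in queue")

// uniqueQueue checks membership and inserts under one lock, so callers
// cannot race between "is it queued?" and "queue it".
type uniqueQueue struct {
	mu      sync.Mutex
	pending map[string]bool
}

func (q *uniqueQueue) Push(key string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending[key] {
		return errAlreadyInQueue
	}
	q.pending[key] = true
	return nil
}

func main() {
	q := &uniqueQueue{pending: map[string]bool{}}
	for _, key := range []string{"pull-mirror/1", "pull-mirror/1"} {
		if err := q.Push(key); err != nil {
			if errors.Is(err, errAlreadyInQueue) {
				fmt.Printf("%s already queued for sync\n", key)
				continue
			}
			fmt.Println("push failed:", err)
		}
	}
}
```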
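
Finally, the per-type totals in the closing `log.Trace` reuse the one `requested` counter that `handler` increments: Go's tuple assignment captures each pass's count and zeroes the shared counter in a single statement. A toy version with the iterate calls faked by plain increments:

```go
package main

import "fmt"

func main() {
	// requested stands in for the counter the handler closure increments on
	// every successful Push; the iterate calls are faked here.
	requested := 0

	pullMirrorsRequested := 0
	requested += 2 // pretend MirrorsIterate queued two pull mirrors
	// capture the pull total and zero the shared counter in one statement
	pullMirrorsRequested, requested = requested, 0

	pushMirrorsRequested := 0
	requested++ // pretend PushMirrorsIterate queued one push mirror
	pushMirrorsRequested, requested = requested, 0

	fmt.Printf("Finished: Update: %d pull mirrors and %d push mirrors queued\n",
		pullMirrorsRequested, pushMirrorsRequested)
}
```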