summaryrefslogtreecommitdiff
path: root/core/job/job-queue.go
blob: e337064a8c49b91e30f871a61e43c9a137ad22c8 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package job

import (
	"context"
	"errors"
	"runtime"
	"sync"

	"github.com/isacikgoz/gitbatch/core/git"
	log "github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
)

// JobQueue holds the jobs that are waiting to be started.
//
// The queue is backed by a plain slice and carries no synchronization of its
// own. AddJob inserts at index 0 and StartNext takes from the end of the
// slice, so jobs are started in the order they were added.
type JobQueue struct {
	series []*Job // waiting jobs; the last element is the next one to start
}

// CreateJobQueue allocates a JobQueue whose series is an empty, non-nil slice
// and returns a pointer to it.
func CreateJobQueue() (jq *JobQueue) {
	jq = new(JobQueue)
	jq.series = make([]*Job, 0)
	return jq
}

// AddJob puts j at the front of the queue. It returns an error, and leaves the
// queue untouched, when a job with the same repository ID and the same job
// type is already waiting.
func (jq *JobQueue) AddJob(j *Job) error {
	for i := range jq.series {
		queued := jq.series[i]
		sameRepo := queued.Repository.RepoID == j.Repository.RepoID
		if sameRepo && queued.JobType == j.JobType {
			return errors.New("Same job already is in the queue")
		}
	}

	// Grow the slice by one slot, shift every existing job one position to the
	// right and place the new job in the slot freed at index 0.
	n := len(jq.series)
	jq.series = append(jq.series, nil)
	copy(jq.series[1:n+1], jq.series[:n])
	jq.series[0] = j
	return nil
}

// StartNext takes the job at the end of the queue, removes it from the queue
// and starts it.
//
// When the queue is empty it returns (nil, true, nil). Otherwise it returns
// the dequeued job, finished == false and whatever error start reported; the
// job is removed from the queue even when start fails.
func (jq *JobQueue) StartNext() (j *Job, finished bool, err error) {
	n := len(jq.series)
	if n == 0 {
		return nil, true, nil
	}

	// Pop from the tail before starting, so the job is no longer queued while
	// it runs.
	j, jq.series = jq.series[n-1], jq.series[:n-1]

	err = j.start()
	return j, false, err
}

// RemoveFromQueue deletes every queued job that belongs to the given
// repository. It returns an error when no queued job matches r's RepoID.
//
// A repository may have several jobs in the queue (AddJob only rejects a
// duplicate of the same job type), so the queue is filtered in a single pass
// instead of being resliced while it is iterated: reslicing inside the range
// loop skips the element that slides into the removed slot and can index past
// the shortened slice.
//
// TODO: it is not safe if the job has been started
func (jq *JobQueue) RemoveFromQueue(r *git.Repository) error {
	// kept shares the backing array with jq.series. The write position never
	// overtakes the read position, so filtering in place is safe.
	kept := jq.series[:0]
	for _, job := range jq.series {
		if job.Repository.RepoID != r.RepoID {
			kept = append(kept, job)
		}
	}

	if len(kept) == len(jq.series) {
		return errors.New("There is no job with given repoID")
	}

	// Drop the stale pointers left behind in the tail so the removed jobs are
	// not kept alive by the backing array.
	for i := len(kept); i < len(jq.series); i++ {
		jq.series[i] = nil
	}
	jq.series = kept
	return nil
}

// IsInTheQueue reports whether a job for the given repository is waiting in
// the queue. Jobs are matched by RepoID only. The whole queue is scanned, so
// when several jobs match, the one closest to the end of the queue is
// returned; j is nil when nothing matches.
func (jq *JobQueue) IsInTheQueue(r *git.Repository) (inTheQueue bool, j *Job) {
	for i := range jq.series {
		if candidate := jq.series[i]; candidate.Repository.RepoID == r.RepoID {
			inTheQueue, j = true, candidate
		}
	}
	return inTheQueue, j
}

// StartJobsAsync starts the jobs in the queue concurrently and blocks until
// every call to start has returned.
//
// Jobs are dequeued on the calling goroutine, from the end of the series (the
// same end StartNext takes from), and only the call to start runs in a worker
// goroutine. Dequeuing inside the workers would make them read and reslice
// jq.series concurrently without synchronization, which lets two workers pick
// the same job while other jobs are never started.
//
// At most runtime.GOMAXPROCS(0) jobs run at the same time. The returned map
// holds the error of every job whose start failed; it is empty when all of
// them succeeded.
func (jq *JobQueue) StartJobsAsync() map[*Job]error {
	ctx := context.TODO()

	var (
		maxWorkers = runtime.GOMAXPROCS(0)
		sem        = semaphore.NewWeighted(int64(maxWorkers))
		fails      = make(map[*Job]error)
		mx         sync.Mutex // guards fails
	)

	for len(jq.series) > 0 {
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Errorf("Failed to acquire semaphore: %v", err)
			break
		}

		// Take the job off the queue here, before the goroutine is spawned, so
		// that jq.series is only ever touched by the calling goroutine.
		last := len(jq.series) - 1
		j := jq.series[last]
		jq.series = jq.series[:last]

		go func(j *Job) {
			defer sem.Release(1)
			if err := j.start(); err != nil {
				mx.Lock()
				fails[j] = err
				mx.Unlock()
			}
		}(j)
	}

	// Acquire all of the tokens to wait for any remaining workers to finish.
	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
		log.Errorf("Failed to acquire semaphore: %v", err)
	}

	return fails
}