summaryrefslogtreecommitdiff
path: root/core/job
diff options
context:
space:
mode:
Diffstat (limited to 'core/job')
-rw-r--r--core/job/job.go79
-rw-r--r--core/job/queue.go127
2 files changed, 206 insertions, 0 deletions
diff --git a/core/job/job.go b/core/job/job.go
new file mode 100644
index 0000000..9cfc0ed
--- /dev/null
+++ b/core/job/job.go
@@ -0,0 +1,79 @@
+package job
+
+import (
+ "github.com/isacikgoz/gitbatch/core/command"
+ "github.com/isacikgoz/gitbatch/core/git"
+)
+
+// Job relates the type of the operation and the entity
+type Job struct {
+ // JobType is to select operation type that will be applied to repository
+ JobType JobType
+ // Repository points to the repository that will be used for operation
+ Repository *git.Repository
+ // Options is a placeholder for operation options
+ Options interface{}
+}
+
+// JobType is the a git operation supported
+type JobType string
+
+const (
+ // FetchJob is wrapper of git fetch command
+ FetchJob JobType = "fetch"
+
+ // PullJob is wrapper of git pull command
+ PullJob JobType = "pull"
+
+ // MergeJob is wrapper of git merge command
+ MergeJob JobType = "merge"
+)
+
+// starts the job
+func (j *Job) start() error {
+ j.Repository.SetWorkStatus(git.Working)
+ // TODO: Handle errors?
+ // TOOD: Better implementation required
+ switch mode := j.JobType; mode {
+ case FetchJob:
+ var opts command.FetchOptions
+ if j.Options != nil {
+ opts = j.Options.(command.FetchOptions)
+ } else {
+ opts = command.FetchOptions{
+ RemoteName: j.Repository.State.Remote.Name,
+ }
+ }
+ if err := command.Fetch(j.Repository, opts); err != nil {
+ j.Repository.SetWorkStatus(git.Fail)
+ j.Repository.State.Message = err.Error()
+ return err
+ }
+ case PullJob:
+ var opts command.PullOptions
+ if j.Options != nil {
+ opts = j.Options.(command.PullOptions)
+ } else {
+ opts = command.PullOptions{
+ RemoteName: j.Repository.State.Remote.Name,
+ }
+ }
+ if err := command.Pull(j.Repository, opts); err != nil {
+ j.Repository.SetWorkStatus(git.Fail)
+ j.Repository.State.Message = err.Error()
+ return err
+ }
+ case MergeJob:
+ if err := command.Merge(j.Repository, command.MergeOptions{
+ BranchName: j.Repository.State.Remote.Branch.Name,
+ }); err != nil {
+ j.Repository.SetWorkStatus(git.Fail)
+ j.Repository.State.Message = err.Error()
+ return nil
+ }
+ default:
+ j.Repository.SetWorkStatus(git.Available)
+ return nil
+ }
+ return nil
+}
diff --git a/core/job/queue.go b/core/job/queue.go
new file mode 100644
index 0000000..e337064
--- /dev/null
+++ b/core/job/queue.go
@@ -0,0 +1,127 @@
+package job
+
+import (
+ "context"
+ "errors"
+ "runtime"
+ "sync"
+
+ "github.com/isacikgoz/gitbatch/core/git"
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/sync/semaphore"
+)
+
+// JobQueue holds the slice of Jobs
+type JobQueue struct {
+ series []*Job
+}
+
+// CreateJobQueue creates a jobqueue struct and initialize its slice then return
+// its pointer
+func CreateJobQueue() (jq *JobQueue) {
+ s := make([]*Job, 0)
+ return &JobQueue{
+ series: s,
+ }
+}
+
+// AddJob adds a job to the queue
+func (jq *JobQueue) AddJob(j *Job) error {
+ for _, job := range jq.series {
+ if job.Repository.RepoID == j.Repository.RepoID && job.JobType == j.JobType {
+ return errors.New("Same job already is in the queue")
+ }
+ }
+ jq.series = append(jq.series, nil)
+ copy(jq.series[1:], jq.series[0:])
+ jq.series[0] = j
+ return nil
+}
+
+// StartNext starts the next job in the queue
+func (jq *JobQueue) StartNext() (j *Job, finished bool, err error) {
+ finished = false
+ if len(jq.series) < 1 {
+ finished = true
+ return nil, finished, nil
+ }
+ i := len(jq.series) - 1
+ lastJob := jq.series[i]
+ jq.series = jq.series[:i]
+ if err = lastJob.start(); err != nil {
+ return lastJob, finished, err
+ }
+ return lastJob, finished, nil
+}
+
+// RemoveFromQueue deletes the given entity and its job from the queue
+// TODO: it is not safe if the job has been started
+func (jq *JobQueue) RemoveFromQueue(r *git.Repository) error {
+ removed := false
+ for i, job := range jq.series {
+ if job.Repository.RepoID == r.RepoID {
+ jq.series = append(jq.series[:i], jq.series[i+1:]...)
+ removed = true
+ }
+ }
+ if !removed {
+ return errors.New("There is no job with given repoID")
+ }
+ return nil
+}
+
+// IsInTheQueue function; since the job and entity is not tied with its own
+// struct, this function returns true if that entity is in the queue along with
+// the jobs type
+func (jq *JobQueue) IsInTheQueue(r *git.Repository) (inTheQueue bool, j *Job) {
+ inTheQueue = false
+ for _, job := range jq.series {
+ if job.Repository.RepoID == r.RepoID {
+ inTheQueue = true
+ j = job
+ }
+ }
+ return inTheQueue, j
+}
+
+// StartJobsAsync start he jobs in the queue asynchronously
+func (jq *JobQueue) StartJobsAsync() map[*Job]error {
+
+ ctx := context.TODO()
+
+ var (
+ maxWorkers = runtime.GOMAXPROCS(0)
+ sem = semaphore.NewWeighted(int64(maxWorkers))
+ fails = make(map[*Job]error)
+ )
+
+ var mx sync.Mutex
+ for range jq.series {
+
+ if err := sem.Acquire(ctx, 1); err != nil {
+ log.Errorf("Failed to acquire semaphore: %v", err)
+ break
+ }
+
+ go func() {
+
+ defer sem.Release(1)
+ j, _, err := jq.StartNext()
+ if err != nil {
+ mx.Lock()
+ fails[j] = err
+ mx.Unlock()
+ }
+ }()
+ }
+
+ // Acquire all of the tokens to wait for any remaining workers to finish.
+ //
+ // If you are already waiting for the workers by some other means (such as an
+ // errgroup.Group), you can omit this final Acquire call.
+ if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
+ log.Errorf("Failed to acquire semaphore: %v", err)
+ }
+
+ return fails
+}