diff options
Diffstat (limited to 'core/job')
| -rw-r--r-- | core/job/job.go | 79 | ||||
| -rw-r--r-- | core/job/queue.go | 127 |
2 files changed, 206 insertions, 0 deletions
diff --git a/core/job/job.go b/core/job/job.go new file mode 100644 index 0000000..9cfc0ed --- /dev/null +++ b/core/job/job.go @@ -0,0 +1,79 @@ +package job + +import ( + "github.com/isacikgoz/gitbatch/core/command" + "github.com/isacikgoz/gitbatch/core/git" +) + +// Job relates the type of the operation and the entity +type Job struct { + // JobType is to select operation type that will be applied to repository + JobType JobType + // Repository points to the repository that will be used for operation + Repository *git.Repository + // Options is a placeholder for operation options + Options interface{} +} + +// JobType is the a git operation supported +type JobType string + +const ( + // FetchJob is wrapper of git fetch command + FetchJob JobType = "fetch" + + // PullJob is wrapper of git pull command + PullJob JobType = "pull" + + // MergeJob is wrapper of git merge command + MergeJob JobType = "merge" +) + +// starts the job +func (j *Job) start() error { + j.Repository.SetWorkStatus(git.Working) + // TODO: Handle errors? + // TOOD: Better implementation required + switch mode := j.JobType; mode { + case FetchJob: + var opts command.FetchOptions + if j.Options != nil { + opts = j.Options.(command.FetchOptions) + } else { + opts = command.FetchOptions{ + RemoteName: j.Repository.State.Remote.Name, + } + } + if err := command.Fetch(j.Repository, opts); err != nil { + j.Repository.SetWorkStatus(git.Fail) + j.Repository.State.Message = err.Error() + return err + } + case PullJob: + var opts command.PullOptions + if j.Options != nil { + opts = j.Options.(command.PullOptions) + } else { + opts = command.PullOptions{ + RemoteName: j.Repository.State.Remote.Name, + } + } + if err := command.Pull(j.Repository, opts); err != nil { + j.Repository.SetWorkStatus(git.Fail) + j.Repository.State.Message = err.Error() + return err + } + case MergeJob: + if err := command.Merge(j.Repository, command.MergeOptions{ + BranchName: j.Repository.State.Remote.Branch.Name, + }); err != nil { + j.Repository.SetWorkStatus(git.Fail) + j.Repository.State.Message = err.Error() + return nil + } + default: + j.Repository.SetWorkStatus(git.Available) + return nil + } + return nil +} diff --git a/core/job/queue.go b/core/job/queue.go new file mode 100644 index 0000000..e337064 --- /dev/null +++ b/core/job/queue.go @@ -0,0 +1,127 @@ +package job + +import ( + "context" + "errors" + "runtime" + "sync" + + "github.com/isacikgoz/gitbatch/core/git" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" +) + +// JobQueue holds the slice of Jobs +type JobQueue struct { + series []*Job +} + +// CreateJobQueue creates a jobqueue struct and initialize its slice then return +// its pointer +func CreateJobQueue() (jq *JobQueue) { + s := make([]*Job, 0) + return &JobQueue{ + series: s, + } +} + +// AddJob adds a job to the queue +func (jq *JobQueue) AddJob(j *Job) error { + for _, job := range jq.series { + if job.Repository.RepoID == j.Repository.RepoID && job.JobType == j.JobType { + return errors.New("Same job already is in the queue") + } + } + jq.series = append(jq.series, nil) + copy(jq.series[1:], jq.series[0:]) + jq.series[0] = j + return nil +} + +// StartNext starts the next job in the queue +func (jq *JobQueue) StartNext() (j *Job, finished bool, err error) { + finished = false + if len(jq.series) < 1 { + finished = true + return nil, finished, nil + } + i := len(jq.series) - 1 + lastJob := jq.series[i] + jq.series = jq.series[:i] + if err = lastJob.start(); err != nil { + return lastJob, finished, err + } + return lastJob, finished, nil +} + +// RemoveFromQueue deletes the given entity and its job from the queue +// TODO: it is not safe if the job has been started +func (jq *JobQueue) RemoveFromQueue(r *git.Repository) error { + removed := false + for i, job := range jq.series { + if job.Repository.RepoID == r.RepoID { + jq.series = append(jq.series[:i], jq.series[i+1:]...) + removed = true + } + } + if !removed { + return errors.New("There is no job with given repoID") + } + return nil +} + +// IsInTheQueue function; since the job and entity is not tied with its own +// struct, this function returns true if that entity is in the queue along with +// the jobs type +func (jq *JobQueue) IsInTheQueue(r *git.Repository) (inTheQueue bool, j *Job) { + inTheQueue = false + for _, job := range jq.series { + if job.Repository.RepoID == r.RepoID { + inTheQueue = true + j = job + } + } + return inTheQueue, j +} + +// StartJobsAsync start he jobs in the queue asynchronously +func (jq *JobQueue) StartJobsAsync() map[*Job]error { + + ctx := context.TODO() + + var ( + maxWorkers = runtime.GOMAXPROCS(0) + sem = semaphore.NewWeighted(int64(maxWorkers)) + fails = make(map[*Job]error) + ) + + var mx sync.Mutex + for range jq.series { + + if err := sem.Acquire(ctx, 1); err != nil { + log.Errorf("Failed to acquire semaphore: %v", err) + break + } + + go func() { + + defer sem.Release(1) + j, _, err := jq.StartNext() + if err != nil { + mx.Lock() + fails[j] = err + mx.Unlock() + } + }() + } + + // Acquire all of the tokens to wait for any remaining workers to finish. + // + // If you are already waiting for the workers by some other means (such as an + // errgroup.Group), you can omit this final Acquire call. + if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil { + log.Errorf("Failed to acquire semaphore: %v", err) + } + + return fails +} |
