Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 175 additions & 71 deletions cmd/private-org-sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -65,6 +66,7 @@ type options struct {
confirm bool
failOnNonexistentDst bool
debug bool
parallelism int
}

const defaultPrefix = "https://github.com"
Expand Down Expand Up @@ -144,6 +146,7 @@ func gatherOptions() options {
fs.BoolVar(&o.failOnNonexistentDst, "fail-on-missing-destination", false, "Set true to make the tool to consider missing sync destination as an error")

fs.BoolVar(&o.debug, "debug", false, "Set true to enable debug logging level")
fs.IntVar(&o.parallelism, "parallelism", 4, "Number of repos to sync in parallel")

o.Options.Bind(fs)
o.WhitelistOptions.Bind(fs)
Expand All @@ -170,6 +173,27 @@ func withRetryOnNonzero(f gitFunc, retries int) gitFunc {
}
}

// withRetryOnTransientError wraps f so that a failed git invocation is retried
// when its output looks like a transient network problem, as judged by
// isTransientNetworkError.
//
// The returned gitFunc makes at most `retries` attempts and sleeps five
// seconds between them. A successful attempt (nil error and zero exit code)
// returns immediately. A failure whose output is not recognized as transient,
// or a failure on the final attempt, is returned to the caller unchanged.
//
// A `retries` value below one is treated as one: without that clamp the loop
// body would never run and the wrapper would return ("", 0, nil) without ever
// invoking git, which callers would read as success.
func withRetryOnTransientError(f gitFunc, retries int) gitFunc {
	if retries < 1 {
		retries = 1
	}
	return func(logger *logrus.Entry, dir string, command ...string) (string, int, error) {
		var out string
		var exitCode int
		var commandErr error
		for attempt := 1; attempt <= retries; attempt++ {
			out, exitCode, commandErr = f(logger, dir, command...)
			if commandErr == nil && exitCode == 0 {
				return out, exitCode, nil
			}
			// Stop when the attempts are used up or when the failure does not
			// look like a network hiccup; retrying would not help in that case.
			if attempt == retries || !isTransientNetworkError(out) {
				break
			}
			logger.Infof("Transient network error, retrying git command (%d/%d)", attempt, retries)
			time.Sleep(5 * time.Second)
		}
		return out, exitCode, commandErr
	}
}

func gitExec(logger *logrus.Entry, dir string, command ...string) (string, int, error) {
cmdLogger := logger.WithField("command", fmt.Sprintf("git %s", strings.Join(command, " ")))
cmd := exec.Command("git", command...)
Expand Down Expand Up @@ -319,11 +343,30 @@ func maybeTooShallow(pushOutput string) bool {
return false
}

// isTransientNetworkError reports whether output contains one of the known
// messages that indicate a network-level failure. The match is a
// case-sensitive substring check. The last pattern ends in "5", so it matches
// any "The requested URL returned error:" status that begins with that digit.
func isTransientNetworkError(output string) bool {
	switch {
	case strings.Contains(output, "Could not resolve host"),
		strings.Contains(output, "Failed to connect to"),
		strings.Contains(output, "Connection timed out"),
		strings.Contains(output, "Connection refused"),
		strings.Contains(output, "Connection reset by peer"),
		strings.Contains(output, "The requested URL returned error: 5"):
		return true
	}
	return false
}

// location identifies one branch of a GitHub repository. It is used to
// describe either the source or the destination of a sync.
type location struct {
	org    string
	repo   string
	branch string
}

// repoKey identifies a repository by its organization and repository name.
type repoKey struct {
	org  string
	repo string
}

// String renders the location as "org/repo@branch".
func (l location) String() string {
	return l.org + "/" + l.repo + "@" + l.branch
}
Expand Down Expand Up @@ -364,58 +407,126 @@ func (g gitSyncer) initRepo(repoDir, org, repo string) error {
return nil
}

// mirror syncs content from source location to destination one, using a local
// repository in the given path. The `repoDir` must have been previously
// initialized via initRepo(). The git content from the `src` location will
// be fetched to this local repository and then pushed to the `dst` location.
// Multiple `mirror` calls over the same `repoDir` will reuse the content
// fetched in previous calls, acting like a cache.
func (g gitSyncer) mirror(repoDir string, src, dst location) error {
mirrorFields := logrus.Fields{
"source": src.String(),
"destination": dst.String(),
"local-repo": repoDir,
// syncRepo initializes a local git repo, fetches branch heads from both source
// and destination via ls-remote, and mirrors each branch that needs syncing.
func (g gitSyncer) syncRepo(org, repo, targetOrg, dstRepo string, branches []location) []error {
var errs []error
repoLogger := g.logger

gitDir, err := g.makeGitDir(org, repo)
if err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
return errs
}

if err := g.initRepo(gitDir, org, repo); err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
return errs
}
logger := g.logger.WithFields(mirrorFields)
logger.Info("Syncing content between locations")

// We ls-remote destination first thing because when it does not exist
// we do not need to do any of the remaining operations.
logger.Debug("Determining HEAD of destination branch")
destUrlRaw := fmt.Sprintf("%s/%s/%s", g.prefix, dst.org, dst.repo)
// ls-remote source and destination in parallel
destUrlRaw := fmt.Sprintf("%s/%s/%s", g.prefix, targetOrg, dstRepo)
destUrl, err := url.Parse(destUrlRaw)
if err != nil {
logger.WithField("remote-url", destUrlRaw).WithError(err).Error("Failed to construct URL for the destination remote")
return fmt.Errorf("failed to construct URL for the destination remote")
repoLogger.WithField("remote-url", destUrlRaw).WithError(err).Error("Failed to construct URL for the destination remote")
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: failed to construct URL for the destination remote", source.String()))
}
return errs
}
if g.token != "" {
destUrl.User = url.User(g.token)
}

dstHeads, err := getRemoteBranchHeads(logger, g.git, repoDir, destUrl.String())
if err != nil {
srcRemote := fmt.Sprintf("%s-%s", org, repo)

type lsRemoteResult struct {
heads RemoteBranchHeads
err error
}
dstResult := make(chan lsRemoteResult, 1)
srcResult := make(chan lsRemoteResult, 1)
go func() {
heads, err := getRemoteBranchHeads(repoLogger, g.git, gitDir, destUrl.String())
dstResult <- lsRemoteResult{heads, err}
}()
go func() {
heads, err := getRemoteBranchHeads(repoLogger, withRetryOnNonzero(g.git, 5), gitDir, srcRemote)
srcResult <- lsRemoteResult{heads, err}
}()

dst := <-dstResult
src := <-srcResult

if dst.err != nil {
message := "destination repository does not exist or we cannot access it"
if g.failOnNonexistentDst {
logger.Errorf("%s", message)
return fmt.Errorf("%s", message)
repoLogger.Errorf("%s", message)
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %s", source.String(), message))
}
} else {
repoLogger.Warn(message)
}
return errs
}

logger.Warn(message)
return nil
if src.err != nil {
repoLogger.WithError(src.err).Error("Failed to determine branch HEADs in source")
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: failed to determine branch HEADs in source", source.String()))
}
return errs
}

dstHeads := dst.heads
srcHeads := src.heads

for _, source := range branches {
g.logger = config.LoggerForInfo(config.Info{
Metadata: api.Metadata{
Org: source.org,
Repo: source.repo,
Branch: source.branch,
},
})

destination := location{org: targetOrg, repo: dstRepo, branch: source.branch}

if err := g.mirror(gitDir, source, destination, srcHeads, dstHeads, destUrl); err != nil {
errs = append(errs, fmt.Errorf("%s->%s: %w", source.String(), destination.String(), err))
}
}

return errs
}

// mirror syncs a single branch from source to destination, using pre-fetched
// branch head information. The `repoDir` must have been previously initialized
// with git init and remote setup. The `srcHeads` and `dstHeads` must have been
// obtained from ls-remote calls against the source and destination repos.
// Multiple `mirror` calls over the same `repoDir` will reuse the content
// fetched in previous calls, acting like a cache.
func (g gitSyncer) mirror(repoDir string, src, dst location, srcHeads, dstHeads RemoteBranchHeads, destUrl *url.URL) error {
mirrorFields := logrus.Fields{
"source": src.String(),
"destination": dst.String(),
"local-repo": repoDir,
}
logger := g.logger.WithFields(mirrorFields)
logger.Info("Syncing content between locations")

dstCommitHash := dstHeads[dst.branch]

srcRemote := fmt.Sprintf("%s-%s", src.org, src.repo)

logger.Debug("Determining HEAD of source branch")
srcHeads, err := getRemoteBranchHeads(logger, withRetryOnNonzero(g.git, 5), repoDir, srcRemote)
if err != nil {
logger.WithError(err).Error("Failed to determine branch HEADs in source")
return fmt.Errorf("failed to determine branch HEADs in source")
}
srcCommitHash, ok := srcHeads[src.branch]
if !ok {
logger.WithError(err).Error("Branch does not exist in source remote")
logger.Error("Branch does not exist in source remote")
return fmt.Errorf("branch does not exist in source remote")
}

Expand Down Expand Up @@ -641,7 +752,7 @@ func main() {
token: token,
root: o.gitDir,
confirm: o.confirm,
git: gitExec,
git: withRetryOnTransientError(gitExec, 3),
failOnNonexistentDst: o.failOnNonexistentDst,
gitName: o.gitName,
gitEmail: o.gitEmail,
Expand All @@ -667,7 +778,6 @@ func main() {
}

// Group locations by (org, repo) so we can initialize each repo once
type repoKey struct{ org, repo string }
grouped := make(map[repoKey][]location)
for source := range locations {
key := repoKey{org: source.org, repo: source.repo}
Expand All @@ -680,46 +790,40 @@ func main() {
flattenedOrgs.Insert(o.org)
}

for key, branches := range grouped {
gitDir, err := syncer.makeGitDir(key.org, key.repo)
if err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
continue
}

syncer.logger = logrus.WithFields(logrus.Fields{
"org": key.org,
"repo": key.repo,
})
if err := syncer.initRepo(gitDir, key.org, key.repo); err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
continue
}

for _, source := range branches {
syncer.logger = config.LoggerForInfo(config.Info{
Metadata: api.Metadata{
Org: source.org,
Repo: source.repo,
Branch: source.branch,
},
})

destination := source
destination.org = o.targetOrg
if !flattenedOrgs.Has(source.org) {
destination.repo = fmt.Sprintf("%s-%s", source.org, source.repo)
type repoWork struct {
key repoKey
branches []location
}
work := make(chan repoWork)
var errsMu sync.Mutex
var wg sync.WaitGroup

for i := 0; i < o.parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range work {
repoSyncer := syncer
repoSyncer.logger = logrus.WithFields(logrus.Fields{"org": item.key.org, "repo": item.key.repo})
dstRepo := item.key.repo
if !flattenedOrgs.Has(item.key.org) {
dstRepo = fmt.Sprintf("%s-%s", item.key.org, item.key.repo)
}
repoErrs := repoSyncer.syncRepo(item.key.org, item.key.repo, o.targetOrg, dstRepo, item.branches)
if len(repoErrs) > 0 {
errsMu.Lock()
errs = append(errs, repoErrs...)
errsMu.Unlock()
}
}
}()
}

if err := syncer.mirror(gitDir, source, destination); err != nil {
errs = append(errs, fmt.Errorf("%s->%s: %w", source.String(), destination.String(), err))
}
}
for key, branches := range grouped {
work <- repoWork{key: key, branches: branches}
}
close(work)
wg.Wait()

if len(errs) > 0 {
logrus.WithError(utilerrors.NewAggregate(errs)).Fatal("There were failures")
Expand Down
Loading