From d14438725e71fad97b7111d2e23253e0cc2374c3 Mon Sep 17 00:00:00 2001 From: Tamir Bahar Date: Thu, 13 Apr 2017 01:43:50 +0300 Subject: [PATCH] Added basic Repository.branches implementation. --- pygit2/repository.py | 70 ++++++++++++++++++++--- test/test_branch.py | 130 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 9 deletions(-) diff --git a/pygit2/repository.py b/pygit2/repository.py index 0de245c..5c8e33f 100644 --- a/pygit2/repository.py +++ b/pygit2/repository.py @@ -32,6 +32,7 @@ from __future__ import absolute_import from string import hexdigits import sys, tarfile from time import time + if sys.version_info[0] < 3: from cStringIO import StringIO else: @@ -44,6 +45,7 @@ from _pygit2 import Repository as _Repository, init_file_backend from _pygit2 import Oid, GIT_OID_HEXSZ, GIT_OID_MINPREFIXLEN from _pygit2 import GIT_CHECKOUT_SAFE, GIT_CHECKOUT_RECREATE_MISSING, GIT_DIFF_NORMAL from _pygit2 import GIT_FILEMODE_LINK +from _pygit2 import GIT_BRANCH_LOCAL, GIT_BRANCH_REMOTE from _pygit2 import Reference, Tree, Commit, Blob, Signature from .config import Config @@ -57,7 +59,6 @@ from .submodule import Submodule class BaseRepository(_Repository): - def __init__(self, backend, *args, **kwargs): super(BaseRepository, self).__init__(backend, *args, **kwargs) self._common_init() @@ -484,6 +485,7 @@ class BaseRepository(_Repository): @staticmethod def _merge_options(favor): """Return a 'git_merge_opts *'""" + def favor_to_enum(favor): if favor == 'normal': return C.GIT_MERGE_FILE_FAVOR_NORMAL @@ -530,13 +532,13 @@ class BaseRepository(_Repository): theirs._to_c() if theirs is not None else (ffi.NULL, ffi.NULL)) err = C.git_merge_file_from_index( - cmergeresult, self._repo, - cancestor, cours, ctheirs, - ffi.NULL); + cmergeresult, self._repo, + cancestor, cours, ctheirs, + ffi.NULL); check_error(err) ret = ffi.string(cmergeresult.ptr, - cmergeresult.len).decode('utf-8') + cmergeresult.len).decode('utf-8') C.git_merge_file_result_free(cmergeresult) return ret @@ -735,11 +737,12 @@ class BaseRepository(_Repository): C.git_buf_free(buf) finally: C.git_describe_result_free(result[0]) + # # Stash # def stash(self, stasher, message=None, keep_index=False, - include_untracked=False, include_ignored=False): + include_untracked=False, include_ignored=False): """Save changes to the working directory to the stash. :param Signature stasher: The identity of the person doing the stashing. @@ -817,7 +820,6 @@ class BaseRepository(_Repository): """ check_error(C.git_stash_drop(self._repo, index)) - def stash_pop(self, index=0, **kwargs): """Apply a stashed state and remove it from the stash list. @@ -886,11 +888,11 @@ class BaseRepository(_Repository): info = tarfile.TarInfo(prefix + entry.path) info.size = len(content) info.mtime = timestamp - info.uname = info.gname = 'root' # just because git does this + info.uname = info.gname = 'root' # just because git does this if entry.mode == GIT_FILEMODE_LINK: info.type = tarfile.SYMTYPE info.linkname = content.decode("utf-8") - info.mode = 0o777 # symlinks get placeholder + info.mode = 0o777 # symlinks get placeholder info.size = 0 archive.addfile(info) else: @@ -996,6 +998,54 @@ class BaseRepository(_Repository): check_error(err) +class Branches(object): + def __init__(self, repository, flag=GIT_BRANCH_LOCAL | GIT_BRANCH_REMOTE): + self._repository = repository + self._flag = flag + + if flag == GIT_BRANCH_LOCAL | GIT_BRANCH_REMOTE: + self.local = Branches(repository, flag=GIT_BRANCH_LOCAL) + self.remote = Branches(repository, flag=GIT_BRANCH_REMOTE) + + def __getitem__(self, name): + branch = None + if self._flag & GIT_BRANCH_LOCAL: + branch = self._repository.lookup_branch(name, GIT_BRANCH_LOCAL) + + if branch is None and self._flag & GIT_BRANCH_REMOTE: + branch = self._repository.lookup_branch(name, GIT_BRANCH_REMOTE) + + if branch is None: + raise KeyError('Branch not found: {}'.format(name)) + + return branch + + def get(self, key): + try: + return self[key] + except KeyError: + return None + + def __iter__(self): + for branch_name in self._repository.listall_branches(self._flag): + yield branch_name + + def create(self, name, commit, force=False): + return self._repository.create_branch(name, commit, force) + + def delete(self, name): + self[name].delete() + + def __contains__(self, name): + try: + # If the lookup succeeds, the name is present. + _ = self[name] + return True + + except KeyError: + return False + + class Repository(BaseRepository): def __init__(self, path, *args, **kwargs): if not isinstance(path, six.string_types): @@ -1004,6 +1054,8 @@ class Repository(BaseRepository): path_backend = init_file_backend(path) super(Repository, self).__init__(backend=path_backend, *args, **kwargs) + self.branches = Branches(self) + @classmethod def _from_c(cls, ptr, owned): cptr = ffi.new('git_repository **') diff --git a/test/test_branch.py b/test/test_branch.py index 82a5a2f..ef35398 100644 --- a/test/test_branch.py +++ b/test/test_branch.py @@ -39,6 +39,135 @@ I18N_LAST_COMMIT = '5470a671a80ac3789f1a6a8cefbcf43ce7af0563' ORIGIN_MASTER_COMMIT = '784855caf26449a1914d2cf62d12b9374d76ae78' +class BranchesObjectTestCase(utils.RepoTestCase): + def test_lookup_branch_local(self): + branch = self.repo.branches['master'] + self.assertEqual(branch.target.hex, LAST_COMMIT) + + branch = self.repo.branches.local['i18n'] + self.assertEqual(branch.target.hex, I18N_LAST_COMMIT) + + self.assertTrue(self.repo.branches.get('not-exists') is None) + + self.assertRaises(KeyError, lambda: self.repo.branches['not-exists']) + + def test_listall_branches(self): + branches = sorted(self.repo.branches) + self.assertEqual(branches, ['i18n', 'master']) + + def test_create_branch(self): + commit = self.repo[LAST_COMMIT] + reference = self.repo.branches.create('version1', commit) + self.assertTrue('version1' in self.repo.branches) + reference = self.repo.branches['version1'] + self.assertEqual(reference.target.hex, LAST_COMMIT) + + # try to create existing reference + self.assertRaises(ValueError, + lambda: self.repo.branches.create('version1', commit)) + + # try to create existing reference with force + reference = self.repo.branches.create('version1', commit, True) + self.assertEqual(reference.target.hex, LAST_COMMIT) + + def test_delete(self): + self.repo.branches.delete('i18n') + + self.assertTrue(self.repo.branches.get('i18n') is None) + + def test_cant_delete_master(self): + self.assertRaises(pygit2.GitError, lambda: self.repo.branches.delete('master')) + + def test_branch_is_head_returns_true_if_branch_is_head(self): + branch = self.repo.branches.get('master') + self.assertTrue(branch.is_head()) + + def test_branch_is_head_returns_false_if_branch_is_not_head(self): + branch = self.repo.branches.get('i18n') + self.assertFalse(branch.is_head()) + + def test_branch_rename_succeeds(self): + new_branch = self.repo.branches['i18n'].rename('new-branch') + self.assertEqual(new_branch.target.hex, I18N_LAST_COMMIT) + + new_branch_2 = self.repo.branches.get('new-branch') + self.assertEqual(new_branch_2.target.hex, I18N_LAST_COMMIT) + + def test_branch_rename_fails_if_destination_already_exists(self): + original_branch = self.repo.branches.get('i18n') + self.assertRaises(ValueError, lambda: original_branch.rename('master')) + + def test_branch_rename_not_fails_if_force_is_true(self): + original_branch = self.repo.branches.get('master') + new_branch = original_branch.rename('i18n', True) + self.assertEqual(new_branch.target.hex, LAST_COMMIT) + + def test_branch_rename_fails_with_invalid_names(self): + original_branch = self.repo.branches.get('i18n') + self.assertRaises(ValueError, + lambda: original_branch.rename('abc@{123')) + + def test_branch_name(self): + branch = self.repo.branches.get('master') + self.assertEqual(branch.branch_name, 'master') + self.assertEqual(branch.name, 'refs/heads/master') + + branch = self.repo.branches.get('i18n') + self.assertEqual(branch.branch_name, 'i18n') + self.assertEqual(branch.name, 'refs/heads/i18n') + + +class BranchesObjectEmptyRepoTestCase(utils.EmptyRepoTestCase): + def setUp(self): + super(utils.EmptyRepoTestCase, self).setUp() + + remote = self.repo.remotes[0] + remote.fetch() + + def test_lookup_branch_remote(self): + branch = self.repo.branches.remote.get('origin/master') + self.assertEqual(branch.target.hex, ORIGIN_MASTER_COMMIT) + + self.assertTrue( + self.repo.branches.remote.get('origin/not-exists') is None) + + def test_listall_branches(self): + branches = sorted(self.repo.branches.remote) + self.assertEqual(branches, ['origin/master']) + + def test_branch_remote_name(self): + self.repo.remotes[0].fetch() + branch = self.repo.branches.remote['origin/master'] + self.assertEqual(branch.remote_name, 'origin') + + def test_branch_upstream(self): + self.repo.remotes[0].fetch() + remote_master = self.repo.branches.remote['origin/master'] + master = self.repo.branches.create('master', + self.repo[remote_master.target.hex]) + + self.assertTrue(master.upstream is None) + master.upstream = remote_master + self.assertEqual(master.upstream.branch_name, 'origin/master') + + def set_bad_upstream(): + master.upstream = 2.5 + + self.assertRaises(TypeError, set_bad_upstream) + + master.upstream = None + self.assertTrue(master.upstream is None) + + def test_branch_upstream_name(self): + self.repo.remotes[0].fetch() + remote_master = self.repo.branches.remote['origin/master'] + master = self.repo.branches.create('master', + self.repo[remote_master.target.hex]) + + master.upstream = remote_master + self.assertEqual(master.upstream_name, 'refs/remotes/origin/master') + + class BranchesTestCase(utils.RepoTestCase): def test_lookup_branch_local(self): branch = self.repo.lookup_branch('master') @@ -159,6 +288,7 @@ class BranchesEmptyRepoTestCase(utils.EmptyRepoTestCase): def set_bad_upstream(): master.upstream = 2.5 + self.assertRaises(TypeError, set_bad_upstream) master.upstream = None