diff --git a/patchwork/forge/__init__.py b/patchwork/forge/__init__.py
index 1c236e4..d9ac748 100644
--- a/patchwork/forge/__init__.py
+++ b/patchwork/forge/__init__.py
@@ -158,6 +158,14 @@ class ForgeBackend(ABC):
auth.update(repo_overrides)
return auth
+ def git_credentials(self, forge_config):
+ """
+ Return git credential store content as a string for the given
+ project. Written to a temporary file and passed to git via
+ GIT_CREDENTIAL_HELPER during clone and fetch operations.
+ """
+ raise NotImplementedError
+
_backends = {}
diff --git a/patchwork/forge/git.py b/patchwork/forge/git.py
new file mode 100644
index 0000000..311c042
--- /dev/null
+++ b/patchwork/forge/git.py
@@ -0,0 +1,274 @@
+# Patchwork - automated patch tracking system
+# Copyright (C) 2026 Robin Jarry <robin@jarry.cc>
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+"""
+Git mirror management for forge sync operations.
+"""
+
+import contextlib
+import email
+import logging
+import os
+import re
+import subprocess
+import tempfile
+
+from django.conf import settings
+
+from patchwork.forge.util import bytes_to_mbox
+from patchwork.forge.util import sender_identity
+
+logger = logging.getLogger(__name__)
+
+
+class GitMirror:
+ """
+ Bare mirror clone of a forge repository.
+
+ Provides worktree-based operations for patch generation. All git commands
+ run with isolated configuration (no user/system gitconfig, no interactive
+ prompts) and temporary credential files that are removed after each
+ operation.
+ """
+
+ def __init__(self, backend, forge_config):
+ self.mirror_path = os.path.join(
+ settings.FORGE_GIT_MIRROR_PATH,
+ f'{forge_config.project.linkname}.git',
+ )
+ self.backend = backend
+ self.forge_config = forge_config
+ self.repo_url = backend.repo_url(forge_config)
+ self.auth = backend.get_auth(forge_config)
+ self.__worktree = None
+ self.__credentials = None
+
+ def repo_dir(self):
+ if self.__worktree:
+ return self.__worktree
+ return self.mirror_path
+
+ def git(self, *args, **kwargs):
+ env = dict(os.environ)
+ env.update(
+ {
+ 'GIT_CONFIG_GLOBAL': '/dev/null',
+ 'GIT_CONFIG_SYSTEM': '/dev/null',
+ 'GIT_TERMINAL_PROMPT': '0',
+ }
+ )
+ cmd = ['git']
+ if os.path.isdir(self.repo_dir()):
+ cmd += ['-C', self.repo_dir()]
+ if self.__credentials:
+ cmd += [
+ '-c',
+ f'credential.helper=store --file={self.__credentials}',
+ ]
+ cmd.extend(args)
+ logger.debug('+ %s', ' '.join(cmd))
+ return subprocess.run(cmd, env=env, text=False, check=True, **kwargs)
+
+ def git_output(self, *args, **kwargs):
+ result = self.git(*args, capture_output=True, **kwargs)
+ return result.stdout.decode('utf-8', errors='surrogateescape').strip()
+
+ @contextlib.contextmanager
+ def credentials(self):
+ with tempfile.NamedTemporaryFile(
+ prefix='patchwork-cred-', mode='w', delete_on_close=False
+ ) as tmp:
+ tmp.write(self.backend.git_credentials(self.forge_config))
+ tmp.close()
+ try:
+ self.__credentials = tmp.name
+ yield
+ finally:
+ self.__credentials = None
+
+ def ensure_mirror(self, fetch_refs=None):
+ """
+ Create the bare mirror clone if it does not exist yet and configure the
+ refspec to also fetch pull request heads.
+ """
+ head_path = os.path.join(self.mirror_path, 'HEAD')
+ if not os.path.exists(head_path):
+ logger.info('cloning mirror to %s', self.mirror_path)
+ os.makedirs(os.path.dirname(self.mirror_path), exist_ok=True)
+ with self.credentials():
+ self.git('clone', '--mirror', self.repo_url, self.mirror_path)
+
+ if fetch_refs:
+ self.git(
+ 'config', '--replace-all', 'remote.origin.fetch', fetch_refs
+ )
+
+ def fetch(self):
+ """
+ Fetch all remotes and prune stale references.
+ """
+ logger.info('fetching mirror %s', self.mirror_path)
+ with self.credentials():
+ self.git('fetch', '--all', '--prune')
+
+ def add_worktree(self, ref, path):
+ """
+ Create a temporary worktree checked out at the given ref.
+ """
+ self.git('worktree', 'add', '-fd', '--checkout', path, ref)
+
+ def del_worktree(self, path):
+ """
+ Remove a previously created worktree.
+ """
+ self.git('worktree', 'remove', '-ff', path)
+
+ @contextlib.contextmanager
+ def worktree(self, ref):
+ w = tempfile.mkdtemp(prefix='patchwork-worktree-')
+ try:
+ self.add_worktree(ref, w)
+ self.__worktree = w
+ yield
+ finally:
+ self.__worktree = None
+ self.del_worktree(w)
+
+ def commit_count(self, base_ref):
+ """
+ Return the number of commits in base_ref..HEAD.
+ """
+ out = self.git_output('rev-list', '--count', f'{base_ref}..HEAD')
+ return int(out)
+
+ def ref_exists(self, ref):
+ """
+ Return True if ref exists in the repository.
+ """
+ try:
+ self.git_output('cat-file', '-t', ref)
+ return True
+ except subprocess.CalledProcessError:
+ return False
+
+ RECIPIENT_RE = re.compile(r'\s*\d+\s+(?P<name>.+)\s+<(?P<email>.+@.+)>')
+
+ def recipients(self, base_ref):
+ out = self.git_output(
+ 'shortlog',
+ '-se',
+ '-w0',
+ '--group=author',
+ '--group=committer',
+ '--group=trailer:cc',
+ '--group=trailer:acked-by',
+ '--group=trailer:co-authored-by',
+ '--group=trailer:reported-by',
+ '--group=trailer:requested-by',
+ '--group=trailer:reviewed-by',
+ '--group=trailer:signed-off-by',
+ '--group=trailer:suggested-by',
+ '--group=trailer:tested-by',
+ f'{base_ref}..HEAD',
+ )
+ recipients = {}
+ for m in self.RECIPIENT_RE.finditer(out):
+ name = m.group('name')
+ name = re.sub(r'\w\w+', lambda s: s.group(0).title(), name)
+ name = name.strip('"\' \t')
+ addr = m.group('email').lower()
+ recipients[addr] = name
+ for addr, name in recipients.items():
+ yield email.utils.formataddr((name, addr))
+
+ def add_commit_notes(self, base_ref, note_fn):
+ """
+ Add a git note to each commit in base_ref..HEAD.
+
+ note_fn(sha) is called for each commit and should return the
+ note text, or None to skip.
+ """
+ out = self.git_output('rev-list', f'{base_ref}..HEAD')
+ for sha in out.splitlines():
+ sha = sha.strip()
+ if not sha:
+ continue
+ note = note_fn(sha)
+ if note:
+ self.git('notes', 'add', '-f', '-m', note, sha)
+
+ def format_patches(
+ self,
+ base_ref,
+ user,
+ version=1,
+ cover_title=None,
+ cover_body=None,
+ range_diff_base=None,
+ in_reply_to=None,
+ ):
+ """
+ Generate patches for commits in base_ref..HEAD.
+
+ When the series has more than one commit and a cover_title is provided,
+ a cover letter is generated. For respins (version > 1), --in-reply-to
+ threads the cover letter under the original and --range-diff shows what
+ changed since the previous version.
+
+ Returns mailbox.mbox object containing all messages.
+ """
+ name, addr = sender_identity(user, self.forge_config)
+ args = [
+ '-c',
+ f'user.name={name}',
+ '-c',
+ f'user.email={addr}',
+ 'format-patch',
+ '--stdout',
+ '--notes',
+ '--thread=shallow',
+ f'--subject-prefix=PATCH {self.forge_config.project.linkname}',
+ f'--to={self.forge_config.project.listemail}',
+ ]
+
+ for cc in self.recipients(base_ref):
+ args.append(f'--cc={cc}')
+
+ extra_headers = {
+ 'Sender': self.forge_config.sender_email,
+ 'Reply-To': self.forge_config.project.listemail,
+ 'List-ID': f'<{self.forge_config.project.listid}>',
+ 'X-Patchwork-Hint': 'ignore',
+ }
+ for key, value in extra_headers.items():
+ args.append(f'--add-header={key}: {value}')
+
+ if in_reply_to:
+ args.append(f'--in-reply-to={in_reply_to}')
+
+ if version > 1:
+ args.append(f'-v{version}')
+
+ if self.commit_count(base_ref) > 1 and cover_title:
+ args.append('--cover-letter')
+ if cover_body:
+ desc = f'{cover_title}\n\n{cover_body}'
+ else:
+ desc = cover_title
+ desc_file = os.path.join(self.repo_dir(), '.cover-description')
+ with open(desc_file, 'w') as f:
+ f.write(desc)
+ args += [
+ '--cover-from-description=subject',
+ f'--description-file={desc_file}',
+ ]
+
+ if range_diff_base and self.ref_exists(range_diff_base):
+ args.append(f'--range-diff={base_ref}..{range_diff_base}')
+
+ args.append(f'{base_ref}..HEAD')
+
+ result = self.git(*args, capture_output=True)
+ return bytes_to_mbox(result.stdout)
diff --git a/patchwork/settings/base.py b/patchwork/settings/base.py
index e5bcbac..bc42b14 100644
--- a/patchwork/settings/base.py
+++ b/patchwork/settings/base.py
@@ -303,3 +303,6 @@ FORGE_WEBHOOK_SECRETS = {}
# },
# }
FORGE_AUTH = {}
+
+# Base directory for git mirror clones (one bare repo per project)
+FORGE_GIT_MIRROR_PATH = ''
diff --git a/patchwork/tests/forge/test_git.py b/patchwork/tests/forge/test_git.py
new file mode 100644
index 0000000..64e187c
--- /dev/null
+++ b/patchwork/tests/forge/test_git.py
@@ -0,0 +1,278 @@
+# Patchwork - automated patch tracking system
+# Copyright (C) 2026 Robin Jarry <robin@jarry.cc>
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+import os
+import shutil
+import subprocess
+import tempfile
+import unittest
+
+from django.test import TestCase
+from django.test import override_settings
+
+from patchwork.forge import ForgeUser
+from patchwork.forge.git import GitMirror
+
+
+def _has_git():
+ try:
+ subprocess.run(
+ ['git', '--version'],
+ capture_output=True,
+ check=True,
+ )
+ return True
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ return False
+
+
+def _run_git(*args, cwd=None):
+ env = dict(os.environ)
+ env.update(
+ {
+ 'GIT_CONFIG_GLOBAL': '/dev/null',
+ 'GIT_CONFIG_SYSTEM': '/dev/null',
+ 'GIT_TERMINAL_PROMPT': '0',
+ 'GIT_AUTHOR_NAME': 'Test Author',
+ 'GIT_AUTHOR_EMAIL': 'author@example.com',
+ 'GIT_COMMITTER_NAME': 'Test Author',
+ 'GIT_COMMITTER_EMAIL': 'author@example.com',
+ }
+ )
+ return subprocess.run(
+ ['git'] + list(args),
+ cwd=cwd,
+ env=env,
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+
+
+@unittest.skipUnless(_has_git(), 'git is not installed')
+class GitMirrorTestBase(TestCase):
+ """
+ Base class that creates a bare "upstream" repo with a few commits
+ and a GitMirror clone of it.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.tmpdir = tempfile.mkdtemp(prefix='patchwork-test-git-')
+
+ # create upstream bare repo
+ cls.upstream_path = os.path.join(cls.tmpdir, 'upstream.git')
+ work = os.path.join(cls.tmpdir, 'work')
+ os.makedirs(work)
+ _run_git('init', cwd=work)
+ _run_git('commit', '--allow-empty', '-m', 'initial', cwd=work)
+
+ # tag the base for commit range
+ _run_git('tag', 'base', cwd=work)
+
+ # add commits
+ with open(os.path.join(work, 'a.txt'), 'w') as f:
+ f.write('aaa\n')
+ _run_git('add', 'a.txt', cwd=work)
+ _run_git(
+ 'commit',
+ '-m',
+ 'add file a\n\nSigned-off-by: Test Author <author@example.com>',
+ cwd=work,
+ )
+
+ with open(os.path.join(work, 'b.txt'), 'w') as f:
+ f.write('bbb\n')
+ _run_git('add', 'b.txt', cwd=work)
+ _run_git(
+ 'commit',
+ '-m',
+ 'add file b\n\nAcked-by: Reviewer <reviewer@example.com>',
+ cwd=work,
+ )
+
+ # clone as bare
+ _run_git('clone', '--mirror', work, cls.upstream_path)
+ shutil.rmtree(work)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmpdir, ignore_errors=True)
+ super().tearDownClass()
+
+ def _make_mirror(self):
+ from unittest.mock import MagicMock
+
+ backend = MagicMock()
+ backend.repo_url.return_value = self.upstream_path
+ backend.get_auth.return_value = {}
+ backend.git_credentials.return_value = ''
+
+ forge_config = MagicMock()
+ forge_config.project.linkname = 'mirror'
+ forge_config.project.listemail = 'list@example.com'
+ forge_config.project.listid = 'list.example.com'
+ forge_config.sender_email = 'patchwork@example.com'
+
+ with override_settings(FORGE_GIT_MIRROR_PATH=self.tmpdir):
+ mirror = GitMirror(backend, forge_config)
+
+ return mirror
+
+
+class CommitCountTest(GitMirrorTestBase):
+ def test_count_commits(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ count = mirror.commit_count('base')
+ self.assertEqual(count, 2)
+
+
+class RefExistsTest(GitMirrorTestBase):
+ def test_existing_ref(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ self.assertTrue(mirror.ref_exists('base'))
+
+ def test_missing_ref(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ self.assertFalse(mirror.ref_exists('nonexistent'))
+
+
+class TempWorktreeTest(GitMirrorTestBase):
+ def test_creates_and_cleans_up(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ w = None
+ with mirror.worktree('HEAD'):
+ w = mirror.repo_dir()
+ self.assertTrue(os.path.isdir(w))
+ self.assertTrue(os.path.exists(os.path.join(w, 'a.txt')))
+ self.assertTrue(os.path.exists(os.path.join(w, 'b.txt')))
+ self.assertFalse(os.path.isdir(w))
+
+
+class RecipientsTest(GitMirrorTestBase):
+ def test_extracts_recipients(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ recipients = list(mirror.recipients('base'))
+ self.assertIn('Test Author <author@example.com>', recipients)
+ self.assertIn('Reviewer <reviewer@example.com>', recipients)
+
+
+class FormatPatchesTest(GitMirrorTestBase):
+ def test_single_patch(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches('HEAD~1', user)
+ messages = list(mbox)
+ self.assertEqual(len(messages), 1)
+ self.assertIn('[PATCH', messages[0].get('Subject'))
+
+ def test_multi_patch_with_cover(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches(
+ 'base',
+ user,
+ cover_title='Test series',
+ cover_body='This is a test.',
+ )
+ messages = list(mbox)
+ # cover letter + 2 patches
+ self.assertEqual(len(messages), 3)
+ subjects = [m.get('Subject') for m in messages]
+ self.assertTrue(
+ any('0/2' in s for s in subjects),
+ f'no cover letter: {subjects}',
+ )
+
+ def test_version_numbering(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches('base', user, version=2)
+ messages = list(mbox)
+ subjects = [m.get('Subject') for m in messages]
+ self.assertTrue(
+ all('v2' in s for s in subjects), f'no v2: {subjects}'
+ )
+
+ def test_extra_headers(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches('HEAD~1', user)
+ messages = list(mbox)
+ msg = messages[0]
+ self.assertIn('ignore', msg.get('X-Patchwork-Hint', ''))
+ self.assertIn('list.example.com', msg.get('List-ID', ''))
+ self.assertIn('list@example.com', msg.get('Reply-To', ''))
+ self.assertIn('patchwork@example.com', msg.get('Sender', ''))
+
+ def test_in_reply_to(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches(
+ 'base',
+ user,
+ cover_title='Test',
+ in_reply_to='<v1-cover@example.com>',
+ )
+ messages = list(mbox)
+ cover = messages[0]
+ self.assertIn(
+ 'v1-cover@example.com',
+ cover.get('In-Reply-To', ''),
+ )
+
+ def test_cc_from_trailers(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches('base', user)
+ messages = list(mbox)
+ # check that reviewer from Acked-by is in Cc
+ all_cc = ' '.join(m.get('Cc', '') for m in messages)
+ self.assertIn('reviewer@example.com', all_cc)
+
+ def test_to_header(self):
+ mirror = self._make_mirror()
+ mirror.ensure_mirror()
+ with mirror.worktree('HEAD'):
+ user = ForgeUser(
+ login='author', name='Test Author', email='author@example.com'
+ )
+ mbox = mirror.format_patches('HEAD~1', user)
+ messages = list(mbox)
+ self.assertIn('list@example.com', messages[0].get('To', ''))