197 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			197 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/python
 | 
						|
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | 
						|
# implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
 | 
						|
import argparse
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
import shutil
 | 
						|
import subprocess
 | 
						|
import warnings
 | 
						|
 | 
						|
from git import Repo
 | 
						|
from git import SymbolicReference
 | 
						|
 | 
						|
 | 
						|
import otc_metadata.services
 | 
						|
 | 
						|
# Services metadata object, instantiated once at import time; the functions
# below use it to look up services and their documents.
data = otc_metadata.services.Services()
 | 
						|
 | 
						|
 | 
						|
def _repository_url(repo):
    """Return the SSH clone URL for a repository metadata entry.

    :param repo: mapping with at least the ``type`` and ``repo`` keys.
    :returns: the clone URL, or ``None`` when ``repo["type"]`` is neither
        ``gitea`` nor ``github``.
    """
    if repo["type"] == 'gitea':
        return (
            f"ssh://git@gitea.eco.tsi-dev.otc-service.com:2222/"
            f"{repo['repo']}"
        )
    if repo["type"] == 'github':
        return f"git@github.com:/{repo['repo']}"
    return None


def process_repositories(args, service):
    """Checkout repositories and synchronize documents between them.

    Every repository of ``service`` whose environment equals
    ``args.source_environment`` or ``args.target_environment`` is cloned
    into (or refreshed inside) ``args.work_dir``. Afterwards, for every
    document of the service that passes the filters, a branch named
    ``<args.branch_name>#<doc type>`` is created in the target checkout,
    the document tree is copied over from the source checkout (``conf.py``
    is never copied), and the result is committed and pushed. If the
    target URL contains ``github`` and ``args.open_pr_gh`` is set, a pull
    request is opened with the ``gh`` utility.

    :param args: parsed command line arguments; reads ``work_dir``,
        ``source_environment``, ``target_environment``, ``document_type``,
        ``branch_name`` and ``open_pr_gh``.
    :param service: service metadata mapping; reads ``repositories`` and
        ``service_type``.
    :raises SystemExit: with status 1 when a matching repository has an
        unsupported type.
    """
    workdir = pathlib.Path(args.work_dir)
    workdir.mkdir(exist_ok=True)

    copy_from = None
    copy_to = None
    repo_to = None
    url_from = None
    url_to = None

    for repo in service["repositories"]:
        logging.debug(f"Processing repository {repo}")
        repo_dir = pathlib.Path(workdir, repo["type"], repo["repo"])

        # The source check comes first on purpose: if both environments are
        # equal the repository is treated as the source only.
        is_source = repo["environment"] == args.source_environment
        is_target = (
            not is_source
            and repo["environment"] == args.target_environment
        )
        if is_source:
            copy_from = repo_dir
        elif is_target:
            copy_to = repo_dir
        else:
            continue

        # Look for the git metadata rather than the bare directory: an
        # empty directory left behind by a previously failed clone must
        # not be mistaken for a usable checkout.
        checkout_exists = pathlib.Path(repo_dir, ".git").exists()
        repo_dir.mkdir(parents=True, exist_ok=True)

        repo_url = _repository_url(repo)
        if repo_url is None:
            logging.error(f"Repository type {repo['type']} is not supported")
            raise SystemExit(1)

        if not checkout_exists:
            git_repo = Repo.clone_from(repo_url, repo_dir, branch='main')
        else:
            logging.debug("Checkout already exists")
            git_repo = Repo(repo_dir)
            git_repo.remotes.origin.fetch()
            git_repo.heads.main.checkout()
            git_repo.remotes.origin.pull()

        if is_source:
            url_from = repo_url
        else:
            url_to = repo_url
            repo_to = git_repo

    logging.debug(f"Synchronizing from={url_from}, to={url_to}")

    if not copy_from or not copy_to:
        logging.warning(
            "Not synchronizing documents of the service. "
            "Target or source not known.")
        # Without both checkouts there is nothing to copy; continuing
        # would only fail later on the missing target repository.
        return

    for doc in data.docs_by_service_type(service["service_type"]):
        logging.debug(f"Analyzing document {doc}")
        if args.document_type and doc.get("type") != args.document_type:
            logging.info(
                f"Skipping synchronizing {doc['title']} "
                f"due to the doc-type filter.")
            continue
        if doc.get("environment"):
            logging.info(
                f"Skipping synchronizing {doc['title']} "
                f"since it is environment bound.")
            continue

        branch_name = f"{args.branch_name}#{doc['type']}"
        # Create the remote ref first so that it can be looked up through
        # the remote below, which yields the reference type expected by
        # set_tracking_branch().
        SymbolicReference.create(
            repo_to, "refs/remotes/origin/%s" % branch_name)
        new_branch = repo_to.create_head(branch_name, 'main')
        remote_ref = repo_to.remotes[0].refs[branch_name]
        new_branch.set_tracking_branch(remote_ref)
        new_branch.checkout()

        shutil.copytree(
            pathlib.Path(copy_from, doc["rst_location"]),
            pathlib.Path(copy_to, doc["rst_location"]),
            # conf.py is excluded from the copy at every directory level.
            ignore=lambda a, b: ["conf.py"],
            dirs_exist_ok=True
        )
        repo_to.index.add([doc["rst_location"]])
        if not repo_to.index.diff("HEAD"):
            # Nothing to commit
            logging.debug("No changes.")
            continue
        repo_to.index.commit(
            (
                f"Synchronize {doc['title']}\n\n"
                f"Overwriting document\n"
                f"from: {url_from}\n"
                f"to:   {url_to}\n\n"
                "Performed-by: gitea/infra/otc-metadata/tools/sync_doc_repo.py"
            )
        )
        repo_to.remotes.origin.push(new_branch)
        if 'github' in url_to and args.open_pr_gh:
            subprocess.run(
                args=['gh', 'pr', 'create', '-f'],
                cwd=copy_to,
                check=True
            )
 | 
						|
 | 
						|
 | 
						|
def main():
    """Parse the command line and synchronize documents of one service.

    The service named by ``--service-type`` is looked up in the metadata
    and handed to :func:`process_repositories` together with the parsed
    arguments.

    :raises SystemExit: with status 1 when the service is not found in the
        metadata (argparse itself also exits on invalid arguments).
    """
    parser = argparse.ArgumentParser(
        description='Synchronize document between environments.')
    parser.add_argument(
        '--source-environment',
        required=True,
        help='Environment to be used as a source'
    )
    parser.add_argument(
        '--target-environment',
        required=True,
        help='Environment to be used as a target'
    )
    parser.add_argument(
        '--service-type',
        required=True,
        help='Service to which document(s) belongs to'
    )
    parser.add_argument(
        '--document-type',
        help=(
            'Type of the document to synchronize. '
            'All will be synchronized if not set.'
        )
    )
    parser.add_argument(
        '--branch-name',
        required=True,
        help='Branch name to be used for synchronizing.'
    )
    parser.add_argument(
        '--work-dir',
        required=True,
        help='Working directory to use for repository checkout.'
    )
    parser.add_argument(
        '--open-pr-gh',
        action='store_true',
        help='Open Pull Request using `gh` utility (need to be present).'
    )
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG)
    service = data.service_dict.get(args.service_type)
    if not service:
        warnings.warn(f"Service {args.service_type} was not found in metadata")
        # The os module has no exit() function; raising SystemExit
        # terminates the script with the intended non-zero status.
        raise SystemExit(1)

    process_repositories(args, service)
 | 
						|
 | 
						|
 | 
						|
# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()
 |