Skip to content

Commit

Permalink
Merge pull request #29 from OpenRailAssociation/handle-default-repo-p…
Browse files Browse the repository at this point in the history
…ermissions

Handle default repo permissions due to organization settings
  • Loading branch information
mxmehl authored Jul 12, 2024
2 parents 2a368c3 + cc2dd57 commit 37f6278
Showing 1 changed file with 106 additions and 28 deletions.
134 changes: 106 additions & 28 deletions gh_org_mgr/_gh_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class GHorg: # pylint: disable=too-many-instance-attributes
gh: Github = None # type: ignore
org: Organization = None # type: ignore
gh_token: str = ""
default_repository_permission: str = ""
org_owners: list[NamedUser] = field(default_factory=list)
org_members: list[NamedUser] = field(default_factory=list)
current_teams: dict[Team, dict] = field(default_factory=dict)
Expand Down Expand Up @@ -75,17 +76,16 @@ def pretty(d, indent=0):
# --------------------------------------------------------------------------
# Teams
# --------------------------------------------------------------------------
def get_current_teams(self):
def _get_current_teams(self):
"""Get teams of the existing organisation"""

for team in list(self.org.get_teams()):
self.current_teams[team] = {"members": {}, "repos": {}}

def create_missing_teams(self, dry: bool = False):
"""Find out which teams are configured but not part of the org yet"""

# Get list of current teams
self.get_current_teams()
self._get_current_teams()

# Get the names of the existing teams
existent_team_names = [team.name for team in self.current_teams]
Expand All @@ -108,7 +108,7 @@ def create_missing_teams(self, dry: bool = False):
logging.debug("Team '%s' already exists", team)

# Re-scan current teams as new ones may have been created
self.get_current_teams()
self._get_current_teams()

# --------------------------------------------------------------------------
# Members
Expand Down Expand Up @@ -474,13 +474,55 @@ def _get_highest_permission(self, *permissions: str) -> str:

return ""

def _get_direct_repo_permissions_of_team(self, team_dict: dict) -> tuple[dict[str, str], str]:
"""Get a list of directly configured repo permissions for a team, and
whether the team has a parent"""
repo_perms: dict[str, str] = {}
# Direct permissions
for repo, perm in team_dict.get("repos", {}).items():
repo_perms[repo] = perm

# Parent team
parent = team_dict.get("parent", "")

return repo_perms, parent

def _get_all_repo_permissions_for_team_and_parents(self, team_name: str, team_dict: dict):
"""Get a list of all configured repo permissions for a team, also those
inherited by parent teams"""
all_repo_perms, parent = self._get_direct_repo_permissions_of_team(team_dict=team_dict)
# If parents have been found, iterate and merge them
while parent:
logging.debug(
"Checking for repository permissions of %s's parent team %s", team_name, parent
)
parent_team_dict = self.configured_teams[parent]

# Handle empty parent dict
if not parent_team_dict:
break

# Get repo permissions and potential parent, and add it
repo_perm, parent = self._get_direct_repo_permissions_of_team(
team_dict=parent_team_dict
)
for repo, perm in repo_perm.items():
# Add (highest) repo permission
all_repo_perms[repo] = self._get_highest_permission(
perm, all_repo_perms.get(repo, "")
)

return all_repo_perms

def _get_configured_repos_and_user_perms(self):
"""
Get a list of repos with a list of individuals and their permissions,
based on their team memberships
"""
for _, team_attrs in self.configured_teams.items():
for repo, perm in team_attrs.get("repos", {}).items():
for team_name, team_attrs in self.configured_teams.items():
logging.debug("Getting configured repository permissions for team %s", team_name)
repo_perms = self._get_all_repo_permissions_for_team_and_parents(team_name, team_attrs)
for repo, perm in repo_perms.items():
# Create repo if non-exist
if repo not in self.configured_repos_collaborators:
self.configured_repos_collaborators[repo] = {}
Expand Down Expand Up @@ -513,35 +555,38 @@ def _convert_graphql_perm_to_rest(self, permission: str) -> str:
"""Convert a repo permission coming from the GraphQL API to the ones
coming from the REST API"""
perm_conversion = {
"READ": "pull",
"TRIAGE": "triage",
"WRITE": "push",
"MAINTAIN": "maintain",
"ADMIN": "admin",
"none": "",
"read": "pull",
"triage": "triage",
"write": "push",
"maintain": "maintain",
"admin": "admin",
}
if permission in perm_conversion:
replacement = perm_conversion.get(permission, "")
if permission.lower() in perm_conversion:
replacement = perm_conversion.get(permission.lower(), "")
return replacement

return permission

def _fetch_collaborators_of_repo(self, repo: Repository):
"""Get all collaborators (individuals) of a GitHub repo with their
permissions using the GraphQL API"""
# TODO: Consider doing this for all repositories at once, but calculate
# costs beforehand
query = """
query($owner: String!, $name: String!, $cursor: String) {
repository(owner: $owner, name: $name) {
collaborators(first: 20, after: $cursor) {
edges {
node {
login
}
permission
}
pageInfo {
endCursor
hasNextPage
}
repository(owner: $owner, name: $name) {
collaborators(first: 100, after: $cursor) {
edges {
node {
login
}
permission
}
pageInfo {
endCursor
hasNextPage
}
}
}
}
Expand All @@ -554,6 +599,7 @@ def _fetch_collaborators_of_repo(self, repo: Repository):
has_next_page = True

while has_next_page:
logging.debug("Requesting collaborators for %s", repo.name)
result = run_graphql_query(query, variables, self.gh_token)
try:
collaborators.extend(result["data"]["repository"]["collaborators"]["edges"])
Expand Down Expand Up @@ -586,6 +632,27 @@ def _get_current_repos_and_user_perms(self):
# Get users for this repo
self._fetch_collaborators_of_repo(repo)

def _get_default_repository_permission(self):
"""Get the default repository permission for all users. Convert to
admin/maintain/push/triage/pull scheme that the REST API provides"""
self.default_repository_permission = self._convert_graphql_perm_to_rest(
self.org.default_repository_permission
)

def _permission1_higher_than_permission2(self, permission1: str, permission2: str) -> bool:
"""Check whether permission 1 is higher than permission 2"""
perms_ranking = ["admin", "maintain", "push", "triage", "pull", ""]

def get_rank(permission):
return perms_ranking.index(permission) if permission in perms_ranking else 99

rank_permission1 = get_rank(permission1)
rank_permission2 = get_rank(permission2)

# The lower the index, the higher the permission. If lower than
# permission2, return True
return rank_permission1 < rank_permission2

def sync_repo_collaborator_permissions(self, dry: bool = False):
"""Compare the configured with the current repo permissions for all
repositories' collaborators"""
Expand All @@ -595,9 +662,13 @@ def sync_repo_collaborator_permissions(self, dry: bool = False):
# The resulting structure is:
# - configured_repos_collaborators: dict[Repository, dict[username, permission]]
# - current_repos_collaborators: dict[Repository, dict[username, permission]]
logging.debug("Starting to sync collaborator/individual permissions")
self._get_configured_repos_and_user_perms()
self._get_current_repos_and_user_perms()

# Get and convert the default permission for all members so we can check for it
self._get_default_repository_permission()

# Loop over all factually existing repositories. This will be a one-way
# sync. Team permissions have been set before, we are now removing
# surplus permissions. As no individual permissions are allowed, these
Expand All @@ -607,17 +678,21 @@ def sync_repo_collaborator_permissions(self, dry: bool = False):
# Get configured user permissions for this repo
try:
config_perm = self.configured_repos_collaborators[repo.name][username]
# There is no configured permission for this user in this repo
# There is no configured permission for this user in this repo,
# so we assume the default permission
except KeyError:
config_perm = ""
config_perm = self.default_repository_permission

if current_perm != config_perm:
# Evaluate whether current permission is higher than configured
# permission
if self._permission1_higher_than_permission2(current_perm, config_perm):
# Find out whether user has these unconfigured permissions
# due to being member of an unconfigured team. Check whether
# these are the same permissions as the team would get them.
unconfigured_team_repo_permission = self.unconfigured_team_repo_permissions.get(
repo.name, {}
).get(username, "")

if unconfigured_team_repo_permission:
if current_perm == unconfigured_team_repo_permission:
logging.info(
Expand All @@ -640,6 +715,9 @@ def sync_repo_collaborator_permissions(self, dry: bool = False):
current_perm,
)

# Remove person from repo, but only if their repository also
# diverges from the default repository permission given by
# the organization
logging.info(
"Remove %s from %s. They have '%s' there but should only have '%s'.",
username,
Expand Down

0 comments on commit 37f6278

Please sign in to comment.