Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch all collaborators of all repos at once over GraphQL #61

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 199 additions & 39 deletions gh_org_mgr/_gh_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class GHorg: # pylint: disable=too-many-instance-attributes, too-many-lines
configured_teams: dict[str, dict | None] = field(default_factory=dict)
newly_added_users: list[NamedUser] = field(default_factory=list)
current_repos_teams: dict[Repository, dict[Team, str]] = field(default_factory=dict)
graphql_repos_collaborators: dict[str, list[dict]] = field(default_factory=dict)
current_repos_collaborators: dict[Repository, dict[str, str]] = field(default_factory=dict)
configured_repos_collaborators: dict[str, dict[str, str]] = field(default_factory=dict)
archived_repos: list[Repository] = field(default_factory=list)
Expand Down Expand Up @@ -115,7 +116,7 @@ def censor_half_string(string: str) -> str:
half2 = len(string) - half1
return string[:half1] + "*" * (half2)

sensible_keys = ["gh_token"]
sensible_keys = ["gh_token", "gh_app_private_key"]
for key in sensible_keys:
if value := dictionary.get(key, ""):
dictionary[key] = censor_half_string(value)
Expand Down Expand Up @@ -946,69 +947,228 @@ def _convert_graphql_perm_to_rest(self, permission: str) -> str:

return permission

def _fetch_collaborators_of_all_organization_repos(self) -> None:
    """Get all collaborators (individuals) of all repos of a GitHub
    organization with their permissions using the GraphQL API.

    The organization is paged through 100 repositories at a time, and each
    repository returns at most its first 100 collaborators in that request.
    Repositories that report more collaborators are remembered together
    with their end cursor and completed afterwards with repo-level
    requests. Once everything is fetched,
    self._populate_current_repos_collaborators() is called.
    """
    # Both queries are constant, so they are built once instead of once per
    # request
    org_query = """
        query($owner: String!, $cursor: String) {
            organization(login: $owner) {
                repositories(first: 100, after: $cursor) {
                    edges {
                        node {
                            name
                            collaborators(first: 100) {
                                edges {
                                    node {
                                        login
                                    }
                                    permission
                                }
                                pageInfo {
                                    endCursor
                                    hasNextPage
                                }
                            }
                        }
                    }
                    pageInfo {
                        endCursor
                        hasNextPage
                    }
                }
            }
        }
    """
    repo_query = """
        query($owner: String!, $repo: String!, $cursor: String) {
            repository(owner: $owner, name: $repo) {
                collaborators(first: 100, after: $cursor) {
                    edges {
                        node {
                            login
                        }
                        permission
                    }
                    pageInfo {
                        endCursor
                        hasNextPage
                    }
                }
            }
        }
    """

    # Initial query parameters for org-level request
    variables = {"owner": self.org.login, "cursor": None}

    # dict in which we store repos for which there are more than 100
    # collaborators, and their respective end cursors
    next_page_cursors_for_repos: dict[str, str] = {}

    more_repos_in_org = True
    while more_repos_in_org:
        logging.debug("Requesting collaborators for %s", self.org.login)
        org_result = run_graphql_query(org_query, variables, self.gh_token)
        more_repos_in_org, variables["cursor"] = self._extract_data_from_graphql_response(
            graphql_response=org_result, next_page_cursors_for_repos=next_page_cursors_for_repos
        )

    # If there are more than 100 collaborators in a repo, we need to fetch
    # rest via individual GraphQL queries
    if next_page_cursors_for_repos:
        logging.debug(
            "Not all collaborators of all repos have been fetched. Missing data: %s",
            next_page_cursors_for_repos,
        )
    for repo_name, end_cursor in next_page_cursors_for_repos.items():
        # Query parameters for the repo-level requests; only the cursor
        # changes from page to page
        repo_variables = {
            "owner": self.org.login,
            "repo": repo_name,
            "cursor": end_cursor,
        }
        more_collaborators_in_repo = True
        while more_collaborators_in_repo:
            logging.debug("Requesting additional collaborators for repo %s", repo_name)
            repo_result = run_graphql_query(repo_query, repo_variables, self.gh_token)
            more_collaborators_in_repo, repo_variables["cursor"] = (
                self._extract_data_from_graphql_response(
                    graphql_response=repo_result,
                    next_page_cursors_for_repos=next_page_cursors_for_repos,
                    single_repo_name=repo_name,
                )
            )

    # All collaborators from all repos have been fetched, now populate the
    # actual dictionary
    self._populate_current_repos_collaborators()

def _extract_data_from_graphql_response(
    self,
    graphql_response: dict,
    next_page_cursors_for_repos: dict[str, str],
    single_repo_name: str = "",
) -> tuple[bool, str]:
    """
    Extracts collaborator data from a GraphQL response for either an
    organization or a single repository.

    Collaborator edges are stored in self.graphql_repos_collaborators, keyed
    by repository name. At organization level the list of a repository is
    set; at single repository level the edges are appended to it.

    Args:
        graphql_response (dict): The GraphQL response containing the data.
        next_page_cursors_for_repos (dict[str, str]): A dictionary to store
            the next page cursors for repositories. Only written to at
            organization level, for repositories whose collaborators did
            not fit into one page.
        single_repo_name (str, optional): The name of a single repository to
            extract data for. Defaults to "".

    Returns:
        tuple[bool, str]: A tuple containing a boolean indicating if there
        is a next page and a string for the cursor.
            - For organization level extraction:
                - bool: Indicates if there is a next page of repositories.
                - str: The cursor for the next page of repositories.
            - For single repository extraction:
                - bool: Indicates if there is a next page of collaborators.
                - str: The cursor for the next page of collaborators.
            - (False, "") if the response holds no usable data.

    Raises:
        SystemExit: If a repository name is not found in the GraphQL
            response at the organization level.
    """
    # A failed query may come without "data" or with an explicit null; treat
    # both like a response without relevant content
    data = graphql_response.get("data") or {}

    organization = data.get("organization")
    if not single_repo_name and organization:
        logging.debug("Extracting collaborators for organization from GraphQL response")

        repositories = organization["repositories"]

        # Initialise returns
        org_has_next_page = repositories["pageInfo"]["hasNextPage"]
        org_cursor = repositories["pageInfo"]["endCursor"]

        for repo_edges in repositories["edges"]:
            try:
                repo_name: str = repo_edges["node"]["name"]
            except KeyError:
                logging.error(
                    "Did not find a repo name in the GraphQL response "
                    "(organization level) which seems to hint to a bug: %s",
                    repo_edges,
                )
                sys.exit(1)
            logging.debug("Extracting collaborators for %s from GraphQL response", repo_name)

            # fill in collaborators of repo
            try:
                collaborators = repo_edges["node"]["collaborators"]
                self.graphql_repos_collaborators[repo_name] = collaborators["edges"]
            except (TypeError, KeyError):
                logging.debug("Repo %s does not seem to have any collaborators", repo_name)
                # Without collaborators there is no page info to inspect
                continue

            # Find out if there are more than 100 collaborators in the
            # GraphQL response for this repo
            page_info = collaborators.get("pageInfo") or {}
            if page_info.get("hasNextPage"):
                next_page_cursors_for_repos[repo_name] = page_info["endCursor"]

        return org_has_next_page, org_cursor

    repository = data.get("repository")
    if single_repo_name and repository:
        logging.debug(
            "Extracting collaborators for repository %s from GraphQL response", single_repo_name
        )

        try:
            collaborators = repository["collaborators"]
            repo_collaborators = collaborators["edges"] or []
            page_info = collaborators["pageInfo"]
        except (TypeError, KeyError):
            logging.debug("Repo %s does not seem to have any collaborators", single_repo_name)
            # Nothing more to page through for this repo
            return False, ""

        # fill in collaborators of repo; setdefault so that fetched data is
        # kept even if the repo has no entry from the organization level yet
        self.graphql_repos_collaborators.setdefault(single_repo_name, []).extend(
            repo_collaborators
        )

        return page_info["hasNextPage"], page_info["endCursor"]

    logging.warning("No relevant data found in GraphQL response")
    logging.debug("GraphQL response: %s", graphql_response)
    return False, ""

def _populate_current_repos_collaborators(self) -> None:
    """Populate self.current_repos_collaborators with data from
    self.graphql_repos_collaborators.

    For every repo already present in self.current_repos_collaborators, the
    collaborators stored under the repo's name are added with lower-cased
    login and the permission converted by
    self._convert_graphql_perm_to_rest(). Organization owners are left out.
    """
    # Lower-cased owner logins, computed once for all repos and collaborators
    owner_logins = {user.login.lower() for user in self.current_org_owners}

    for repo, collaborators in self.current_repos_collaborators.items():
        # Repos without an entry (or with an empty one) contribute nothing
        for collaborator in self.graphql_repos_collaborators.get(repo.name) or []:
            login: str = collaborator["node"]["login"].lower()
            # Skip entry if collaborator is org owner, which is "admin" anyway
            if login in owner_logins:
                continue
            collaborators[login] = self._convert_graphql_perm_to_rest(collaborator["permission"])

def _get_current_repos_and_user_perms(self):
    """Get all repos, their current collaborators and their permissions"""
    # We copy the list of repos from self.current_repos_teams; every repo
    # starts with an empty collaborator mapping
    for repo in self.current_repos_teams:
        self.current_repos_collaborators[repo] = {}

    # Collaborators are fetched for the whole organization in one go rather
    # than with one request per repo
    self._fetch_collaborators_of_all_organization_repos()

def _get_default_repository_permission(self):
"""Get the default repository permission for all users. Convert to
Expand Down
Loading