import logging import requests logger = logging.getLogger("authentik") # Ensure flow is only run during oauth logins via Github if context["source"].provider_type != "github": debug("Not a GitHub login attempt, skipping organization check") return True logger.debug(f"Source: {context['source']}") # The organization and team that users must be a member of accepted_org = "GridPane-Dev" accepted_team = "Zap" # Replace with your specific team name try: # Get the user-source connection object from the context, and get the access token if "goauthentik.io/sources/connection" not in context: logger.error("No source connection found in context") return False connection = context["goauthentik.io/sources/connection"] access_token = connection.access_token # We also access the user info authentik already retrieved, to get the correct username if "oauth_userinfo" not in context: logger.error("No oauth_userinfo found in context") return False github_userinfo = context["oauth_userinfo"] logger.debug(f"GitHub user info: {github_userinfo}") if "login" not in github_userinfo: logger.error("No login found in GitHub user info") return False github_username = github_userinfo["login"] # First, get the teams in the organization to find the team ID teams_response = requests.get( f"https://api.github.com/orgs/{accepted_org}/teams", headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/vnd.github.v3+json" } ) if teams_response.status_code != 200: logger.error(f"GitHub API error when fetching teams: {teams_response.status_code} - {teams_response.text}") ak_message(f"Failed to verify GitHub team membership. Please contact support.") return False teams = teams_response.json() logger.debug(f"Teams in org: {teams}") # Find the ID of the accepted team team_id = None for team in teams: if team["name"] == accepted_team or team["slug"] == accepted_team.lower(): team_id = team["id"] break if not team_id: logger.error(f"Team '{accepted_team}' not found in organization '{accepted_org}'") ak_message(f"Configuration error: Team '{accepted_team}' not found. Please contact support.") return False # Check if the user is a member of the specific team team_membership_response = requests.get( f"https://api.github.com/teams/{team_id}/memberships/{github_username}", headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/vnd.github.v3+json" } ) # 200 status means the user is a member, 404 means they're not if team_membership_response.status_code == 200: membership = team_membership_response.json() # Check if the membership state is active if membership["state"] == "active": logger.info(f"User '{github_username}' is an active member of team '{accepted_team}' in org '{accepted_org}', allowing login") return True else: logger.info(f"User '{github_username}' has a pending invitation to team '{accepted_team}' in org '{accepted_org}', denying login") ak_message(f"Access denied: Your invitation to the '{accepted_team}' team is still pending. Please accept it first.") return False else: logger.info(f"User '{github_username}' is not a member of team '{accepted_team}' in org '{accepted_org}', denying login") ak_message(f"Access denied: You are not a member of the '{accepted_team}' team in the '{accepted_org}' GitHub organization.") return False except Exception as e: logger.exception(f"Error checking GitHub team membership: {e}") ak_message("An error occurred while verifying your GitHub team membership. Please contact support.") return False