github_team_policy.py
· 3.9 KiB · Python
原始檔案
import logging
import requests
logger = logging.getLogger("authentik")
# Ensure flow is only run during oauth logins via Github
if context["source"].provider_type != "github":
debug("Not a GitHub login attempt, skipping organization check")
return True
logger.debug(f"Source: {context['source']}")
# The organization and team that users must be a member of
accepted_org = "GridPane-Dev"
accepted_team = "Zap" # Replace with your specific team name
try:
# Get the user-source connection object from the context, and get the access token
if "goauthentik.io/sources/connection" not in context:
logger.error("No source connection found in context")
return False
connection = context["goauthentik.io/sources/connection"]
access_token = connection.access_token
# We also access the user info authentik already retrieved, to get the correct username
if "oauth_userinfo" not in context:
logger.error("No oauth_userinfo found in context")
return False
github_userinfo = context["oauth_userinfo"]
logger.debug(f"GitHub user info: {github_userinfo}")
if "login" not in github_userinfo:
logger.error("No login found in GitHub user info")
return False
github_username = github_userinfo["login"]
# First, get the teams in the organization to find the team ID
teams_response = requests.get(
f"https://api.github.com/orgs/{accepted_org}/teams",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github.v3+json"
}
)
if teams_response.status_code != 200:
logger.error(f"GitHub API error when fetching teams: {teams_response.status_code} - {teams_response.text}")
ak_message(f"Failed to verify GitHub team membership. Please contact support.")
return False
teams = teams_response.json()
logger.debug(f"Teams in org: {teams}")
# Find the ID of the accepted team
team_id = None
for team in teams:
if team["name"] == accepted_team or team["slug"] == accepted_team.lower():
team_id = team["id"]
break
if not team_id:
logger.error(f"Team '{accepted_team}' not found in organization '{accepted_org}'")
ak_message(f"Configuration error: Team '{accepted_team}' not found. Please contact support.")
return False
# Check if the user is a member of the specific team
team_membership_response = requests.get(
f"https://api.github.com/teams/{team_id}/memberships/{github_username}",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github.v3+json"
}
)
# 200 status means the user is a member, 404 means they're not
if team_membership_response.status_code == 200:
membership = team_membership_response.json()
# Check if the membership state is active
if membership["state"] == "active":
logger.info(f"User '{github_username}' is an active member of team '{accepted_team}' in org '{accepted_org}', allowing login")
return True
else:
logger.info(f"User '{github_username}' has a pending invitation to team '{accepted_team}' in org '{accepted_org}', denying login")
ak_message(f"Access denied: Your invitation to the '{accepted_team}' team is still pending. Please accept it first.")
return False
else:
logger.info(f"User '{github_username}' is not a member of team '{accepted_team}' in org '{accepted_org}', denying login")
ak_message(f"Access denied: You are not a member of the '{accepted_team}' team in the '{accepted_org}' GitHub organization.")
return False
except Exception as e:
logger.exception(f"Error checking GitHub team membership: {e}")
ak_message("An error occurred while verifying your GitHub team membership. Please contact support.")
return False
| 1 | import logging |
| 2 | import requests |
| 3 | |
| 4 | logger = logging.getLogger("authentik") |
| 5 | |
| 6 | # Ensure flow is only run during oauth logins via Github |
| 7 | if context["source"].provider_type != "github": |
| 8 | debug("Not a GitHub login attempt, skipping organization check") |
| 9 | return True |
| 10 | |
| 11 | logger.debug(f"Source: {context['source']}") |
| 12 | |
| 13 | # The organization and team that users must be a member of |
| 14 | accepted_org = "GridPane-Dev" |
| 15 | accepted_team = "Zap" # Replace with your specific team name |
| 16 | |
| 17 | try: |
| 18 | # Get the user-source connection object from the context, and get the access token |
| 19 | if "goauthentik.io/sources/connection" not in context: |
| 20 | logger.error("No source connection found in context") |
| 21 | return False |
| 22 | |
| 23 | connection = context["goauthentik.io/sources/connection"] |
| 24 | access_token = connection.access_token |
| 25 | |
| 26 | # We also access the user info authentik already retrieved, to get the correct username |
| 27 | if "oauth_userinfo" not in context: |
| 28 | logger.error("No oauth_userinfo found in context") |
| 29 | return False |
| 30 | |
| 31 | github_userinfo = context["oauth_userinfo"] |
| 32 | logger.debug(f"GitHub user info: {github_userinfo}") |
| 33 | |
| 34 | if "login" not in github_userinfo: |
| 35 | logger.error("No login found in GitHub user info") |
| 36 | return False |
| 37 | |
| 38 | github_username = github_userinfo["login"] |
| 39 | |
| 40 | # First, get the teams in the organization to find the team ID |
| 41 | teams_response = requests.get( |
| 42 | f"https://api.github.com/orgs/{accepted_org}/teams", |
| 43 | headers={ |
| 44 | "Authorization": f"Bearer {access_token}", |
| 45 | "Accept": "application/vnd.github.v3+json" |
| 46 | } |
| 47 | ) |
| 48 | |
| 49 | if teams_response.status_code != 200: |
| 50 | logger.error(f"GitHub API error when fetching teams: {teams_response.status_code} - {teams_response.text}") |
| 51 | ak_message(f"Failed to verify GitHub team membership. Please contact support.") |
| 52 | return False |
| 53 | |
| 54 | teams = teams_response.json() |
| 55 | logger.debug(f"Teams in org: {teams}") |
| 56 | |
| 57 | # Find the ID of the accepted team |
| 58 | team_id = None |
| 59 | for team in teams: |
| 60 | if team["name"] == accepted_team or team["slug"] == accepted_team.lower(): |
| 61 | team_id = team["id"] |
| 62 | break |
| 63 | |
| 64 | if not team_id: |
| 65 | logger.error(f"Team '{accepted_team}' not found in organization '{accepted_org}'") |
| 66 | ak_message(f"Configuration error: Team '{accepted_team}' not found. Please contact support.") |
| 67 | return False |
| 68 | |
| 69 | # Check if the user is a member of the specific team |
| 70 | team_membership_response = requests.get( |
| 71 | f"https://api.github.com/teams/{team_id}/memberships/{github_username}", |
| 72 | headers={ |
| 73 | "Authorization": f"Bearer {access_token}", |
| 74 | "Accept": "application/vnd.github.v3+json" |
| 75 | } |
| 76 | ) |
| 77 | |
| 78 | # 200 status means the user is a member, 404 means they're not |
| 79 | if team_membership_response.status_code == 200: |
| 80 | membership = team_membership_response.json() |
| 81 | |
| 82 | # Check if the membership state is active |
| 83 | if membership["state"] == "active": |
| 84 | logger.info(f"User '{github_username}' is an active member of team '{accepted_team}' in org '{accepted_org}', allowing login") |
| 85 | return True |
| 86 | else: |
| 87 | logger.info(f"User '{github_username}' has a pending invitation to team '{accepted_team}' in org '{accepted_org}', denying login") |
| 88 | ak_message(f"Access denied: Your invitation to the '{accepted_team}' team is still pending. Please accept it first.") |
| 89 | return False |
| 90 | else: |
| 91 | logger.info(f"User '{github_username}' is not a member of team '{accepted_team}' in org '{accepted_org}', denying login") |
| 92 | ak_message(f"Access denied: You are not a member of the '{accepted_team}' team in the '{accepted_org}' GitHub organization.") |
| 93 | return False |
| 94 | |
| 95 | except Exception as e: |
| 96 | logger.exception(f"Error checking GitHub team membership: {e}") |
| 97 | ak_message("An error occurred while verifying your GitHub team membership. Please contact support.") |
| 98 | return False |