github_org_policy.py
· 2.5 KiB · Python
原始文件
import logging
import requests
logger = logging.getLogger("authentik")
# Ensure flow is only run during oauth logins via Github
if context["source"].provider_type != "github":
debug("Not a GitHub login attempt, skipping organization check")
return True
logger.debug(f"Source: {context['source']}")
# The organization that users must be a member of
accepted_org = "GridPane-Dev"
try:
# Get the user-source connection object from the context, and get the access token
if "goauthentik.io/sources/connection" not in context:
logger.error("No source connection found in context")
return False
connection = context["goauthentik.io/sources/connection"]
access_token = connection.access_token
# We also access the user info authentik already retrieved, to get the correct username
if "oauth_userinfo" not in context:
logger.error("No oauth_userinfo found in context")
return False
github_userinfo = context["oauth_userinfo"]
logger.debug(f"GitHub user info: {github_userinfo}")
if "login" not in github_userinfo:
logger.error("No login found in GitHub user info")
return False
# GitHub API request to get user's organizations
orgs_response = requests.get(
"https://api.github.com/user/orgs",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github.v3+json"
}
)
if orgs_response.status_code != 200:
logger.error(f"GitHub API error: {orgs_response.status_code} - {orgs_response.text}")
ak_message(f"Failed to verify GitHub organization membership. Please contact support.")
return False
orgs = orgs_response.json()
logger.debug(f"Organizations: {orgs}")
# Check if user is a member of the accepted organization
user_matched = any(org['login'] == accepted_org for org in orgs)
if not user_matched:
logger.info(f"User '{github_userinfo['login']}' is not a member of organization '{accepted_org}'")
ak_message(f"Access denied: You are not a member of the '{accepted_org}' GitHub organization.")
return False
logger.info(f"User '{github_userinfo['login']}' is a member of organization '{accepted_org}', allowing login")
return True
except Exception as e:
logger.exception(f"Error checking GitHub organization membership: {e}")
ak_message("An error occurred while verifying your GitHub organization membership. Please contact support.")
return False
| 1 | import logging |
| 2 | import requests |
| 3 | |
| 4 | logger = logging.getLogger("authentik") |
| 5 | |
| 6 | # Ensure flow is only run during oauth logins via Github |
| 7 | if context["source"].provider_type != "github": |
| 8 | logger.debug("Not a GitHub login attempt, skipping organization check") |
| 9 | return True |
| 10 | |
| 11 | logger.debug(f"Source: {context['source']}") |
| 12 | |
| 13 | # The organization that users must be a member of |
| 14 | accepted_org = "GridPane-Dev" |
| 15 | |
| 16 | try: |
| 17 | # Get the user-source connection object from the context, and get the access token |
| 18 | if "goauthentik.io/sources/connection" not in context: |
| 19 | logger.error("No source connection found in context") |
| 20 | return False |
| 21 | |
| 22 | connection = context["goauthentik.io/sources/connection"] |
| 23 | access_token = connection.access_token |
| 24 | |
| 25 | # We also access the user info authentik already retrieved, to get the correct username |
| 26 | if "oauth_userinfo" not in context: |
| 27 | logger.error("No oauth_userinfo found in context") |
| 28 | return False |
| 29 | |
| 30 | github_userinfo = context["oauth_userinfo"] |
| 31 | logger.debug(f"GitHub user info: {github_userinfo}") |
| 32 | |
| 33 | if "login" not in github_userinfo: |
| 34 | logger.error("No login found in GitHub user info") |
| 35 | return False |
| 36 | |
| 37 | # GitHub API request to get user's organizations |
| 38 | orgs_response = requests.get( |
| 39 | "https://api.github.com/user/orgs", |
| 40 | headers={ |
| 41 | "Authorization": f"Bearer {access_token}", |
| 42 | "Accept": "application/vnd.github.v3+json" |
| 43 | } |
| 44 | ) |
| 45 | |
| 46 | if orgs_response.status_code != 200: |
| 47 | logger.error(f"GitHub API error: {orgs_response.status_code} - {orgs_response.text}") |
| 48 | ak_message(f"Failed to verify GitHub organization membership. Please contact support.") |
| 49 | return False |
| 50 | |
| 51 | orgs = orgs_response.json() |
| 52 | logger.debug(f"Organizations: {orgs}") |
| 53 | |
| 54 | # Check if user is a member of the accepted organization |
| 55 | user_matched = any(org['login'] == accepted_org for org in orgs) |
| 56 | |
| 57 | if not user_matched: |
| 58 | logger.info(f"User '{github_userinfo['login']}' is not a member of organization '{accepted_org}'") |
| 59 | ak_message(f"Access denied: You are not a member of the '{accepted_org}' GitHub organization.") |
| 60 | return False |
| 61 | |
| 62 | logger.info(f"User '{github_userinfo['login']}' is a member of organization '{accepted_org}', allowing login") |
| 63 | return True |
| 64 | |
| 65 | except Exception as e: |
| 66 | logger.exception(f"Error checking GitHub organization membership: {e}") |
| 67 | ak_message("An error occurred while verifying your GitHub organization membership. Please contact support.") |
| 68 | return False |