# Last active: 1742026462

# File: github_team_policy.py (original)
1import logging
2import requests
3
4logger = logging.getLogger("authentik")
5
6# Ensure flow is only run during oauth logins via Github
7if context["source"].provider_type != "github":
8 debug("Not a GitHub login attempt, skipping organization check")
9 return True
10
11logger.debug(f"Source: {context['source']}")
12
13# The organization and team that users must be a member of
14accepted_org = "GridPane-Dev"
15accepted_team = "Zap" # Replace with your specific team name
16
17try:
18 # Get the user-source connection object from the context, and get the access token
19 if "goauthentik.io/sources/connection" not in context:
20 logger.error("No source connection found in context")
21 return False
22
23 connection = context["goauthentik.io/sources/connection"]
24 access_token = connection.access_token
25
26 # We also access the user info authentik already retrieved, to get the correct username
27 if "oauth_userinfo" not in context:
28 logger.error("No oauth_userinfo found in context")
29 return False
30
31 github_userinfo = context["oauth_userinfo"]
32 logger.debug(f"GitHub user info: {github_userinfo}")
33
34 if "login" not in github_userinfo:
35 logger.error("No login found in GitHub user info")
36 return False
37
38 github_username = github_userinfo["login"]
39
40 # First, get the teams in the organization to find the team ID
41 teams_response = requests.get(
42 f"https://api.github.com/orgs/{accepted_org}/teams",
43 headers={
44 "Authorization": f"Bearer {access_token}",
45 "Accept": "application/vnd.github.v3+json"
46 }
47 )
48
49 if teams_response.status_code != 200:
50 logger.error(f"GitHub API error when fetching teams: {teams_response.status_code} - {teams_response.text}")
51 ak_message(f"Failed to verify GitHub team membership. Please contact support.")
52 return False
53
54 teams = teams_response.json()
55 logger.debug(f"Teams in org: {teams}")
56
57 # Find the ID of the accepted team
58 team_id = None
59 for team in teams:
60 if team["name"] == accepted_team or team["slug"] == accepted_team.lower():
61 team_id = team["id"]
62 break
63
64 if not team_id:
65 logger.error(f"Team '{accepted_team}' not found in organization '{accepted_org}'")
66 ak_message(f"Configuration error: Team '{accepted_team}' not found. Please contact support.")
67 return False
68
69 # Check if the user is a member of the specific team
70 team_membership_response = requests.get(
71 f"https://api.github.com/teams/{team_id}/memberships/{github_username}",
72 headers={
73 "Authorization": f"Bearer {access_token}",
74 "Accept": "application/vnd.github.v3+json"
75 }
76 )
77
78 # 200 status means the user is a member, 404 means they're not
79 if team_membership_response.status_code == 200:
80 membership = team_membership_response.json()
81
82 # Check if the membership state is active
83 if membership["state"] == "active":
84 logger.info(f"User '{github_username}' is an active member of team '{accepted_team}' in org '{accepted_org}', allowing login")
85 return True
86 else:
87 logger.info(f"User '{github_username}' has a pending invitation to team '{accepted_team}' in org '{accepted_org}', denying login")
88 ak_message(f"Access denied: Your invitation to the '{accepted_team}' team is still pending. Please accept it first.")
89 return False
90 else:
91 logger.info(f"User '{github_username}' is not a member of team '{accepted_team}' in org '{accepted_org}', denying login")
92 ak_message(f"Access denied: You are not a member of the '{accepted_team}' team in the '{accepted_org}' GitHub organization.")
93 return False
94
95except Exception as e:
96 logger.exception(f"Error checking GitHub team membership: {e}")
97 ak_message("An error occurred while verifying your GitHub team membership. Please contact support.")
98 return False