"""The endpoints for restriction objects.
SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""
from __future__ import annotations
import os
import typing as t
import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable
from .. import paginated, parsers, utils
if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
from .. import client
from ..models.enter_restriction_data import EnterRestrictionData
from ..models.entry_overview_entry import EntryOverviewEntry
from ..models.ip_range import IPRange
from ..models.patch_entry_override_restriction_data import (
PatchEntryOverrideRestrictionData,
)
from ..models.patch_restriction_data import PatchRestrictionData
from ..models.restriction import Restriction
from ..models.user_login_response import UserLoginResponse
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")
[docs]
class RestrictionService(t.Generic[_ClientT]):
__slots__ = ("__client",)
def __init__(self, client: _ClientT) -> None:
self.__client = client
[docs]
def put_allowed_ip(
self: RestrictionService[client.AuthenticatedClient],
*,
restriction_id: str,
ip_range: str,
) -> IPRange:
"""Add a single IP range to a restriction.
:param restriction_id: The id of the restriction.
:param ip_range: The IP range to add in CIDR notation.
:returns: The added IP range.
"""
url = "/api/v1/restrictions/{restrictionId}/allowed_ips/{ipRange}".format(
restrictionId=restriction_id, ipRange=ip_range
)
params = None
with self.__client as client:
resp = client.http.put(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.ip_range import IPRange
return parsers.JsonResponseParser(
parsers.ParserFor.make(IPRange)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def delete_allowed_ip(
self: RestrictionService[client.AuthenticatedClient],
*,
restriction_id: str,
ip_range: str,
) -> None:
"""Remove a single IP range from a restriction.
:param restriction_id: The ID of the restriction.
:param ip_range: The IP range to remove.
:returns: An empty response.
"""
url = "/api/v1/restrictions/{restrictionId}/allowed_ips/{ipRange}".format(
restrictionId=restriction_id, ipRange=ip_range
)
params = None
with self.__client as client:
resp = client.http.delete(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 204):
return parsers.ConstantlyParser(None).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def get_all_entries(
self: RestrictionService[client.AuthenticatedClient],
*,
restriction_id: str,
q: str = "",
page_size: int = 50,
) -> paginated.Response[EntryOverviewEntry]:
"""List all enrolled students with their entry counts and overrides.
:param restriction_id: The ID of the restriction.
:param q: Only retrieve students whose name or username matches this
value.
:param page_size: The size of a single page, maximum is 100.
:returns: A paginated list of entry overview entries per student.
"""
url = "/api/v1/restrictions/{restrictionId}/entries/".format(
restrictionId=restriction_id
)
params: t.Dict[str, str | int | bool] = {
"q": q,
"page-size": page_size,
}
if t.TYPE_CHECKING:
import httpx
def do_request(next_token: str | None) -> httpx.Response:
if next_token is None:
params.pop("next-token", "")
else:
params["next-token"] = next_token
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
return resp
def parse_response(
resp: httpx.Response,
) -> t.Sequence[EntryOverviewEntry]:
if utils.response_code_matches(resp.status_code, 200):
from ..models.entry_overview_entry import EntryOverviewEntry
return parsers.JsonResponseParser(
rqa.List(parsers.ParserFor.make(EntryOverviewEntry))
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
return paginated.Response(do_request, parse_response)
[docs]
def enter(
self: RestrictionService[client.AuthenticatedClient],
json_body: EnterRestrictionData,
*,
restriction_id: str,
) -> UserLoginResponse:
"""Enter a restricted course, recording an entry event.
Validates the password (if set), enforces the entry limit (if
session_lockdown is set), records a :class:.RestrictionEntryEvent, and
returns the caller's existing access token enriched with any new
restrictions. The same token is reused so the student is not locked out
if the response is lost in transit.
:param json_body: The body of the request. See
:class:`.EnterRestrictionData` for information about the possible
fields. You can provide this data as a
:class:`.EnterRestrictionData` or as a dictionary.
:param restriction_id: The ID of the restriction.
:returns: A login response containing the (updated) access token.
"""
url = "/api/v1/restrictions/{restrictionId}/entries/".format(
restrictionId=restriction_id
)
params = None
with self.__client as client:
resp = client.http.put(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.user_login_response import UserLoginResponse
return parsers.JsonResponseParser(
parsers.ParserFor.make(UserLoginResponse)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def get(
self: RestrictionService[client.AuthenticatedClient],
*,
restriction_id: str,
) -> Restriction:
"""Get the full settings for a restriction (admin only).
:param restriction_id: The id of the restriction.
:returns: The restriction object including the password.
"""
url = "/api/v1/restrictions/{restrictionId}".format(
restrictionId=restriction_id
)
params = None
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.restriction import Restriction
return parsers.JsonResponseParser(
parsers.ParserFor.make(Restriction)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def patch(
self: RestrictionService[client.AuthenticatedClient],
json_body: PatchRestrictionData,
*,
restriction_id: str,
) -> Restriction:
"""Update restriction settings.
The request body mirrors the restriction JSON shape. All fields are
optional; omitted fields are left unchanged.
:param json_body: The body of the request. See
:class:`.PatchRestrictionData` for information about the possible
fields. You can provide this data as a
:class:`.PatchRestrictionData` or as a dictionary.
:param restriction_id: The id of the restriction.
:returns: The updated restriction.
"""
url = "/api/v1/restrictions/{restrictionId}".format(
restrictionId=restriction_id
)
params = None
with self.__client as client:
resp = client.http.patch(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.restriction import Restriction
return parsers.JsonResponseParser(
parsers.ParserFor.make(Restriction)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def get_all_allowed_ips(
self: RestrictionService[client.AuthenticatedClient],
*,
restriction_id: str,
page_size: int = 50,
) -> paginated.Response[IPRange]:
"""Get all IP ranges for a restriction.
:param restriction_id: The ID of the restriction.
:param page_size: The size of a single page, maximum is 100.
:returns: A paginated list of IP ranges.
"""
url = "/api/v1/restrictions/{restrictionId}/allowed_ips/".format(
restrictionId=restriction_id
)
params: t.Dict[str, str | int | bool] = {
"page-size": page_size,
}
if t.TYPE_CHECKING:
import httpx
def do_request(next_token: str | None) -> httpx.Response:
if next_token is None:
params.pop("next-token", "")
else:
params["next-token"] = next_token
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
return resp
def parse_response(resp: httpx.Response) -> t.Sequence[IPRange]:
if utils.response_code_matches(resp.status_code, 200):
from ..models.ip_range import IPRange
return parsers.JsonResponseParser(
rqa.List(parsers.ParserFor.make(IPRange))
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
return paginated.Response(do_request, parse_response)
[docs]
def patch_entry_override(
self: RestrictionService[client.AuthenticatedClient],
json_body: PatchEntryOverrideRestrictionData,
*,
restriction_id: str,
user_id: int,
) -> EntryOverviewEntry:
"""Set or remove a per-student entry override for a restriction.
Sending `null` removes the override row entirely (the student falls
back to the global `session_lockdown` limit). Sending a positive
integer creates or updates the override for this student.
:param json_body: The body of the request. See
:class:`.PatchEntryOverrideRestrictionData` for information about
the possible fields. You can provide this data as a
:class:`.PatchEntryOverrideRestrictionData` or as a dictionary.
:param restriction_id: The ID of the restriction.
:param user_id: The ID of the student.
:returns: The updated entry overview for this student.
"""
url = "/api/v1/restrictions/{restrictionId}/entries/{userId}".format(
restrictionId=restriction_id, userId=user_id
)
params = None
with self.__client as client:
resp = client.http.patch(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.entry_overview_entry import EntryOverviewEntry
return parsers.JsonResponseParser(
parsers.ParserFor.make(EntryOverviewEntry)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)