Source code for codegrade._api.restriction

"""The endpoints for restriction objects.

SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""

from __future__ import annotations

import os
import typing as t

import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable

from .. import paginated, parsers, utils

if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
    from .. import client
    from ..models.enter_restriction_data import EnterRestrictionData
    from ..models.entry_overview_entry import EntryOverviewEntry
    from ..models.ip_range import IPRange
    from ..models.patch_entry_override_restriction_data import (
        PatchEntryOverrideRestrictionData,
    )
    from ..models.patch_restriction_data import PatchRestrictionData
    from ..models.restriction import Restriction
    from ..models.user_login_response import UserLoginResponse


# Type variable for the client type that a service class is parameterised
# over; it is bound to ``client._BaseClient``. The bound is given as a string
# because ``client`` is only imported under ``t.TYPE_CHECKING`` (or when the
# ``CG_EAGERIMPORT`` environment variable is set).
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")


class RestrictionService(t.Generic[_ClientT]):
    """Service object exposing the API endpoints for restriction objects.

    Every request is sent through the client that was handed to the
    constructor.
    """

    __slots__ = ("__client",)

    def __init__(self, client: _ClientT) -> None:
        """Bind this service to ``client``.

        :param client: The client used to perform every request.
        """
        self.__client = client

    def put_allowed_ip(
        self: RestrictionService[client.AuthenticatedClient],
        *,
        restriction_id: str,
        ip_range: str,
    ) -> IPRange:
        """Add one IP range to a restriction.

        :param restriction_id: The id of the restriction.
        :param ip_range: The IP range to add, written in CIDR notation.

        :returns: The IP range that was added.
        """
        # NOTE(review): ``ip_range`` is placed in the URL path as-is; CIDR
        # notation contains a "/", so confirm the server route accepts it
        # unescaped.
        url = "/api/v1/restrictions/{restrictionId}/allowed_ips/{ipRange}".format(
            restrictionId=restriction_id, ipRange=ip_range
        )

        with self.__client as active_client:
            response = active_client.http.put(url=url, params=None)
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is only imported when a failure is reported.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.ip_range import IPRange

        parser = parsers.JsonResponseParser(parsers.ParserFor.make(IPRange))
        return parser.try_parse(response)

    def delete_allowed_ip(
        self: RestrictionService[client.AuthenticatedClient],
        *,
        restriction_id: str,
        ip_range: str,
    ) -> None:
        """Remove one IP range from a restriction.

        :param restriction_id: The ID of the restriction.
        :param ip_range: The IP range to remove.

        :returns: An empty response.
        """
        # NOTE(review): ``ip_range`` is placed in the URL path as-is; confirm
        # the server route accepts values containing "/" unescaped.
        url = "/api/v1/restrictions/{restrictionId}/allowed_ips/{ipRange}".format(
            restrictionId=restriction_id, ipRange=ip_range
        )

        with self.__client as active_client:
            response = active_client.http.delete(url=url, params=None)
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 204):
            # The error model is only imported when a failure is reported.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        # A 204 carries no payload, so the parsed result is always ``None``.
        return parsers.ConstantlyParser(None).try_parse(response)

    def get_all_entries(
        self: RestrictionService[client.AuthenticatedClient],
        *,
        restriction_id: str,
        q: str = "",
        page_size: int = 50,
    ) -> paginated.Response[EntryOverviewEntry]:
        """List the enrolled students with their entry counts and overrides.

        :param restriction_id: The ID of the restriction.
        :param q: Only retrieve students whose name or username matches this
            value.
        :param page_size: The size of a single page, maximum is 100.

        :returns: A paginated list of entry overview entries per student.
        """
        url = "/api/v1/restrictions/{restrictionId}/entries/".format(
            restrictionId=restriction_id
        )
        # Shared by every page request; ``fetch_page`` adds or removes the
        # "next-token" key on this same dict.
        params: t.Dict[str, str | int | bool] = {
            "q": q,
            "page-size": page_size,
        }

        if t.TYPE_CHECKING:
            import httpx

        def fetch_page(next_token: str | None) -> httpx.Response:
            """Request one page, continuing from ``next_token`` if given."""
            if next_token is not None:
                params["next-token"] = next_token
            else:
                params.pop("next-token", "")

            with self.__client as active_client:
                response = active_client.http.get(url=url, params=params)
            utils.log_warnings(response)
            return response

        def parse_page(
            response: httpx.Response,
        ) -> t.Sequence[EntryOverviewEntry]:
            """Parse one page of entries, or raise the reported error."""
            if not utils.response_code_matches(response.status_code, 200):
                from ..models.any_error import AnyError

                raise utils.get_error(
                    response,
                    (
                        (
                            (400, 409, 401, 403, 404, 429, 500),
                            utils.unpack_union(AnyError),
                        ),
                    ),
                )

            from ..models.entry_overview_entry import EntryOverviewEntry

            parser = parsers.JsonResponseParser(
                rqa.List(parsers.ParserFor.make(EntryOverviewEntry))
            )
            return parser.try_parse(response)

        return paginated.Response(fetch_page, parse_page)

    def enter(
        self: RestrictionService[client.AuthenticatedClient],
        json_body: EnterRestrictionData,
        *,
        restriction_id: str,
    ) -> UserLoginResponse:
        """Enter a restricted course, recording an entry event.

        Validates the password (if set), enforces the entry limit (if
        session_lockdown is set), records a :class:`.RestrictionEntryEvent`,
        and returns the caller's existing access token enriched with any new
        restrictions. The same token is reused so the student is not locked
        out if the response is lost in transit.

        :param json_body: The body of the request. See
            :class:`.EnterRestrictionData` for information about the possible
            fields. You can provide this data as a
            :class:`.EnterRestrictionData` or as a dictionary.
        :param restriction_id: The ID of the restriction.

        :returns: A login response containing the (updated) access token.
        """
        url = "/api/v1/restrictions/{restrictionId}/entries/".format(
            restrictionId=restriction_id
        )

        with self.__client as active_client:
            response = active_client.http.put(
                url=url,
                json=utils.to_dict(json_body),
                params=None,
            )
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is only imported when a failure is reported.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.user_login_response import UserLoginResponse

        parser = parsers.JsonResponseParser(
            parsers.ParserFor.make(UserLoginResponse)
        )
        return parser.try_parse(response)

    def get(
        self: RestrictionService[client.AuthenticatedClient],
        *,
        restriction_id: str,
    ) -> Restriction:
        """Get the full settings for a restriction (admin only).

        :param restriction_id: The id of the restriction.

        :returns: The restriction object including the password.
        """
        url = "/api/v1/restrictions/{restrictionId}".format(
            restrictionId=restriction_id
        )

        with self.__client as active_client:
            response = active_client.http.get(url=url, params=None)
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is only imported when a failure is reported.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.restriction import Restriction

        parser = parsers.JsonResponseParser(
            parsers.ParserFor.make(Restriction)
        )
        return parser.try_parse(response)

    def patch(
        self: RestrictionService[client.AuthenticatedClient],
        json_body: PatchRestrictionData,
        *,
        restriction_id: str,
    ) -> Restriction:
        """Update restriction settings.

        The request body mirrors the restriction JSON shape. All fields are
        optional; omitted fields are left unchanged.

        :param json_body: The body of the request. See
            :class:`.PatchRestrictionData` for information about the possible
            fields. You can provide this data as a
            :class:`.PatchRestrictionData` or as a dictionary.
        :param restriction_id: The id of the restriction.

        :returns: The updated restriction.
        """
        url = "/api/v1/restrictions/{restrictionId}".format(
            restrictionId=restriction_id
        )

        with self.__client as active_client:
            response = active_client.http.patch(
                url=url,
                json=utils.to_dict(json_body),
                params=None,
            )
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is only imported when a failure is reported.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.restriction import Restriction

        parser = parsers.JsonResponseParser(
            parsers.ParserFor.make(Restriction)
        )
        return parser.try_parse(response)

    def get_all_allowed_ips(
        self: RestrictionService[client.AuthenticatedClient],
        *,
        restriction_id: str,
        page_size: int = 50,
    ) -> paginated.Response[IPRange]:
        """Get all IP ranges for a restriction.

        :param restriction_id: The ID of the restriction.
        :param page_size: The size of a single page, maximum is 100.

        :returns: A paginated list of IP ranges.
        """
        url = "/api/v1/restrictions/{restrictionId}/allowed_ips/".format(
            restrictionId=restriction_id
        )
        # Shared by every page request; ``fetch_page`` adds or removes the
        # "next-token" key on this same dict.
        params: t.Dict[str, str | int | bool] = {
            "page-size": page_size,
        }

        if t.TYPE_CHECKING:
            import httpx

        def fetch_page(next_token: str | None) -> httpx.Response:
            """Request one page, continuing from ``next_token`` if given."""
            if next_token is not None:
                params["next-token"] = next_token
            else:
                params.pop("next-token", "")

            with self.__client as active_client:
                response = active_client.http.get(url=url, params=params)
            utils.log_warnings(response)
            return response

        def parse_page(response: httpx.Response) -> t.Sequence[IPRange]:
            """Parse one page of IP ranges, or raise the reported error."""
            if not utils.response_code_matches(response.status_code, 200):
                from ..models.any_error import AnyError

                raise utils.get_error(
                    response,
                    (
                        (
                            (400, 409, 401, 403, 404, 429, 500),
                            utils.unpack_union(AnyError),
                        ),
                    ),
                )

            from ..models.ip_range import IPRange

            parser = parsers.JsonResponseParser(
                rqa.List(parsers.ParserFor.make(IPRange))
            )
            return parser.try_parse(response)

        return paginated.Response(fetch_page, parse_page)

    def patch_entry_override(
        self: RestrictionService[client.AuthenticatedClient],
        json_body: PatchEntryOverrideRestrictionData,
        *,
        restriction_id: str,
        user_id: int,
    ) -> EntryOverviewEntry:
        """Set or remove a per-student entry override for a restriction.

        Sending `null` removes the override row entirely (the student falls
        back to the global `session_lockdown` limit). Sending a positive
        integer creates or updates the override for this student.

        :param json_body: The body of the request. See
            :class:`.PatchEntryOverrideRestrictionData` for information about
            the possible fields. You can provide this data as a
            :class:`.PatchEntryOverrideRestrictionData` or as a dictionary.
        :param restriction_id: The ID of the restriction.
        :param user_id: The ID of the student.

        :returns: The updated entry overview for this student.
        """
        url = "/api/v1/restrictions/{restrictionId}/entries/{userId}".format(
            restrictionId=restriction_id, userId=user_id
        )

        with self.__client as active_client:
            response = active_client.http.patch(
                url=url,
                json=utils.to_dict(json_body),
                params=None,
            )
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is only imported when a failure is reported.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.entry_overview_entry import EntryOverviewEntry

        parser = parsers.JsonResponseParser(
            parsers.ParserFor.make(EntryOverviewEntry)
        )
        return parser.try_parse(response)