File: //opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk/integrations/sanic.py
import sys
import weakref
from inspect import isawaitable
from sentry_sdk._compat import urlparse, reraise
from sentry_sdk.hub import Hub
from sentry_sdk.utils import (
    capture_internal_exceptions,
    event_from_exception,
    HAS_REAL_CONTEXTVARS,
    CONTEXTVARS_ERROR_MESSAGE,
)
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations._wsgi_common import RequestExtractor, _filter_headers
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk._types import MYPY
if MYPY:
    from typing import Any
    from typing import Callable
    from typing import Optional
    from typing import Union
    from typing import Tuple
    from typing import Dict
    from sanic.request import Request, RequestParameters
    from sentry_sdk._types import Event, EventProcessor, Hint
try:
    from sanic import Sanic, __version__ as SANIC_VERSION
    from sanic.exceptions import SanicException
    from sanic.router import Router
    from sanic.handlers import ErrorHandler
except ImportError:
    raise DidNotEnable("Sanic not installed")
class SanicIntegration(Integration):
    identifier = "sanic"
    @staticmethod
    def setup_once():
        # type: () -> None
        try:
            version = tuple(map(int, SANIC_VERSION.split(".")))
        except (TypeError, ValueError):
            raise DidNotEnable("Unparsable Sanic version: {}".format(SANIC_VERSION))
        if version < (0, 8):
            raise DidNotEnable("Sanic 0.8 or newer required.")
        if not HAS_REAL_CONTEXTVARS:
            # We better have contextvars or we're going to leak state between
            # requests.
            raise DidNotEnable(
                "The sanic integration for Sentry requires Python 3.7+ "
                " or the aiocontextvars package." + CONTEXTVARS_ERROR_MESSAGE
            )
        if SANIC_VERSION.startswith("0.8."):
            # Sanic 0.8 and older creates a logger named "root" and puts a
            # stringified version of every exception in there (without exc_info),
            # which our error deduplication can't detect.
            #
            # We explicitly check the version here because it is a very
            # invasive step to ignore this logger and not necessary in newer
            # versions at all.
            #
            # https://github.com/huge-success/sanic/issues/1332
            ignore_logger("root")
        old_handle_request = Sanic.handle_request
        async def sentry_handle_request(self, request, *args, **kwargs):
            # type: (Any, Request, *Any, **Any) -> Any
            hub = Hub.current
            if hub.get_integration(SanicIntegration) is None:
                return old_handle_request(self, request, *args, **kwargs)
            weak_request = weakref.ref(request)
            with Hub(hub) as hub:
                with hub.configure_scope() as scope:
                    scope.clear_breadcrumbs()
                    scope.add_event_processor(_make_request_processor(weak_request))
                response = old_handle_request(self, request, *args, **kwargs)
                if isawaitable(response):
                    response = await response
                return response
        Sanic.handle_request = sentry_handle_request
        old_router_get = Router.get
        def sentry_router_get(self, request):
            # type: (Any, Request) -> Any
            rv = old_router_get(self, request)
            hub = Hub.current
            if hub.get_integration(SanicIntegration) is not None:
                with capture_internal_exceptions():
                    with hub.configure_scope() as scope:
                        scope.transaction = rv[0].__name__
            return rv
        Router.get = sentry_router_get
        old_error_handler_lookup = ErrorHandler.lookup
        def sentry_error_handler_lookup(self, exception):
            # type: (Any, Exception) -> Optional[object]
            _capture_exception(exception)
            old_error_handler = old_error_handler_lookup(self, exception)
            if old_error_handler is None:
                return None
            if Hub.current.get_integration(SanicIntegration) is None:
                return old_error_handler
            async def sentry_wrapped_error_handler(request, exception):
                # type: (Request, Exception) -> Any
                try:
                    response = old_error_handler(request, exception)
                    if isawaitable(response):
                        response = await response
                    return response
                except Exception:
                    # Report errors that occur in Sanic error handler. These
                    # exceptions will not even show up in Sanic's
                    # `sanic.exceptions` logger.
                    exc_info = sys.exc_info()
                    _capture_exception(exc_info)
                    reraise(*exc_info)
            return sentry_wrapped_error_handler
        ErrorHandler.lookup = sentry_error_handler_lookup
def _capture_exception(exception):
    # type: (Union[Tuple[Optional[type], Optional[BaseException], Any], BaseException]) -> None
    hub = Hub.current
    integration = hub.get_integration(SanicIntegration)
    if integration is None:
        return
    # If an integration is there, a client has to be there.
    client = hub.client  # type: Any
    with capture_internal_exceptions():
        event, hint = event_from_exception(
            exception,
            client_options=client.options,
            mechanism={"type": "sanic", "handled": False},
        )
        hub.capture_event(event, hint=hint)
def _make_request_processor(weak_request):
    # type: (Callable[[], Request]) -> EventProcessor
    def sanic_processor(event, hint):
        # type: (Event, Optional[Hint]) -> Optional[Event]
        try:
            if hint and issubclass(hint["exc_info"][0], SanicException):
                return None
        except KeyError:
            pass
        request = weak_request()
        if request is None:
            return event
        with capture_internal_exceptions():
            extractor = SanicRequestExtractor(request)
            extractor.extract_into_event(event)
            request_info = event["request"]
            urlparts = urlparse.urlsplit(request.url)
            request_info["url"] = "%s://%s%s" % (
                urlparts.scheme,
                urlparts.netloc,
                urlparts.path,
            )
            request_info["query_string"] = urlparts.query
            request_info["method"] = request.method
            request_info["env"] = {"REMOTE_ADDR": request.remote_addr}
            request_info["headers"] = _filter_headers(dict(request.headers))
        return event
    return sanic_processor
class SanicRequestExtractor(RequestExtractor):
    def content_length(self):
        # type: () -> int
        if self.request.body is None:
            return 0
        return len(self.request.body)
    def cookies(self):
        # type: () -> Dict[str, str]
        return dict(self.request.cookies)
    def raw_data(self):
        # type: () -> bytes
        return self.request.body
    def form(self):
        # type: () -> RequestParameters
        return self.request.form
    def is_json(self):
        # type: () -> bool
        raise NotImplementedError()
    def json(self):
        # type: () -> Optional[Any]
        return self.request.json
    def files(self):
        # type: () -> RequestParameters
        return self.request.files
    def size_of_file(self, file):
        # type: (Any) -> int
        return len(file.body or ())