File: //opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk/integrations/spark/spark_driver.py
from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import capture_internal_exceptions
from sentry_sdk._types import MYPY
if MYPY:
    from typing import Any
    from typing import Optional
    from sentry_sdk._types import Event, Hint
class SparkIntegration(Integration):
    identifier = "spark"
    @staticmethod
    def setup_once():
        # type: () -> None
        patch_spark_context_init()
def _set_app_properties():
    # type: () -> None
    """
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    """
    from pyspark import SparkContext
    spark_context = SparkContext._active_spark_context
    if spark_context:
        spark_context.setLocalProperty("sentry_app_name", spark_context.appName)
        spark_context.setLocalProperty(
            "sentry_application_id", spark_context.applicationId
        )
def _start_sentry_listener(sc):
    # type: (Any) -> None
    """
    Start java gateway server to add custom `SparkListener`
    """
    from pyspark.java_gateway import ensure_callback_server_started
    gw = sc._gateway
    ensure_callback_server_started(gw)
    listener = SentryListener()
    sc._jsc.sc().addSparkListener(listener)
def patch_spark_context_init():
    # type: () -> None
    from pyspark import SparkContext
    spark_context_init = SparkContext._do_init
    def _sentry_patched_spark_context_init(self, *args, **kwargs):
        # type: (SparkContext, *Any, **Any) -> Optional[Any]
        init = spark_context_init(self, *args, **kwargs)
        if Hub.current.get_integration(SparkIntegration) is None:
            return init
        _start_sentry_listener(self)
        _set_app_properties()
        with configure_scope() as scope:
            @scope.add_event_processor
            def process_event(event, hint):
                # type: (Event, Hint) -> Optional[Event]
                with capture_internal_exceptions():
                    if Hub.current.get_integration(SparkIntegration) is None:
                        return event
                    event.setdefault("user", {}).setdefault("id", self.sparkUser())
                    event.setdefault("tags", {}).setdefault(
                        "executor.id", self._conf.get("spark.executor.id")
                    )
                    event["tags"].setdefault(
                        "spark-submit.deployMode",
                        self._conf.get("spark.submit.deployMode"),
                    )
                    event["tags"].setdefault(
                        "driver.host", self._conf.get("spark.driver.host")
                    )
                    event["tags"].setdefault(
                        "driver.port", self._conf.get("spark.driver.port")
                    )
                    event["tags"].setdefault("spark_version", self.version)
                    event["tags"].setdefault("app_name", self.appName)
                    event["tags"].setdefault("application_id", self.applicationId)
                    event["tags"].setdefault("master", self.master)
                    event["tags"].setdefault("spark_home", self.sparkHome)
                    event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
                return event
        return init
    SparkContext._do_init = _sentry_patched_spark_context_init
class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onApplicationStart(self, applicationStart):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onBlockManagerAdded(self, blockManagerAdded):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onBlockUpdated(self, blockUpdated):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onEnvironmentUpdate(self, environmentUpdate):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onExecutorAdded(self, executorAdded):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onExecutorBlacklisted(self, executorBlacklisted):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onExecutorBlacklistedForStage(  # noqa: N802
        self, executorBlacklistedForStage  # noqa: N803
    ):
        # type: (Any) -> None
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onExecutorRemoved(self, executorRemoved):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onJobEnd(self, jobEnd):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onJobStart(self, jobStart):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onNodeBlacklisted(self, nodeBlacklisted):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onNodeBlacklistedForStage(self, nodeBlacklistedForStage):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onNodeUnblacklisted(self, nodeUnblacklisted):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onOtherEvent(self, event):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onSpeculativeTaskSubmitted(self, speculativeTask):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onTaskEnd(self, taskEnd):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onTaskGettingResult(self, taskGettingResult):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onTaskStart(self, taskStart):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    def onUnpersistRDD(self, unpersistRDD):  # noqa: N802,N803
        # type: (Any) -> None
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
class SentryListener(SparkListener):
    def __init__(self):
        # type: () -> None
        self.hub = Hub.current
    def onJobStart(self, jobStart):  # noqa: N802,N803
        # type: (Any) -> None
        message = "Job {} Started".format(jobStart.jobId())
        self.hub.add_breadcrumb(level="info", message=message)
        _set_app_properties()
    def onJobEnd(self, jobEnd):  # noqa: N802,N803
        # type: (Any) -> None
        level = ""
        message = ""
        data = {"result": jobEnd.jobResult().toString()}
        if jobEnd.jobResult().toString() == "JobSucceeded":
            level = "info"
            message = "Job {} Ended".format(jobEnd.jobId())
        else:
            level = "warning"
            message = "Job {} Failed".format(jobEnd.jobId())
        self.hub.add_breadcrumb(level=level, message=message, data=data)
    def onStageSubmitted(self, stageSubmitted):  # noqa: N802,N803
        # type: (Any) -> None
        stage_info = stageSubmitted.stageInfo()
        message = "Stage {} Submitted".format(stage_info.stageId())
        data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
        self.hub.add_breadcrumb(level="info", message=message, data=data)
        _set_app_properties()
    def onStageCompleted(self, stageCompleted):  # noqa: N802,N803
        # type: (Any) -> None
        from py4j.protocol import Py4JJavaError  # type: ignore
        stage_info = stageCompleted.stageInfo()
        message = ""
        level = ""
        data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
        # Have to Try Except because stageInfo.failureReason() is typed with Scala Option
        try:
            data["reason"] = stage_info.failureReason().get()
            message = "Stage {} Failed".format(stage_info.stageId())
            level = "warning"
        except Py4JJavaError:
            message = "Stage {} Completed".format(stage_info.stageId())
            level = "info"
        self.hub.add_breadcrumb(level=level, message=message, data=data)