
Snowflake Native Webhook Notifications for SIEM: Splunk and Microsoft Sentinel


See also: How to Let Snowflake Raise Security Incidents in Your SIEM or XDR Automatically — the original article using External Access UDFs for SIEM integration.


The Problem
#

Snowflake sits on a wealth of security telemetry. Failed logins, privilege escalations, anomalous query volumes, unusual access patterns — all of it is captured in ACCOUNT_USAGE views and query history. The challenge has always been getting that information out of Snowflake and into your SIEM in real time.

Previously, the only way to push alerts from Snowflake to an external endpoint was to build a Python UDF backed by an External Access Integration, wire up a secret for the API token, and call that UDF from a Task or Alert. It worked — I covered the full pattern in the original SIEM article — but it was a lot of moving parts for what amounts to an HTTP POST.

Snowflake now supports native webhook notification integrations. You create a NOTIFICATION INTEGRATION of type WEBHOOK, point it at your SIEM endpoint, attach authentication headers via a secret, and call SYSTEM$SEND_NOTIFICATION() from an Alert or Task. No Python UDF. No External Access Integration. Just SQL.

This article covers three approaches, each suited to different authentication requirements:

  1. Native Webhook — best when your SIEM uses a static token (Splunk HEC, Logic App webhook URLs)
  2. External Access UDF with Dynamic OAuth — best when your SIEM requires OAuth tokens that expire (Sentinel Logs Ingestion API with Entra ID)
  3. Hybrid — native Alert for detection, a background Task that refreshes the token so the webhook always has a valid credential

Architecture Overview
#

┌─────────────────────────────────────┐
│        Snowflake Alert / Task       │
│  (runs detection query on schedule) │
└──────────────────┬──────────────────┘
                   ▼
┌─────────────────────────────────────┐
│   NOTIFICATION INTEGRATION          │
│   (TYPE = WEBHOOK)                  │
│                                     │
│   WEBHOOK_URL  = https://...        │
│   WEBHOOK_HEADERS = Authorization   │
│   WEBHOOK_BODY_TEMPLATE = {...}     │
└──────────────────┬──────────────────┘
                   ▼  HTTPS POST
        ┌──────────┼──────────────┐
        │          │              │
        ▼          ▼              ▼
   ┌─────────┐ ┌──────────┐ ┌──────────────┐
   │ Splunk  │ │ Sentinel │ │ Any SIEM     │
   │  HEC    │ │ (Logic   │ │ webhook      │
   │         │ │  App or  │ │ endpoint     │
   │ Auth:   │ │  Logs    │ │              │
   │ Splunk  │ │  Ingest) │ │ Bearer/API   │
   │ <token> │ │          │ │ Key          │
   └─────────┘ └──────────┘ └──────────────┘

The key insight is that SYSTEM$SEND_NOTIFICATION() is the bridge. You call it from any Alert or Task, pass it a notification integration name and a payload, and Snowflake fires the HTTP request natively — no UDF execution environment required.


Approach 1: Native Webhook Notification (Recommended for Static Tokens)
#

This is the simplest path and covers the majority of SIEM integrations. If your SIEM accepts a static API token or a webhook URL with an embedded SAS token, this is the approach to use.

How SNOWFLAKE_WEBHOOK_SECRET Works
#

When you create a notification integration of type WEBHOOK, you can reference secrets in header values using the placeholder SNOWFLAKE_WEBHOOK_SECRET. At execution time, Snowflake replaces this placeholder with the actual secret value. The secret is never exposed in the integration definition or logs.
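A local sketch of that substitution may help (this simulates the documented placeholder behavior in plain Python; nothing here runs inside Snowflake):

```python
# Simulates how Snowflake expands the SNOWFLAKE_WEBHOOK_SECRET placeholder
# in header values at send time. Purely illustrative.

def resolve_headers(header_template: dict, secret_value: str) -> dict:
    """Replace the placeholder in every header value with the real secret."""
    return {
        name: value.replace("SNOWFLAKE_WEBHOOK_SECRET", secret_value)
        for name, value in header_template.items()
    }

template = {
    "Content-Type": "application/json",
    "Authorization": "Splunk SNOWFLAKE_WEBHOOK_SECRET",
}
resolved = resolve_headers(template, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
# resolved["Authorization"] == "Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
```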

Splunk HEC Integration
#

Splunk’s HTTP Event Collector (HEC) authenticates with a static token passed in the Authorization header. Important: Splunk uses the scheme Splunk — not Bearer. The header format is Authorization: Splunk <token>.

Step 1: Set Up the Role and Permissions
#

USE ROLE ACCOUNTADMIN;

-- Create a dedicated role for SIEM alerting
CREATE ROLE IF NOT EXISTS SIEM_WEBHOOK_ROLE;

GRANT EXECUTE ALERT ON ACCOUNT TO ROLE SIEM_WEBHOOK_ROLE;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE SIEM_WEBHOOK_ROLE;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE SIEM_WEBHOOK_ROLE;

-- Grant access to account usage views
GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE SIEM_WEBHOOK_ROLE;

-- Create a database for SIEM objects
CREATE DATABASE IF NOT EXISTS SIEM_DB;
GRANT OWNERSHIP ON DATABASE SIEM_DB TO ROLE SIEM_WEBHOOK_ROLE;

CREATE SCHEMA IF NOT EXISTS SIEM_DB.ALERTS;
GRANT OWNERSHIP ON SCHEMA SIEM_DB.ALERTS TO ROLE SIEM_WEBHOOK_ROLE;

-- Create a warehouse
CREATE WAREHOUSE IF NOT EXISTS SIEM_WH
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
GRANT USAGE ON WAREHOUSE SIEM_WH TO ROLE SIEM_WEBHOOK_ROLE;

-- Grant the role to your user
GRANT ROLE SIEM_WEBHOOK_ROLE TO USER YOUR_USERNAME;

Step 2: Create the Secret for the HEC Token
#

USE ROLE SIEM_WEBHOOK_ROLE;
USE DATABASE SIEM_DB;
USE SCHEMA ALERTS;

CREATE OR REPLACE SECRET SPLUNK_HEC_TOKEN
  TYPE = GENERIC_STRING
  SECRET_STRING = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx';  -- Your Splunk HEC token

Step 3: Create the Notification Integration
#

CREATE OR REPLACE NOTIFICATION INTEGRATION SPLUNK_HEC_WEBHOOK
  TYPE = WEBHOOK
  ENABLED = TRUE
  WEBHOOK_URL = 'https://splunk.yourcompany.com:8088/services/collector/event'
  WEBHOOK_SECRET = SIEM_DB.ALERTS.SPLUNK_HEC_TOKEN
  WEBHOOK_HEADERS = ('Content-Type'='application/json', 'Authorization'='Splunk SNOWFLAKE_WEBHOOK_SECRET')
  WEBHOOK_BODY_TEMPLATE = 'SNOWFLAKE_WEBHOOK_MESSAGE';

Note the Authorization header value: Splunk SNOWFLAKE_WEBHOOK_SECRET. At runtime, Snowflake replaces SNOWFLAKE_WEBHOOK_SECRET with the actual token from the secret, producing Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx. This is Splunk’s required format — using Bearer here would result in a 403 Forbidden.

Step 4: Create the Alert for Failed Login Detection
#

CREATE OR REPLACE ALERT SIEM_DB.ALERTS.FAILED_LOGIN_ALERT
  WAREHOUSE = SIEM_WH
  SCHEDULE = '1 MINUTE'
  IF (EXISTS (
    SELECT 1
    FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
    WHERE EVENT_TIMESTAMP > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
      AND IS_SUCCESS = 'NO'
      AND ERROR_MESSAGE ILIKE '%incorrect%password%'
  ))
  THEN
    CALL SYSTEM$SEND_NOTIFICATION(
      SNOWFLAKE.NOTIFICATION.APPLICATION_JSON(
        TO_JSON(OBJECT_CONSTRUCT(
          'event', OBJECT_CONSTRUCT(
            'source', 'snowflake',
            'sourcetype', 'snowflake:security:failed_login',
            'severity', 'high',
            'account', CURRENT_ACCOUNT(),
            'region', CURRENT_REGION(),
            'details', (
              SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
                'event_id', EVENT_ID,
                'event_timestamp', EVENT_TIMESTAMP::STRING,
                'user_name', USER_NAME,
                'client_ip', CLIENT_IP,
                'error_message', ERROR_MESSAGE,
                'reported_client_type', REPORTED_CLIENT_TYPE
              ))
              FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
              WHERE EVENT_TIMESTAMP > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
                AND IS_SUCCESS = 'NO'
                AND ERROR_MESSAGE ILIKE '%incorrect%password%'
            )
          )
        ))
      ),
      SNOWFLAKE.NOTIFICATION.INTEGRATION('SPLUNK_HEC_WEBHOOK')
    );

-- Enable the alert
ALTER ALERT SIEM_DB.ALERTS.FAILED_LOGIN_ALERT RESUME;

The alert checks every minute. If any failed password login attempts occurred since the last successful check, it fires a webhook to Splunk HEC with a JSON payload containing the event details. Splunk indexes this as a snowflake:security:failed_login sourcetype.
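For reference, the envelope the alert constructs can be sketched in Python. The field names mirror the alert above; the top-level `event` key is what HEC's `/services/collector/event` endpoint expects:

```python
import json

def hec_envelope(sourcetype: str, severity: str, account: str, details: list) -> str:
    """Build the Splunk HEC event envelope as a JSON string."""
    return json.dumps({
        "event": {
            "source": "snowflake",
            "sourcetype": sourcetype,
            "severity": severity,
            "account": account,
            "details": details,
        }
    })

payload = hec_envelope(
    "snowflake:security:failed_login", "high", "MYACCOUNT",
    [{"user_name": "ALICE", "client_ip": "203.0.113.7"}],
)
```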

Microsoft Sentinel Integration
#

Sentinel offers two webhook-friendly ingestion paths. Choose based on your architecture.

Option A: Logic App Webhook (Simplest)
#

Azure Logic Apps can expose an HTTP trigger URL that includes a SAS token — no Authorization header needed. The Logic App then forwards the payload to a Sentinel workspace.

-- No secret needed — the SAS token is embedded in the URL
CREATE OR REPLACE NOTIFICATION INTEGRATION SENTINEL_LOGIC_APP_WEBHOOK
  TYPE = WEBHOOK
  ENABLED = TRUE
  WEBHOOK_URL = 'https://prod-xx.westeurope.logic.azure.com:443/workflows/abc123/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=YOUR_SAS_SIGNATURE'
  WEBHOOK_HEADERS = ('Content-Type'='application/json')
  WEBHOOK_BODY_TEMPLATE = 'SNOWFLAKE_WEBHOOK_MESSAGE';

Option B: Sentinel Logs Ingestion API (Direct)
#

If you prefer to send data directly to a Log Analytics workspace via the Logs Ingestion API, you need a bearer token issued by Entra ID. You can store one in a secret and use the native webhook approach, but keep in mind that these tokens expire (typically after 60 minutes), so pair this integration with the token refresh from Approach 3 for anything beyond a quick test.

-- Store the bearer token (from Entra ID service principal)
CREATE OR REPLACE SECRET SENTINEL_BEARER_TOKEN
  TYPE = GENERIC_STRING
  SECRET_STRING = 'eyJ0eXAiOiJKV1QiLCJhbGciOi...';  -- Your Entra ID access token

CREATE OR REPLACE NOTIFICATION INTEGRATION SENTINEL_API_WEBHOOK
  TYPE = WEBHOOK
  ENABLED = TRUE
  WEBHOOK_URL = 'https://your-dce-id.westeurope-1.ingest.monitor.azure.com/dataCollectionRules/dcr-xxxxxxxxxxxx/streams/Custom-SnowflakeSecurity_CL?api-version=2023-01-01'
  WEBHOOK_SECRET = SIEM_DB.ALERTS.SENTINEL_BEARER_TOKEN
  WEBHOOK_HEADERS = ('Content-Type'='application/json', 'Authorization'='Bearer SNOWFLAKE_WEBHOOK_SECRET')
  WEBHOOK_BODY_TEMPLATE = 'SNOWFLAKE_WEBHOOK_MESSAGE';

Privilege Escalation Alert for Sentinel
#

CREATE OR REPLACE ALERT SIEM_DB.ALERTS.PRIVILEGE_ESCALATION_ALERT
  WAREHOUSE = SIEM_WH
  SCHEDULE = '5 MINUTES'
  IF (EXISTS (
    SELECT 1
    FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS
    WHERE CREATED_ON > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
      AND ROLE IN ('ACCOUNTADMIN', 'SECURITYADMIN', 'SYSADMIN')
  ))
  THEN
    CALL SYSTEM$SEND_NOTIFICATION(
      SNOWFLAKE.NOTIFICATION.APPLICATION_JSON(
        TO_JSON(ARRAY_CONSTRUCT(
          OBJECT_CONSTRUCT(
            'TimeGenerated', CURRENT_TIMESTAMP()::STRING,
            'Source', 'Snowflake',
            'EventType', 'PrivilegeEscalation',
            'Severity', 'Critical',
            'Account', CURRENT_ACCOUNT(),
            'Details', (
              SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
                'grantee_name', GRANTEE_NAME,
                'role', ROLE,
                'granted_by', GRANTED_BY,
                'granted_on', CREATED_ON::STRING
              ))
              FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS
              WHERE CREATED_ON > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
                AND ROLE IN ('ACCOUNTADMIN', 'SECURITYADMIN', 'SYSADMIN')
            )
          )
        ))
      ),
      SNOWFLAKE.NOTIFICATION.INTEGRATION('SENTINEL_LOGIC_APP_WEBHOOK')
    );

ALTER ALERT SIEM_DB.ALERTS.PRIVILEGE_ESCALATION_ALERT RESUME;

Limitations of Native Webhooks
#

The native approach works well when the token does not expire. Splunk HEC tokens, Logic App SAS URLs, and long-lived API keys are ideal. But if your SIEM requires an OAuth access token that expires (typically every 60 minutes), you need one of the next two approaches.


Approach 2: External Access Function with Dynamic OAuth (For Token Refresh)
#

When your SIEM endpoint requires an OAuth 2.0 access token that expires — the most common case being Microsoft Sentinel’s Logs Ingestion API authenticated via Entra ID (Azure AD) — the native webhook approach falls short because the token in the secret goes stale.

The External Access Integration pattern solves this by obtaining a fresh token at call time using Snowflake’s built-in _snowflake.get_oauth_access_token().

Step 1: Create the Security Integration for Entra ID
#

USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE SECURITY INTEGRATION SENTINEL_OAUTH
  TYPE = API_AUTHENTICATION
  AUTH_TYPE = OAUTH2
  OAUTH_CLIENT_ID = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'          -- Entra ID app registration client ID
  OAUTH_CLIENT_SECRET = 'your-client-secret-value'
  OAUTH_TOKEN_ENDPOINT = 'https://login.microsoftonline.com/YOUR_TENANT_ID/oauth2/v2.0/token'
  OAUTH_ALLOWED_SCOPES = ('https://monitor.azure.com/.default');
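Behind this integration, Snowflake performs a standard OAuth 2.0 client_credentials exchange against the token endpoint. A sketch of that request in Python (all identifiers are placeholders; the actual HTTP call is left commented out):

```python
# Sketch of the client_credentials token request the security integration
# performs on your behalf. Tenant/client values are placeholders.

def build_token_request(tenant_id: str, client_id: str, client_secret: str):
    """Return the token endpoint URL and form body for client_credentials."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://monitor.azure.com/.default",
    }
    return url, form

url, form = build_token_request("YOUR_TENANT_ID", "YOUR_CLIENT_ID", "YOUR_SECRET")
# token = requests.post(url, data=form).json()["access_token"]
```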

Step 2: Create the OAuth Secret
#

CREATE OR REPLACE SECRET SIEM_DB.ALERTS.SENTINEL_OAUTH_SECRET
  TYPE = OAUTH2
  API_AUTHENTICATION = SENTINEL_OAUTH
  OAUTH_SCOPES = ('https://monitor.azure.com/.default');

Step 3: Create the Network Rule and External Access Integration
#

CREATE OR REPLACE NETWORK RULE SENTINEL_INGEST_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = (
    'login.microsoftonline.com:443',
    'your-dce-id.westeurope-1.ingest.monitor.azure.com:443'
  );

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION SENTINEL_ACCESS
  ALLOWED_NETWORK_RULES = (SENTINEL_INGEST_RULE)
  ALLOWED_AUTHENTICATION_SECRETS = (SIEM_DB.ALERTS.SENTINEL_OAUTH_SECRET)
  ENABLED = TRUE;

GRANT USAGE ON INTEGRATION SENTINEL_ACCESS TO ROLE SIEM_WEBHOOK_ROLE;

Step 4: Create the Python UDF
#

USE ROLE SIEM_WEBHOOK_ROLE;
USE DATABASE SIEM_DB;
USE SCHEMA ALERTS;

CREATE OR REPLACE FUNCTION SEND_TO_SENTINEL(payload STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  HANDLER = 'send_alert'
  EXTERNAL_ACCESS_INTEGRATIONS = (SENTINEL_ACCESS)
  SECRETS = ('oauth_token' = SIEM_DB.ALERTS.SENTINEL_OAUTH_SECRET)
  PACKAGES = ('requests', 'snowflake-snowpark-python')
AS
$$
import _snowflake
import requests
import json

DCE_ENDPOINT = "https://your-dce-id.westeurope-1.ingest.monitor.azure.com"
DCR_RULE_ID  = "dcr-xxxxxxxxxxxx"
STREAM_NAME  = "Custom-SnowflakeSecurity_CL"

def send_alert(payload: str) -> str:
    # Get a fresh OAuth token; Snowflake handles the client_credentials flow
    token = _snowflake.get_oauth_access_token('oauth_token')

    url = f"{DCE_ENDPOINT}/dataCollectionRules/{DCR_RULE_ID}/streams/{STREAM_NAME}?api-version=2023-01-01"

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    # Sentinel Logs Ingestion API expects an array of records
    body = json.loads(payload) if isinstance(payload, str) else payload
    if not isinstance(body, list):
        body = [body]

    response = requests.post(url, headers=headers, json=body, timeout=30)

    return json.dumps({
        "status_code": response.status_code,
        "response": response.text[:500]
    })
$$;

Step 5: Create an Alert That Uses the UDF
#

CREATE OR REPLACE ALERT SIEM_DB.ALERTS.FAILED_LOGIN_SENTINEL_ALERT
  WAREHOUSE = SIEM_WH
  SCHEDULE = '1 MINUTE'
  IF (EXISTS (
    SELECT 1
    FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
    WHERE EVENT_TIMESTAMP > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
      AND IS_SUCCESS = 'NO'
      AND ERROR_MESSAGE ILIKE '%incorrect%password%'
  ))
  THEN
    SELECT SEND_TO_SENTINEL(
      TO_JSON(OBJECT_CONSTRUCT(
        'TimeGenerated', CURRENT_TIMESTAMP()::STRING,
        'EventType', 'FailedLogin',
        'Severity', 'High',
        'Account', CURRENT_ACCOUNT(),
        'Region', CURRENT_REGION(),
        'Events', (
          SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
            'event_id', EVENT_ID,
            'event_timestamp', EVENT_TIMESTAMP::STRING,
            'user_name', USER_NAME,
            'client_ip', CLIENT_IP,
            'error_message', ERROR_MESSAGE
          ))
          FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
          WHERE EVENT_TIMESTAMP > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
            AND IS_SUCCESS = 'NO'
            AND ERROR_MESSAGE ILIKE '%incorrect%password%'
        )
      ))
    );

ALTER ALERT SIEM_DB.ALERTS.FAILED_LOGIN_SENTINEL_ALERT RESUME;

This approach guarantees a fresh token on every invocation. The trade-off is the overhead of a Python UDF execution environment. For a detailed walkthrough of the OAuth token management pattern, see Snowflake Federated Queries with Cross-Account OAuth.


Approach 3: Hybrid — Alert Detection + Webhook + Token Refresh Task
#

The hybrid approach gives you the best of both worlds: the simplicity and low latency of a native webhook for alert delivery, combined with automatic token refresh via a background Task.

The idea is straightforward:

  1. A scheduled Task runs every 30 minutes, obtains a fresh OAuth token using a Python UDF, and updates the secret via ALTER SECRET.
  2. The notification integration references that secret — so it always has a valid token.
  3. Alerts use SYSTEM$SEND_NOTIFICATION() with the native webhook — no UDF in the alert path.

Step 1: Create the Token Refresh UDF
#

CREATE OR REPLACE FUNCTION SIEM_DB.ALERTS.GET_SENTINEL_TOKEN()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  HANDLER = 'get_token'
  EXTERNAL_ACCESS_INTEGRATIONS = (SENTINEL_ACCESS)
  SECRETS = ('oauth_token' = SIEM_DB.ALERTS.SENTINEL_OAUTH_SECRET)
  PACKAGES = ('snowflake-snowpark-python')
AS
$$
import _snowflake

def get_token() -> str:
    return _snowflake.get_oauth_access_token('oauth_token')
$$;

Step 2: Create the Token Refresh Task
#

CREATE OR REPLACE TASK SIEM_DB.ALERTS.REFRESH_SENTINEL_TOKEN
  WAREHOUSE = SIEM_WH
  SCHEDULE = '30 MINUTE'
AS
  BEGIN
    -- ALTER SECRET does not accept a subquery, so fetch the token into a
    -- variable first and apply it with dynamic SQL
    LET token STRING := (SELECT SIEM_DB.ALERTS.GET_SENTINEL_TOKEN());
    LET stmt STRING := 'ALTER SECRET SIEM_DB.ALERTS.SENTINEL_BEARER_TOKEN ' ||
                       'SET SECRET_STRING = ''' || token || '''';
    EXECUTE IMMEDIATE :stmt;
  END;

ALTER TASK SIEM_DB.ALERTS.REFRESH_SENTINEL_TOKEN RESUME;

This Task runs every 30 minutes. It calls the UDF to get a fresh Entra ID token, then updates the SENTINEL_BEARER_TOKEN secret (the GENERIC_STRING secret used by the webhook integration from Approach 1). Since OAuth tokens typically last 60 minutes, refreshing every 30 ensures the token is always valid.
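The cadence is easy to sanity-check. Under the article's assumptions (60-minute token lifetime, 30-minute refresh), the token in the secret is at most 30 minutes old whenever an alert fires:

```python
TOKEN_TTL_MIN = 60      # typical Entra ID access token lifetime (assumption)
REFRESH_EVERY_MIN = 30  # the task's schedule

# Worst case: an alert fires just before the next refresh runs,
# so the stored token is one full refresh interval old.
max_token_age_min = REFRESH_EVERY_MIN
min_remaining_validity_min = TOKEN_TTL_MIN - max_token_age_min
# min_remaining_validity_min == 30: at least half the lifetime always remains
```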

Step 3: Use the Native Webhook from Alerts (Same as Approach 1)
#

Now your alerts use the native webhook path — SYSTEM$SEND_NOTIFICATION() with SENTINEL_API_WEBHOOK — and the token is always current because the refresh task keeps it up to date.

CREATE OR REPLACE ALERT SIEM_DB.ALERTS.DATA_EXFIL_ALERT
  WAREHOUSE = SIEM_WH
  SCHEDULE = '5 MINUTES'
  IF (EXISTS (
    SELECT 1
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
    WHERE START_TIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
      AND QUERY_TYPE = 'SELECT'
      AND ROWS_PRODUCED > 1000000
      AND BYTES_SCANNED > 1073741824  -- 1 GB
  ))
  THEN
    CALL SYSTEM$SEND_NOTIFICATION(
      SNOWFLAKE.NOTIFICATION.APPLICATION_JSON(
        TO_JSON(ARRAY_CONSTRUCT(
          OBJECT_CONSTRUCT(
            'TimeGenerated', CURRENT_TIMESTAMP()::STRING,
            'EventType', 'PotentialDataExfiltration',
            'Severity', 'Critical',
            'Account', CURRENT_ACCOUNT(),
            'Details', (
              SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
                'query_id', QUERY_ID,
                'user_name', USER_NAME,
                'role_name', ROLE_NAME,
                'start_time', START_TIME::STRING,
                'rows_produced', ROWS_PRODUCED,
                'bytes_scanned', BYTES_SCANNED,
                'query_text', LEFT(QUERY_TEXT, 500)
              ))
              FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
              WHERE START_TIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
                AND QUERY_TYPE = 'SELECT'
                AND ROWS_PRODUCED > 1000000
                AND BYTES_SCANNED > 1073741824
            )
          )
        ))
      ),
      SNOWFLAKE.NOTIFICATION.INTEGRATION('SENTINEL_API_WEBHOOK')
    );

ALTER ALERT SIEM_DB.ALERTS.DATA_EXFIL_ALERT RESUME;

Comparison
#

| Aspect           | Native Webhook                             | External Access UDF                        | Hybrid                                    |
|------------------|--------------------------------------------|--------------------------------------------|-------------------------------------------|
| Setup complexity | Low                                        | High                                       | Medium                                    |
| Token refresh    | Manual or via Task                         | Automatic at call time                     | Automatic via background Task             |
| Payload control  | Template-based                             | Full programmatic (Python)                 | Template-based                            |
| Latency          | Low (native HTTP)                          | Higher (Python runtime)                    | Low (native HTTP)                         |
| Dependencies     | Secret + Integration                       | Secret + Integration + Network Rule + UDF  | All of the above                          |
| Best for         | Static tokens (Splunk HEC, Logic App URLs) | Dynamic OAuth with complex payloads        | Dynamic OAuth with simple alert payloads  |

Recommendation: Start with the native webhook (Approach 1) whenever possible. Move to the hybrid (Approach 3) if you need dynamic OAuth tokens but want to keep alert logic in pure SQL. Use the full External Access UDF (Approach 2) only when you need programmatic control over the HTTP request — custom retry logic, payload transformation, multi-step API calls, etc.


Detection Queries
#

These are ready to use as the condition in a Snowflake Alert. Each returns rows only when the condition is met, making them suitable for the IF (EXISTS (...)) pattern.

1. Failed Login Attempts (Threshold-Based)
#

Alert when a single user fails to log in 5 or more times within the check window:

SELECT USER_NAME, COUNT(*) AS FAILED_ATTEMPTS
FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
WHERE EVENT_TIMESTAMP > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  AND IS_SUCCESS = 'NO'
GROUP BY USER_NAME
HAVING COUNT(*) >= 5;
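The same threshold logic, sketched in Python over sample data (useful for reasoning about the query, not a replacement for it):

```python
from collections import Counter

def users_over_threshold(failed_login_users: list, threshold: int = 5) -> dict:
    """Count failed attempts per user; keep users at or above the threshold."""
    counts = Counter(failed_login_users)
    return {user: n for user, n in counts.items() if n >= threshold}

# One user_name per failed login in the check window (sample data)
window = ["ALICE"] * 6 + ["BOB"] * 2
flagged = users_over_threshold(window)
# flagged == {"ALICE": 6} -- BOB stays under the threshold
```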

2. Login From New or Unusual IP Address
#

Alert when a user logs in from an IP address they have never used before (compared against the last 90 days):

SELECT L.USER_NAME, L.CLIENT_IP, L.EVENT_TIMESTAMP
FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY L
WHERE L.EVENT_TIMESTAMP > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  AND L.IS_SUCCESS = 'YES'
  AND L.CLIENT_IP NOT IN (
    SELECT DISTINCT CLIENT_IP
    FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY
    WHERE USER_NAME = L.USER_NAME
      AND IS_SUCCESS = 'YES'
      AND EVENT_TIMESTAMP > DATEADD('day', -90, CURRENT_TIMESTAMP())
      AND EVENT_TIMESTAMP <= SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  );
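In effect, the query computes a per-user set difference between the current window's IPs and a 90-day baseline. A minimal sketch:

```python
def new_ips(window_logins, baseline):
    """Return (user, ip) pairs where the IP is missing from the user's baseline set."""
    return [
        (user, ip)
        for user, ip in window_logins
        if ip not in baseline.get(user, set())
    ]

baseline = {"ALICE": {"198.51.100.4"}}  # IPs seen in the last 90 days
logins = [("ALICE", "198.51.100.4"), ("ALICE", "203.0.113.9")]
# new_ips(logins, baseline) -> [("ALICE", "203.0.113.9")]
```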

3. Privilege Escalation (New Admin Role Grants)
#

Alert when any user is granted ACCOUNTADMIN, SECURITYADMIN, or SYSADMIN:

SELECT GRANTEE_NAME, ROLE, GRANTED_BY, CREATED_ON
FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS
WHERE CREATED_ON > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  AND ROLE IN ('ACCOUNTADMIN', 'SECURITYADMIN', 'SYSADMIN');

4. Potential Data Exfiltration (Large SELECT Volumes)
#

Alert on queries that return more than 1 million rows or scan more than 1 GB:

SELECT QUERY_ID, USER_NAME, ROLE_NAME, ROWS_PRODUCED, BYTES_SCANNED,
       LEFT(QUERY_TEXT, 200) AS QUERY_PREVIEW
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  AND QUERY_TYPE = 'SELECT'
  AND (ROWS_PRODUCED > 1000000 OR BYTES_SCANNED > 1073741824);

5. Queries at Unusual Hours
#

Alert on queries executed outside business hours (adjust the hour range and timezone to fit your organization):

SELECT QUERY_ID, USER_NAME, ROLE_NAME, START_TIME,
       LEFT(QUERY_TEXT, 200) AS QUERY_PREVIEW
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  AND HOUR(CONVERT_TIMEZONE('UTC', 'Europe/Zurich', START_TIME)) NOT BETWEEN 7 AND 19
  AND DAYOFWEEK(CONVERT_TIMEZONE('UTC', 'Europe/Zurich', START_TIME)) BETWEEN 1 AND 5
  AND USER_NAME NOT IN ('SYSTEM', 'SNOWFLAKE')  -- Exclude system accounts
  AND QUERY_TYPE IN ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'COPY');
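The predicate can be checked locally with Python's zoneinfo as a rough equivalent. Hours and timezone mirror the query; Snowflake's DAYOFWEEK 1–5 corresponds to Monday–Friday, which matches `isoweekday() <= 5` here:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def is_unusual(start_time_utc: datetime, tz: str = "Europe/Zurich") -> bool:
    """True when a weekday query ran outside 07:00-19:59 local time."""
    local = start_time_utc.astimezone(ZoneInfo(tz))
    is_weekday = local.isoweekday() <= 5   # Monday..Friday
    in_hours = 7 <= local.hour <= 19       # NOT BETWEEN 7 AND 19, negated
    return is_weekday and not in_hours

# 02:30 UTC on a Wednesday is 03:30 in Zurich: flagged
flagged = is_unusual(datetime(2024, 1, 10, 2, 30, tzinfo=timezone.utc))
```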

Security Considerations
#

Secret Rotation
#

Rotate your SIEM API tokens on a regular schedule. For static tokens (Splunk HEC), use ALTER SECRET ... SET SECRET_STRING = '...' or automate it via a Task. For the hybrid approach, the refresh Task handles this automatically — but you still need to rotate the Entra ID client secret in the Security Integration periodically.

Network Policies
#

Restrict which Snowflake components can reach external endpoints. The External Access Integration’s ALLOWED_NETWORK_RULES controls egress for UDFs. For native webhooks, ensure your SIEM endpoint is accessible from Snowflake’s egress IP ranges — check SYSTEM$ALLOWLIST() for the current list.

Audit Trail
#

All calls to SYSTEM$SEND_NOTIFICATION() are logged in ACCOUNT_USAGE.NOTIFICATION_HISTORY. Monitor this view to confirm alerts are firing and to detect any delivery failures:

SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.NOTIFICATION_HISTORY
WHERE NOTIFICATION_TYPE = 'WEBHOOK'
ORDER BY CREATED DESC
LIMIT 50;

Least Privilege
#

The role that owns the Alert or Task needs:

  • EXECUTE ALERT / EXECUTE TASK on the account
  • USAGE on the notification integration
  • USAGE on the warehouse
  • IMPORTED PRIVILEGES on the SNOWFLAKE database (for ACCOUNT_USAGE views)
  • USAGE and READ on the secret

Do not grant ACCOUNTADMIN to the monitoring role. The setup in this article uses a dedicated SIEM_WEBHOOK_ROLE with only the permissions it needs.


Author: Kevin Keller

Personal blog about AI, Observability & Data Sovereignty. Snowflake-related articles explore the art of the possible and are not official Snowflake solutions or endorsed by Snowflake unless explicitly stated. Opinions are my own. Content is meant as educational inspiration, not production guidance.