Security teams are drowning in logs. The average SOC analyst triages hundreds of alerts per day, most of them false positives, most of them requiring the same manual steps: query the SIEM, correlate events, check thresholds, escalate or dismiss. Meanwhile, the actual threats — credential stuffing at 2 AM, a slow data exfiltration over weeks, a privilege escalation buried in a sea of routine admin changes — slip through because no human can maintain vigilance at machine scale.
What if your AI coding agent already knew how to build the detection pipelines, write the threat-hunting queries, and wire up the alerting — all inside Snowflake, using the security views and compliance frameworks your organisation already cares about?
That is what the Security Operations AI Skill does. It is a ~2,800-line knowledge file that transforms any AI coding agent into a cybersecurity expert — one that understands Snowflake’s security views, detection patterns for the OWASP Top 10:2025, MITRE ATT&CK tactics, and NIST CSF 2.0 functions. The agent reads the skill, then builds whatever you ask: detection pipelines, alert dashboards, compliance reports, incident response automation.
Works with Any AI Coding Agent#
While this skill was originally built for Snowflake Cortex Code, the SKILL.md file format is agent-agnostic. Any AI coding agent that can read context files can use it:
| Agent | How to Use the Skill |
|---|---|
| Cortex Code (CoCo) | Place SKILL.md in your project — CoCo reads it automatically |
| Claude Code | Place SKILL.md in your project root or reference it in CLAUDE.md |
| Cursor | Add SKILL.md to your project — Cursor indexes it as context |
| Continue.dev | Reference in .continuerules or place in project root |
| Windsurf | Place in project root — picked up as workspace context |
| Any agent with file context | Point the agent to SKILL.md before making requests |
The skill file is universal. You are not locked into any single agent or vendor. The same SKILL.md that powers a Cortex Code session works identically in Claude Code or Cursor — the agent reads the knowledge, then generates the SQL, Python, and configuration you need.
What the Skill Knows#
The Security Operations Skill encodes three major cybersecurity frameworks, mapped directly to Snowflake’s capabilities.
OWASP Top 10:2025#
The skill covers every category in the 2025 edition, with detection patterns and SQL queries for each:
| # | Category | Snowflake Detection Pattern |
|---|---|---|
| A01 | Broken Access Control | Unauthorised role grants, privilege escalation, cross-account access anomalies |
| A02 | Cryptographic Failures | Unencrypted data movement, weak key usage, certificate expiration monitoring |
| A03 | Injection | SQL injection patterns in query logs, suspicious UDF creation, dynamic SQL abuse |
| A04 | Insecure Design | Overly permissive roles, missing network policies, public stage exposure |
| A05 | Security Misconfiguration | Default settings, disabled MFA, overly broad grants, missing audit logging |
| A06 | Vulnerable & Outdated Components | Driver version tracking, deprecated connector usage, EOL client detection |
| A07 | Identification & Authentication Failures | Brute force, credential stuffing, impossible travel, session hijacking |
| A08 | Software & Data Integrity Failures | Unauthorised schema changes, data tampering detection, pipeline integrity |
| A09 | Security Logging & Monitoring Failures | Gaps in audit coverage, disabled logging, monitoring blind spots |
| A10 | Server-Side Request Forgery | External function abuse, network integration misuse, egress anomalies |
MITRE ATT&CK Tactics#
Each tactic maps to specific Snowflake views and detection logic:
| Tactic | Snowflake Views | Detection Examples |
|---|---|---|
| Initial Access | LOGIN_HISTORY, SESSIONS | Brute force, credential stuffing, anomalous login locations |
| Execution | QUERY_HISTORY, TASK_HISTORY | Suspicious stored procedure calls, UDF creation, task manipulation |
| Persistence | GRANTS_TO_ROLES, GRANTS_TO_USERS | Backdoor role creation, persistent grant changes, shadow admin accounts |
| Privilege Escalation | GRANTS_TO_ROLES, ROLES | Role hierarchy manipulation, ACCOUNTADMIN grants, privilege chaining |
| Defence Evasion | QUERY_HISTORY, SESSIONS | Query obfuscation, log tampering attempts, session token reuse |
| Credential Access | LOGIN_HISTORY, USERS | Password spraying, credential reuse across accounts, MFA bypass attempts |
| Discovery | QUERY_HISTORY | Schema enumeration, data profiling queries, metadata harvesting |
| Collection | QUERY_HISTORY, COPY_HISTORY | Bulk data reads, unusual COPY operations, large result set extraction |
| Exfiltration | COPY_HISTORY, STAGES | External stage writes, unusual data volumes, off-hours transfers |
| Impact | QUERY_HISTORY, TASK_HISTORY | Mass deletes, schema drops, warehouse manipulation, resource abuse |
NIST Cybersecurity Framework 2.0#
The skill maps NIST CSF 2.0 functions to actionable Snowflake implementations:
| Function | Implementation in Snowflake |
|---|---|
| Govern (GV) | Role hierarchy policies, access review automation, compliance reporting |
| Identify (ID) | Asset inventory via INFORMATION_SCHEMA, data classification, risk assessment queries |
| Protect (PR) | Network policies, MFA enforcement validation, encryption verification |
| Detect (DE) | Dynamic Table detection pipelines, anomaly scoring, threshold alerting |
| Respond (RS) | Automated account disabling, incident ticket creation via webhooks, forensic query generation |
| Recover (RC) | Time Travel forensics, fail-safe recovery procedures, post-incident reporting |
Architecture#
The skill teaches agents to build a complete security operations pipeline inside Snowflake:
┌──────────────────────────────────────────────────────────────────┐
│                           DATA SOURCES                           │
│  Cloud Logs    App Logs    Auth Logs    Network    Endpoints     │
└───────────────────────────────┬──────────────────────────────────┘
                                │
                                ▼
┌──────────────────────────────────────────────────────────────────┐
│                 INGESTION (Snowpipe / REST API)                  │
│         Streaming ingest · Auto-ingest · Kafka connector         │
└───────────────────────────────┬──────────────────────────────────┘
                                │
                                ▼
┌──────────────────────────────────────────────────────────────────┐
│                 DYNAMIC TABLE DETECTION PIPELINE                 │
│                                                                  │
│  ┌──────────────┐   ┌──────────────┐   ┌─────────────────────┐   │
│  │  RAW EVENTS  │──▶│  NORMALISED  │──▶│  ENRICHED + SCORED  │   │
│  │  (1-min lag) │   │  (2-min lag) │   │  (5-min lag)        │   │
│  └──────────────┘   └──────────────┘   └──────────┬──────────┘   │
│                                                   │              │
│                                        ┌──────────▼──────────┐   │
│                                        │   SECURITY ALERTS   │   │
│                                        │  (threshold breach) │   │
│                                        └──────────┬──────────┘   │
└───────────────────────────────────────────────────┼──────────────┘
                                                    │
       ┌───────────────┬───────────────┬────────────┴──┐
       ▼               ▼               ▼               ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Interactive │ │   Cortex    │ │  Streamlit  │ │   Webhook   │
│   Tables    │ │   Agents    │ │  Dashboard  │ │   (SIEM)    │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘

Data flows in from any source through Snowpipe or the REST API. Dynamic Tables chain together to normalise, enrich, score, and alert — with configurable lag from 1 to 5 minutes. Outputs go to Interactive Tables for analyst review, Cortex agents for natural language querying, Streamlit dashboards for visual monitoring, and webhooks for integration with your existing SIEM.
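To make the pipeline stages concrete, here is a minimal in-memory sketch of the normalise-and-score flow in Python. It is purely illustrative: the field names and thresholds mirror the SQL patterns later in this post, but in Snowflake each stage is a Dynamic Table, not Python.

```python
def normalise(raw: list) -> list:
    """Stage 1 -> 2: standardise field names and flag failures."""
    return [
        {
            "user": e["user_name"].upper(),
            "ip": e["client_ip"],
            "failed": e["is_success"] == "NO",
        }
        for e in raw
    ]


def score(events: list) -> list:
    """Stage 2 -> 3: count failures per (user, ip) and map counts to
    severity tiers, mirroring the CASE/HAVING thresholds in the SQL."""
    counts = {}
    for e in events:
        if e["failed"]:
            key = (e["user"], e["ip"])
            counts[key] = counts.get(key, 0) + 1

    alerts = []
    for (user, ip), n in sorted(counts.items()):
        if n < 5:  # below the HAVING threshold: no alert emitted
            continue
        severity = "CRITICAL" if n >= 20 else "HIGH" if n >= 10 else "MEDIUM"
        alerts.append({
            "user": user,
            "ip": ip,
            "failed_attempts": n,
            "detection_type": "BRUTE_FORCE",
            "severity": severity,
        })
    return alerts
```

Each function plays the role of one Dynamic Table: its output is the next stage's input, and the refresh cadence is what `TARGET_LAG` controls in the real pipeline.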
Detection Capabilities#
The skill covers five core detection categories, each with ready-to-use SQL patterns.
Brute Force Detection#
Identifies accounts under sustained password guessing attacks by correlating failed login attempts within a sliding time window:
CREATE OR REPLACE DYNAMIC TABLE security_db.detections.brute_force_alerts
TARGET_LAG = '2 minutes'
WAREHOUSE = security_wh
AS
SELECT
user_name,
client_ip,
COUNT(*) AS failed_attempts,
MIN(event_timestamp) AS window_start,
MAX(event_timestamp) AS window_end,
DATEDIFF('second', MIN(event_timestamp), MAX(event_timestamp)) AS window_seconds,
'BRUTE_FORCE' AS detection_type,
CASE
WHEN COUNT(*) >= 20 THEN 'CRITICAL'
WHEN COUNT(*) >= 10 THEN 'HIGH'
ELSE 'MEDIUM'
END AS severity
FROM snowflake.account_usage.login_history
WHERE is_success = 'NO'
AND event_timestamp > DATEADD('hour', -1, CURRENT_TIMESTAMP())
GROUP BY user_name, client_ip
HAVING COUNT(*) >= 5
AND DATEDIFF('second', MIN(event_timestamp), MAX(event_timestamp)) < 600;
Credential Stuffing Detection#
Differentiates credential stuffing from brute force by looking for many distinct usernames failing from the same source — the hallmark of attackers testing stolen credential lists:
CREATE OR REPLACE DYNAMIC TABLE security_db.detections.credential_stuffing_alerts
TARGET_LAG = '2 minutes'
WAREHOUSE = security_wh
AS
SELECT
client_ip,
COUNT(DISTINCT user_name) AS distinct_users_targeted,
COUNT(*) AS total_attempts,
MIN(event_timestamp) AS window_start,
MAX(event_timestamp) AS window_end,
'CREDENTIAL_STUFFING' AS detection_type,
CASE
WHEN COUNT(DISTINCT user_name) >= 20 THEN 'CRITICAL'
WHEN COUNT(DISTINCT user_name) >= 10 THEN 'HIGH'
ELSE 'MEDIUM'
END AS severity
FROM snowflake.account_usage.login_history
WHERE is_success = 'NO'
AND event_timestamp > DATEADD('hour', -1, CURRENT_TIMESTAMP())
GROUP BY client_ip
HAVING COUNT(DISTINCT user_name) >= 5;
Impossible Travel Detection#
Flags users whose login locations change faster than physically possible — a strong indicator of credential compromise:
CREATE OR REPLACE DYNAMIC TABLE security_db.detections.impossible_travel_alerts
TARGET_LAG = '5 minutes'
WAREHOUSE = security_wh
AS
WITH login_pairs AS (
SELECT
user_name,
reported_client_type,
client_ip AS current_ip,
LAG(client_ip) OVER (PARTITION BY user_name ORDER BY event_timestamp) AS prev_ip,
event_timestamp AS curr_time,  -- CURRENT_TIME is a reserved keyword in Snowflake
LAG(event_timestamp) OVER (PARTITION BY user_name ORDER BY event_timestamp) AS prev_time,
-- Placeholder: swap in a geo lookup from an enrichment table or an IP-to-location UDF
reported_client_type AS current_location,
DATEDIFF('minute',
LAG(event_timestamp) OVER (PARTITION BY user_name ORDER BY event_timestamp),
event_timestamp
) AS minutes_between_logins
FROM snowflake.account_usage.login_history
WHERE is_success = 'YES'
AND event_timestamp > DATEADD('day', -1, CURRENT_TIMESTAMP())
)
SELECT
user_name,
prev_ip,
current_ip,
prev_time,
curr_time,
minutes_between_logins,
'IMPOSSIBLE_TRAVEL' AS detection_type,
'HIGH' AS severity
FROM login_pairs
WHERE prev_ip IS NOT NULL
AND current_ip != prev_ip
AND minutes_between_logins < 30;
Data Exfiltration Detection#
Monitors for unusual data movement patterns — bulk exports, off-hours transfers, and abnormal volumes that may indicate insider threats or compromised accounts:
CREATE OR REPLACE DYNAMIC TABLE security_db.detections.data_exfiltration_alerts
TARGET_LAG = '5 minutes'
WAREHOUSE = security_wh
AS
WITH user_baselines AS (
SELECT
user_name,
AVG(rows_produced) AS avg_rows,
STDDEV(rows_produced) AS stddev_rows
FROM snowflake.account_usage.query_history
WHERE start_time > DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY user_name
)
SELECT
qh.user_name,
qh.query_id,
qh.query_text,
qh.rows_produced,
qh.start_time,
ub.avg_rows AS baseline_avg,
(qh.rows_produced - ub.avg_rows) / NULLIF(ub.stddev_rows, 0) AS z_score,
'DATA_EXFILTRATION' AS detection_type,
CASE
WHEN (qh.rows_produced - ub.avg_rows) / NULLIF(ub.stddev_rows, 0) > 5 THEN 'CRITICAL'
WHEN (qh.rows_produced - ub.avg_rows) / NULLIF(ub.stddev_rows, 0) > 3 THEN 'HIGH'
ELSE 'MEDIUM'
END AS severity
FROM snowflake.account_usage.query_history qh
JOIN user_baselines ub ON qh.user_name = ub.user_name
WHERE qh.start_time > DATEADD('hour', -1, CURRENT_TIMESTAMP())
AND qh.rows_produced > ub.avg_rows + (3 * ub.stddev_rows)
AND qh.rows_produced > 10000;
Privilege Escalation Detection#
Catches role grants and privilege changes that deviate from normal administrative patterns — including grants to sensitive roles like ACCOUNTADMIN:
CREATE OR REPLACE DYNAMIC TABLE security_db.detections.privilege_escalation_alerts
TARGET_LAG = '2 minutes'
WAREHOUSE = security_wh
AS
SELECT
qh.user_name AS granting_user,
qh.role_name AS granting_role,
qh.query_text,
qh.start_time,
'PRIVILEGE_ESCALATION' AS detection_type,
CASE
WHEN UPPER(qh.query_text) LIKE '%ACCOUNTADMIN%' THEN 'CRITICAL'
WHEN UPPER(qh.query_text) LIKE '%SECURITYADMIN%' THEN 'HIGH'
WHEN UPPER(qh.query_text) LIKE '%SYSADMIN%' THEN 'HIGH'
ELSE 'MEDIUM'
END AS severity
FROM snowflake.account_usage.query_history qh
WHERE qh.start_time > DATEADD('hour', -1, CURRENT_TIMESTAMP())
AND (
UPPER(qh.query_text) LIKE '%GRANT%ROLE%TO%'
OR UPPER(qh.query_text) LIKE '%GRANT%OWNERSHIP%'
OR UPPER(qh.query_text) LIKE '%ALTER%USER%DEFAULT_ROLE%'
)
AND qh.execution_status = 'SUCCESS';
15-Minute Demo#
The repository includes a complete demo deployment that generates realistic security events — including injected attack patterns — so you can see every detection pipeline in action without needing real production data.
The demo runs through five scripts, executed in order:
Step 1: Setup Security Schema#
01_setup_security_schema.sql
Creates the SECURITY_DB database and the schemas for raw events, normalised data, detections, and alerting. Sets up the warehouse, roles, and grants needed for the pipeline.
Step 2: Generate Demo Data#
02_generate_demo_data.sql
Generates 10,000+ realistic security events including:
- Normal login patterns across business hours
- Brute force attacks against specific accounts
- Credential stuffing from known malicious IPs
- Impossible travel scenarios
- Bulk data extraction queries
- Privilege escalation attempts
The injected attack patterns are designed to trigger every detection type so you can validate the pipeline end-to-end.
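As a rough illustration of that injection approach, here is a Python sketch of the same idea: mostly benign logins plus attack bursts engineered to cross each detector's thresholds. It is a stand-in for what 02_generate_demo_data.sql does, not its actual contents; every name and IP below is invented.

```python
import random
from datetime import datetime, timedelta


def generate_demo_events(n_normal: int = 1000, seed: int = 42) -> list:
    """Produce normal logins plus injected attack patterns (illustrative only)."""
    rng = random.Random(seed)
    now = datetime(2025, 1, 15, 12, 0, 0)
    events = []

    # Normal logins spread across business hours from internal addresses
    for _ in range(n_normal):
        events.append({
            "user_name": rng.choice(["alice", "bob", "carol"]),
            "client_ip": f"10.0.1.{rng.randint(1, 50)}",
            "is_success": "YES",
            "event_timestamp": now - timedelta(minutes=rng.randint(0, 480)),
        })

    # Injected brute force: 25 failures against one account in ~5 minutes
    for i in range(25):
        events.append({
            "user_name": "victim_user",
            "client_ip": "203.0.113.7",
            "is_success": "NO",
            "event_timestamp": now - timedelta(seconds=12 * i),
        })

    # Injected credential stuffing: one IP failing against many accounts
    for i in range(30):
        events.append({
            "user_name": f"user_{i:03d}",
            "client_ip": "198.51.100.9",
            "is_success": "NO",
            "event_timestamp": now - timedelta(seconds=20 * i),
        })

    return events
```

The burst sizes (25 failures, 30 distinct usernames) deliberately exceed the CRITICAL thresholds in the detection SQL, so a clean pipeline run should surface both alerts.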
Step 3: Detection Pipeline#
03_detection_pipeline.sql
Creates the Dynamic Table chain — raw ingestion, normalisation, enrichment, scoring, and alerting. Each table feeds the next with configurable lag targets from 1 to 5 minutes.
Step 4: Interactive Tables#
04_interactive_tables.sql
Sets up Interactive Tables for analyst review. Security analysts can view, filter, sort, and acknowledge alerts directly in Snowsight — no external tools required.
Step 5: Streamlit Dashboard#
05_streamlit_dashboard.py
Deploys a Streamlit-in-Snowflake dashboard with:
- Real-time alert feed with severity indicators
- Detection type breakdown charts
- Timeline visualisations of attack patterns
- Drill-down from alert to raw event detail
Dynamic Table Pipeline#
The core of the detection architecture is a three-stage Dynamic Table pipeline. Each stage reads from the previous one, and Snowflake manages the refresh automatically — no orchestration, no scheduling, no external tooling.
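Conceptually, the chained tables form a dependency DAG that Snowflake refreshes in upstream-first order. The Python sketch below only illustrates that ordering idea; you never write this yourself, because Dynamic Tables resolve dependencies automatically.

```python
# Illustrative only: each Dynamic Table lists the tables it reads from,
# and a refresh must process every upstream table before its dependents.
DEPENDENCIES = {
    "raw_events": [],
    "enriched_events": ["raw_events"],
    "security_alerts": ["enriched_events"],
}


def refresh_order(deps: dict) -> list:
    """Return tables in an order where every table's inputs come first
    (a depth-first topological sort over the dependency map)."""
    ordered, seen = [], set()

    def visit(table):
        if table in seen:
            return
        for upstream in deps[table]:
            visit(upstream)
        seen.add(table)
        ordered.append(table)

    for table in deps:
        visit(table)
    return ordered
```

In Snowflake, this scheduling is driven by each table's `TARGET_LAG` rather than an explicit sort, but the guarantee is the same: a downstream table never refreshes against stale inputs.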
Stage 1: Raw Ingestion#
The first Dynamic Table captures events from Snowflake’s built-in security views and any external sources, with a 1-minute target lag:
CREATE OR REPLACE DYNAMIC TABLE security_db.pipeline.raw_events
TARGET_LAG = '1 minute'
WAREHOUSE = security_wh
AS
SELECT
event_timestamp,
event_type,
user_name,
client_ip,
reported_client_type,
is_success,
error_code,
error_message,
CURRENT_TIMESTAMP() AS ingested_at
FROM snowflake.account_usage.login_history
WHERE event_timestamp > DATEADD('day', -7, CURRENT_TIMESTAMP());
Stage 2: Normalisation and Enrichment#
The second stage normalises event formats, joins with enrichment data (IP reputation, user metadata, geo-location), and applies initial classification:
CREATE OR REPLACE DYNAMIC TABLE security_db.pipeline.enriched_events
TARGET_LAG = '2 minutes'
WAREHOUSE = security_wh
AS
SELECT
r.event_timestamp,
r.event_type,
r.user_name,
r.client_ip,
r.is_success,
u.email AS user_email,
u.default_role AS user_default_role,
u.has_mfa AS user_has_mfa,
CASE
WHEN r.is_success = 'NO' THEN 'authentication_failure'
WHEN r.reported_client_type = 'OTHER' THEN 'suspicious_client'
ELSE 'normal'
END AS event_classification,
r.ingested_at
FROM security_db.pipeline.raw_events r
LEFT JOIN security_db.reference.user_directory u
ON r.user_name = u.user_name;
Stage 3: Alert Generation#
The final stage applies detection logic, scores severity, and produces actionable alerts:
CREATE OR REPLACE DYNAMIC TABLE security_db.pipeline.security_alerts
TARGET_LAG = '5 minutes'
WAREHOUSE = security_wh
AS
SELECT
alert_id,
detection_type,
severity,
affected_user,
source_ip,
event_count,
first_seen,
last_seen,
description,
recommended_action,
mitre_tactic,
mitre_technique,
owasp_category,
nist_function,
FALSE AS acknowledged,
NULL AS acknowledged_by,
NULL AS acknowledged_at,
CURRENT_TIMESTAMP() AS alert_created_at
FROM (
SELECT * FROM security_db.detections.brute_force_alerts
UNION ALL
SELECT * FROM security_db.detections.credential_stuffing_alerts
UNION ALL
SELECT * FROM security_db.detections.impossible_travel_alerts
UNION ALL
SELECT * FROM security_db.detections.data_exfiltration_alerts
UNION ALL
SELECT * FROM security_db.detections.privilege_escalation_alerts
);
The result: events flow from source to actionable alert in 1 to 5 minutes, with zero orchestration code. Dynamic Tables handle refresh scheduling, dependency ordering, and incremental processing automatically. One caveat: UNION ALL requires every branch to project the same columns in the same order, so each detection table must expose the shared alert schema listed in the outer SELECT.
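A sketch of that shared alert shape in Python; the field names follow the outer SELECT above, while the example values are invented:

```python
# Columns every detection branch must emit so the UNION ALL in
# security_alerts lines up column-for-column.
ALERT_FIELDS = [
    "alert_id", "detection_type", "severity", "affected_user", "source_ip",
    "event_count", "first_seen", "last_seen", "description",
    "recommended_action", "mitre_tactic", "mitre_technique",
    "owasp_category", "nist_function",
]


def make_alert(**values) -> dict:
    """Build an alert record, defaulting any field not supplied to None."""
    alert = {field: None for field in ALERT_FIELDS}
    alert.update({k: v for k, v in values.items() if k in ALERT_FIELDS})
    return alert


# Invented example values for illustration
example = make_alert(
    alert_id="bf-0001",
    detection_type="BRUTE_FORCE",
    severity="CRITICAL",
    affected_user="VICTIM_USER",
    source_ip="203.0.113.7",
    event_count=25,
    mitre_tactic="Credential Access",
    owasp_category="A07",
    nist_function="DE",
)
```

Keeping a single canonical field list like this makes it easy to add a sixth detection later: any branch that emits the same shape slots straight into the UNION ALL.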
Natural Language Security Queries#
Once the skill is loaded, your AI coding agent becomes a security analyst you can talk to. Example prompts:
Threat Hunting
"Are there any brute force attempts in the last 24 hours?"
"Show me credential stuffing patterns grouped by source IP"
"Hunt for impossible travel across all users this week"
"Find any accounts with failed logins followed by a successful login within 5 minutes"
Compliance and Posture
"What's our OWASP Top 10 exposure right now?"
"Generate a NIST CSF 2.0 Detect function compliance report"
"Show me all MITRE ATT&CK Initial Access detections this month"
"Which users don't have MFA enabled and have ACCOUNTADMIN access?"
Incident Response
"Show me privilege escalation events this week and who granted them"
"Build a timeline of all activity for user JOHN_DOE in the last 48 hours"
"Create a forensic report for the credential stuffing attack from 10.0.0.5"
"Disable the compromised account and generate an incident summary"
Pipeline and Dashboard
"Create a new detection rule for off-hours data exports"
"Add a Streamlit chart showing alert trends by severity over the last 30 days"
"Set up a webhook notification for all CRITICAL alerts"
"Build an Interactive Table for the SOC team to triage brute force alerts"
The agent generates the SQL, Python, or configuration — you review and run it. Every response is grounded in the frameworks and patterns encoded in the skill.
Integration with Your SIEM#
The detection pipeline produces structured alerts that can be forwarded to any downstream system. The webhook output stage sends JSON payloads to your SIEM, XDR, or incident management platform — Splunk, Microsoft Sentinel, CrowdStrike, ServiceNow, PagerDuty, or any system with a REST API.
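A minimal sketch of such a forwarder using only the Python standard library. The endpoint URL is hypothetical, and a real integration would add authentication headers, retries, and error handling:

```python
import json
import urllib.request


def forward_alert(alert: dict, webhook_url: str) -> urllib.request.Request:
    """Build a JSON POST request for one alert.

    The request is returned rather than sent so the caller controls
    retries and auth; webhook_url is whatever your SIEM exposes.
    """
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(alert).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urllib.request.urlopen(req)  # uncomment to actually send
    return req


# Hypothetical endpoint, for illustration only
req = forward_alert(
    {"detection_type": "BRUTE_FORCE", "severity": "CRITICAL",
     "source_ip": "203.0.113.7"},
    "https://siem.example.com/hooks/snowflake",
)
```

Most SIEMs (Splunk HEC, Sentinel data collector endpoints, PagerDuty Events) accept exactly this kind of JSON POST, differing mainly in the auth header they expect.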
For a detailed walkthrough of wiring Snowflake alerts to your SIEM via webhook notifications, see the companion article: How to Let Snowflake Raise Security Incidents in Your SIEM or XDR Automatically.
Getting Started#
Clone the repository and deploy the demo:
git clone https://github.com/sfc-gh-kkeller/security-ops-skill.git
cd security-ops-skill
For the AI skill, copy SKILL.md into your project and start prompting. For the demo, run the five numbered scripts in order (the four SQL scripts in a Snowsight worksheet, then the Streamlit app). The entire deployment takes about 15 minutes.
Related#
- How to Let Snowflake Raise Security Incidents in Your SIEM or XDR Automatically — webhook-based alert forwarding to Splunk, Sentinel, and other SIEMs
- How JWT Tokens Work — understanding the authentication tokens that secure API integrations
- Governing AI Inference in the Data Cloud — security architecture for AI workloads
- Containing AI Agents in Production — sandboxing and security controls for autonomous agents
