Processing¶
The Piper component processes data through a pipeline of filters, samplers, transformations, and enrichers. Piper runs after the Receiver stores raw data and before Packer converts it to Parquet.
Pipeline Overview¶
Raw data from Receiver (S3)
│
▼
┌─────────────┐
│ Filter │ ──▶ Drop unwanted events
└──────┬──────┘
│
▼
┌─────────────┐
│ Sample │ ──▶ Reduce volume (e.g., 10%)
└──────┬──────┘
│
▼
┌─────────────┐
│ Transform │ ──▶ Modify fields, parse, rename
└──────┬──────┘
│
▼
┌─────────────┐
│ Enrich │ ──▶ Add geo data, lookups
└──────┬──────┘
│
▼
Processed data → S3 → Packer
Performance¶
Piper runs parallel workers per dataset. Worker count is configurable via the Control API or UI. Each worker processes batches independently, allowing horizontal scaling by adding more workers to high-volume datasets.
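As a rough sketch, raising the worker count for a dataset through the Control API might look like the request below; the endpoint path and payload shape are illustrative assumptions, not the documented API surface.

PUT /api/v1/datasets/security-events/piper
{
  "workers": 8
}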
AI Configuration¶
Build transformation pipelines using the AI assistant in the UI:
- Navigate to a dataset's Pipeline tab
- Open the Agent tab
- Describe what you want in natural language (e.g., "drop all debug-level events and extract usernames from the message field")
- Review the generated pipeline configuration
- Test with sample data before activating
The AI assistant is also available via the /api/v1/ai/agent/chat endpoint for programmatic access.
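A chat request might look like the following; the endpoint is as documented above, but the request body shape is an illustrative assumption, not a published schema.

POST /api/v1/ai/agent/chat
{
  "dataset": "security-events",
  "message": "drop all debug-level events and extract usernames from the message field"
}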
Filters¶
Filters drop events that match a condition (drop) or keep only the events that match (keep).
filters:
  # Drop debug logs
  - type: drop
    condition: "level == 'debug'"

  # Keep only specific sources
  - type: keep
    condition: "source in ['firewall', 'ids', 'waf']"

  # Drop events flagged as internal tests
  - type: drop
    condition: "internal_test == true"
Filter Conditions¶
| Operator | Example | Description |
|---|---|---|
| `==` | `level == 'error'` | Exact match |
| `!=` | `status != 200` | Not equal |
| `in` | `source in ['a', 'b']` | Value in list |
| `contains` | `message contains 'failed'` | Substring match |
| `matches` | `ip matches '^10\.'` | Regex match |
| `exists` | `user_id exists` | Field is present |
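The remaining operators are used the same way as in the filter example above; a brief sketch with illustrative field names:

filters:
  - type: drop
    condition: "message contains 'health check'"
  - type: keep
    condition: "ip matches '^10\.'"
  - type: drop
    condition: "debug_flag exists"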
Sampling¶
Reduce data volume while maintaining statistical visibility.
sampling:
  # Random sampling - keep 10% of events
  - type: random
    rate: 0.1

  # Hash-based sampling - consistent sampling per user
  - type: hash
    field: user_id
    rate: 0.1

  # Rate limiting - max 1000 events/second
  - type: rate_limit
    events_per_second: 1000
Transformations¶
Modify event data before storage.
transformations:
  # Rename field
  - type: rename
    from: src_ip
    to: source_ip

  # Parse JSON string
  - type: parse_json
    field: raw_data
    target: parsed

  # Extract with regex
  - type: extract
    field: message
    pattern: 'user=(\w+)'
    target: username

  # Add computed field
  - type: add_field
    field: processed_at
    value: "${timestamp}"

  # Remove sensitive fields
  - type: remove
    fields: [password, token, secret]
Transformation Types¶
| Type | Description |
|---|---|
| `rename` | Rename a field |
| `remove` | Delete field(s) |
| `add_field` | Add new field with value |
| `parse_json` | Parse JSON string to object |
| `parse_kv` | Parse key=value pairs |
| `extract` | Extract with regex |
| `lowercase` | Convert to lowercase |
| `uppercase` | Convert to uppercase |
| `trim` | Remove whitespace |
| `split` | Split string to array |
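The types not shown in the earlier example follow the same shape; a sketch in which the parameter names (target, separator) are assumptions rather than documented options:

transformations:
  - type: parse_kv
    field: raw_log
    target: parsed
  - type: lowercase
    field: hostname
  - type: split
    field: tags
    separator: ","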
Enrichers¶
Add context to events from external data sources.
Geo Enrichment¶
Add geographic information based on IP addresses.
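enrichers:
  - type: geo
    field: source_ip
    target: geo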
Result:
{
  "source_ip": "8.8.8.8",
  "geo": {
    "country": "US",
    "country_name": "United States",
    "city": "Mountain View",
    "lat": 37.4056,
    "lon": -122.0775
  }
}
Lookup Enrichment¶
Enrich from lookup tables (CSV, database).
enrichers:
  - type: lookup
    field: user_id
    table: users
    target: user
    # Adds: user.name, user.department, user.role
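Assuming a users table with those columns, the enriched event looks roughly like this (values illustrative):

{
  "user_id": "u-1042",
  "user": {
    "name": "Jane Doe",
    "department": "Security",
    "role": "analyst"
  }
}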
Custom Enrichment¶
Run custom logic via HTTP or script.
enrichers:
  # Look up source IPs against an external HTTP service
  - type: http
    url: https://threat-intel.example.com/lookup
    field: source_ip
    target: threat

  # Categorize events with inline script logic
  - type: script
    script: |
      if event.bytes > 1000000:
          event.size_category = 'large'
      else:
          event.size_category = 'normal'
Pipeline Configuration¶
Complete pipeline example. Stages run in the order shown in the pipeline overview: filters, then sampling, transformations, and enrichers:
pipeline:
  name: security-events

  filters:
    - type: drop
      condition: "level == 'debug'"

  sampling:
    - type: random
      rate: 1.0  # Keep all (no sampling)

  transformations:
    - type: rename
      from: src
      to: source_ip
    - type: remove
      fields: [internal_id]

  enrichers:
    - type: geo
      field: source_ip
      target: geo
Best Practices¶
- Filter early — drop unwanted data before expensive processing
- Sample wisely — use hash-based sampling for consistent user journeys
- Enrich selectively — only enrich fields you'll query
- Test pipelines — use preview mode before deploying