Processing¶
The Piper component processes data through a pipeline of filters, samplers, transformations, and enrichers.
Pipeline Overview¶
```text
Incoming Events
       │
       ▼
┌─────────────┐
│   Filter    │ ──▶ Drop unwanted events
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Sample    │ ──▶ Reduce volume (e.g., 10%)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Transform  │ ──▶ Modify fields, parse, rename
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Enrich    │ ──▶ Add geo data, lookups
└──────┬──────┘
       │
       ▼
Processed Events → Storage
```
Filters¶
Filters drop events that match (or don't match) specified criteria.
```yaml
filters:
  # Drop debug logs
  - type: drop
    condition: "level == 'debug'"

  # Keep only specific sources
  - type: keep
    condition: "source in ['firewall', 'ids', 'waf']"

  # Drop events flagged as internal tests
  - type: drop
    condition: "internal_test == true"
```
Filter Conditions¶
| Operator | Example | Description |
|---|---|---|
| `==` | `level == 'error'` | Exact match |
| `!=` | `status != 200` | Not equal |
| `in` | `source in ['a', 'b']` | Value in list |
| `contains` | `message contains 'failed'` | Substring match |
| `matches` | `ip matches '^10\.'` | Regex match |
| `exists` | `user_id exists` | Field is present |
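The remaining operators use the same `condition` string form shown earlier. For instance (field names taken from the examples above):

```yaml
filters:
  # Keep only events that carry a user identifier
  - type: keep
    condition: "user_id exists"

  # Drop anything whose message mentions a health check
  - type: drop
    condition: "message contains 'health_check'"
```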
Sampling¶
Reduce data volume while maintaining statistical visibility.
```yaml
sampling:
  # Random sampling - keep 10% of events
  - type: random
    rate: 0.1

  # Hash-based sampling - consistent sampling per user
  - type: hash
    field: user_id
    rate: 0.1

  # Rate limiting - max 1000 events/second
  - type: rate_limit
    events_per_second: 1000
```
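Hash-based sampling is what keeps per-user journeys intact: the keep/drop decision is a deterministic function of the hashed field, so every event with the same `user_id` gets the same decision. A minimal Python sketch of the idea (not Piper's actual implementation; the choice of hash function is an assumption):

```python
import hashlib

def hash_sample(value: str, rate: float) -> bool:
    """Deterministically decide whether to keep an event.

    The same input value always maps to the same bucket in [0, 1),
    so all events sharing that value are kept or dropped together.
    """
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64  # map hash to [0, 1)
    return bucket < rate

# With rate 0.1, roughly 10% of distinct user_ids are kept,
# and each user's events are either all kept or all dropped.
print(hash_sample("user-1234", 0.1))
```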
Transformations¶
Modify event data before storage.
```yaml
transformations:
  # Rename field
  - type: rename
    from: src_ip
    to: source_ip

  # Parse JSON string
  - type: parse_json
    field: raw_data
    target: parsed

  # Extract with regex
  - type: extract
    field: message
    pattern: 'user=(\w+)'
    target: username

  # Add computed field
  - type: add_field
    field: processed_at
    value: "${timestamp}"

  # Remove sensitive fields
  - type: remove
    fields: [password, token, secret]
```
Transformation Types¶
| Type | Description |
|---|---|
| `rename` | Rename a field |
| `remove` | Delete field(s) |
| `add_field` | Add new field with value |
| `parse_json` | Parse JSON string to object |
| `parse_kv` | Parse key=value pairs |
| `extract` | Extract with regex |
| `lowercase` | Convert to lowercase |
| `uppercase` | Convert to uppercase |
| `trim` | Remove whitespace |
| `split` | Split string to array |
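Types not shown in the example above presumably follow the same shape as their siblings. A sketch, assuming `parse_kv` takes `field`/`target` like `parse_json`, that the case helpers operate in place on a `field`, and that `split` takes a `delimiter` key (the field names and the `delimiter` key are illustrative assumptions):

```yaml
transformations:
  # Parse "user=alice action=login" style payloads
  - type: parse_kv
    field: raw_data
    target: parsed

  # Normalize a field in place
  - type: lowercase
    field: hostname

  # Split a comma-separated string into an array
  - type: split
    field: tags
    delimiter: ","   # key name is an assumption
```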
Enrichers¶
Add context to events from external data sources.
Geo Enrichment¶
Add geographic information based on IP addresses.
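The configuration mirrors the geo enricher used in the complete pipeline example below:

```yaml
enrichers:
  - type: geo
    field: source_ip
    target: geo
```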
Result:

```json
{
  "source_ip": "8.8.8.8",
  "geo": {
    "country": "US",
    "country_name": "United States",
    "city": "Mountain View",
    "lat": 37.4056,
    "lon": -122.0775
  }
}
```
Lookup Enrichment¶
Enrich from lookup tables (CSV, database).
```yaml
enrichers:
  - type: lookup
    field: user_id
    table: users
    target: user
    # Adds: user.name, user.department, user.role
```
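Going by the `# Adds:` comment, the `users` table is keyed by `user_id` and supplies the columns that get merged under `user`. For a CSV-backed table it might look like this (rows are purely illustrative):

```csv
user_id,name,department,role
u-1001,Alice,Engineering,admin
u-1002,Bob,Security,analyst
```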
Custom Enrichment¶
Run custom logic via HTTP or script.
```yaml
enrichers:
  - type: http
    url: https://threat-intel.example.com/lookup
    field: source_ip
    target: threat

  - type: script
    script: |
      if event.bytes > 1000000:
          event.size_category = 'large'
      else:
          event.size_category = 'normal'
```
Pipeline Configuration¶
Complete pipeline example:
```yaml
pipeline:
  name: security-events

  filters:
    - type: drop
      condition: "level == 'debug'"

  sampling:
    - type: random
      rate: 1.0  # Keep all (no sampling)

  transformations:
    - type: rename
      from: src
      to: source_ip
    - type: remove
      fields: [internal_id]

  enrichers:
    - type: geo
      field: source_ip
      target: geo
```
Best Practices¶
- Filter early - Drop unwanted data before expensive processing
- Sample wisely - Use hash-based sampling for consistent user journeys
- Enrich selectively - Only enrich fields you'll query
- Test pipelines - Use preview mode before deploying