
Processing

The Piper component processes data through a pipeline of filters, samplers, transformations, and enrichers, applied in that order.

Pipeline Overview

Incoming Events
       │
       ▼
┌─────────────┐
│   Filter    │  ──▶  Drop unwanted events
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Sample    │  ──▶  Reduce volume (e.g., 10%)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Transform  │  ──▶  Modify fields, parse, rename
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Enrich    │  ──▶  Add geo data, lookups
└──────┬──────┘
       │
       ▼
Processed Events → Storage

Filters

Filters drop events that match a condition (type: drop) or that fail to match it (type: keep).

filters:
  # Drop debug logs
  - type: drop
    condition: "level == 'debug'"

  # Keep only specific sources
  - type: keep
    condition: "source in ['firewall', 'ids', 'waf']"

  # Drop events flagged as internal tests
  - type: drop
    condition: "internal_test == true"

Filter Conditions

Operator   Example                      Description
==         level == 'error'             Exact match
!=         status != 200                Not equal
in         source in ['a', 'b']         Value in list
contains   message contains 'failed'    Substring match
matches    ip matches '^10\.'           Regex match
exists     user_id exists               Field is present
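
As a rough guide to the semantics, each operator maps onto a familiar comparison. The snippet below is an illustrative Python sketch of what each condition checks, not Piper's actual condition engine:

import re

event = {"level": "error", "status": 500, "source": "ids",
         "message": "login failed for user", "ip": "10.0.0.5",
         "user_id": "u-42"}

event["level"] == "error"                      # ==        exact match
event["status"] != 200                         # !=        not equal
event["source"] in ["firewall", "ids"]         # in        value in list
"failed" in event["message"]                   # contains  substring match
re.search(r"^10\.", event["ip"]) is not None   # matches   regex match
"user_id" in event                             # exists    field is present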

Sampling

Reduce data volume while maintaining statistical visibility.

sampling:
  # Random sampling - keep 10% of events
  - type: random
    rate: 0.1

  # Hash-based sampling - consistent sampling per user
  - type: hash
    field: user_id
    rate: 0.1

  # Rate limiting - max 1000 events/second
  - type: rate_limit
    events_per_second: 1000
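
What makes hash-based sampling useful is determinism: the keep/drop decision is a pure function of the hashed field, so every event with the same user_id gets the same decision and a sampled user's journey stays intact. A minimal sketch of the idea (the hash function shown is illustrative, not necessarily the one Piper uses):

import hashlib

def keep_event(user_id: str, rate: float) -> bool:
    # Map the field value to a stable bucket in [0, 10000). A fixed
    # fraction of buckets is kept, so the same user_id gets the same
    # keep/drop decision on every event in the stream.
    bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 10_000
    return bucket < rate * 10_000

keep_event("u-42", 0.1)  # deterministic: always the same answer for "u-42"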

Transformations

Modify event data before storage.

transformations:
  # Rename field
  - type: rename
    from: src_ip
    to: source_ip

  # Parse JSON string
  - type: parse_json
    field: raw_data
    target: parsed

  # Extract with regex
  - type: extract
    field: message
    pattern: 'user=(\w+)'
    target: username

  # Add computed field
  - type: add_field
    field: processed_at
    value: "${timestamp}"

  # Remove sensitive field
  - type: remove
    fields: [password, token, secret]

Transformation Types

Type         Description
rename       Rename a field
remove       Delete field(s)
add_field    Add new field with value
parse_json   Parse JSON string to object
parse_kv     Parse key=value pairs
extract      Extract with regex
lowercase    Convert to lowercase
uppercase    Convert to uppercase
trim         Remove whitespace
split        Split string to array
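
To make the less obvious types concrete, here is roughly what parse_kv, lowercase, and split do to a value, sketched in Python (illustrative semantics only; the sketch assumes whitespace-separated key=value pairs):

raw = "User=Alice Role=Admin Tags=a,b,c"

# parse_kv: break "key=value" pairs out into fields
parsed = dict(pair.split("=", 1) for pair in raw.split())
# -> {'User': 'Alice', 'Role': 'Admin', 'Tags': 'a,b,c'}

# lowercase: normalize a string field
role = parsed["Role"].lower()      # 'admin'

# split: turn a delimited string into an array
tags = parsed["Tags"].split(",")   # ['a', 'b', 'c']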

Enrichers

Add context to events from external data sources.

Geo Enrichment

Add geographic information based on IP addresses.

enrichers:
  - type: geo
    field: source_ip
    target: geo
    # Adds: geo.country, geo.country_name, geo.city, geo.lat, geo.lon

Result:

{
  "source_ip": "8.8.8.8",
  "geo": {
    "country": "US",
    "country_name": "United States",
    "city": "Mountain View",
    "lat": 37.4056,
    "lon": -122.0775
  }
}

Lookup Enrichment

Enrich from lookup tables (CSV, database).

enrichers:
  - type: lookup
    field: user_id
    table: users
    target: user
    # Adds: user.name, user.department, user.role
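
Conceptually, a lookup enricher is a left join: the field value is the key into the table, and the matching row's columns land under target. A minimal Python sketch against a CSV-backed table (the file name and column layout here are hypothetical):

import csv

# Index the lookup table by its key column.
with open("users.csv") as f:                      # hypothetical table file
    users = {row["user_id"]: row for row in csv.DictReader(f)}

def enrich(event: dict) -> dict:
    row = users.get(event.get("user_id"))
    if row:                                       # no match: event unchanged
        event["user"] = {"name": row["name"],
                         "department": row["department"],
                         "role": row["role"]}
    return event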

Custom Enrichment

Run custom logic via HTTP or script.

enrichers:
  - type: http
    url: https://threat-intel.example.com/lookup
    field: source_ip
    target: threat

  - type: script
    script: |
      if event.bytes > 1000000:
        event.size_category = 'large'
      else:
        event.size_category = 'normal'
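
Either way, a custom enricher boils down to: compute or fetch extra context keyed by a field, then attach it under target. For the HTTP case, a sketch of the round trip (the query format and response shape are assumptions; check your endpoint's actual contract):

import json
import urllib.request

def enrich_threat(event: dict) -> dict:
    # Assumed contract: GET <url>?ip=<value> returns a JSON object,
    # which is attached under the configured target field.
    url = "https://threat-intel.example.com/lookup?ip=" + event["source_ip"]
    with urllib.request.urlopen(url, timeout=2) as resp:
        event["threat"] = json.load(resp)
    return event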

Pipeline Configuration

Complete pipeline example:

pipeline:
  name: security-events

  filters:
    - type: drop
      condition: "level == 'debug'"

  sampling:
    - type: random
      rate: 1.0  # Keep all (no sampling)

  transformations:
    - type: rename
      from: src
      to: source_ip
    - type: remove
      fields: [internal_id]

  enrichers:
    - type: geo
      field: source_ip
      target: geo
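
To make the stage order concrete, this is roughly how a single event moves through that pipeline, sketched in Python with hypothetical stand-in functions (not Piper's API):

def geo_lookup(ip: str) -> dict:
    return {"country": "US", "city": "Mountain View"}  # stand-in enricher

def run_pipeline(event: dict):
    if event.get("level") == "debug":              # filter: drop debug logs
        return None                                # dropped, goes no further
    # random sampling at rate 1.0 keeps everything, so nothing to do here
    event["source_ip"] = event.pop("src")          # rename src -> source_ip
    event.pop("internal_id", None)                 # remove internal_id
    event["geo"] = geo_lookup(event["source_ip"])  # geo enrichment
    return event

run_pipeline({"src": "8.8.8.8", "level": "info", "internal_id": 7})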

Best Practices

  1. Filter early - Drop unwanted data before expensive processing
  2. Sample wisely - Use hash-based sampling for consistent user journeys
  3. Enrich selectively - Only enrich fields you'll query
  4. Test pipelines - Use preview mode before deploying

See also: