Connector

The Connector exports data from ByteFreezer parquet files into external systems — Elasticsearch, Splunk, webhooks, or any custom destination.

Why Connector?

ByteFreezer stores all your data as Parquet in S3. That's the cheapest, most durable format for long-term retention. But you still need some data in your SIEM, search engine, or data warehouse for active investigation.

The Connector lets you export only what you need — filtered, transformed, on your schedule. Instead of sending everything to Elasticsearch at $3/GB/day, send 5% of your data and keep the rest in Parquet at $0.023/GB/month.

packer --> parquet (S3/MinIO)
                |
          [CONNECTOR] --> Elasticsearch / Splunk / webhook / stdout

How It Works

  1. Connector fetches S3 credentials from the ByteFreezer control API (per-dataset)
  2. DuckDB queries parquet files directly over S3 using the httpfs extension
  3. You write a SQL query that selects exactly the data you want
  4. Results are batched and sent to the configured destination
  5. A cursor file tracks progress for at-least-once delivery

Three Modes

Mode         Description                        Use Case
interactive  Web UI at http://localhost:8090    Explore datasets, test queries, preview exports
batch        Run query once, export, exit       Scheduled exports via cron
watch        Poll for new data on a timer       Continuous streaming to SIEM

Built-in Destinations

Destination    Description
stdout         JSON lines to stdout (default, useful for testing/piping)
elasticsearch  Elasticsearch bulk API
webhook        Generic HTTP POST to any endpoint

Quick Start

1. Get the binary

# Docker
docker pull ghcr.io/bytefreezer/bytefreezer-connector:latest

# Or build from source
git clone https://github.com/bytefreezer/connector.git
cd connector
go build -o bytefreezer-connector .

2. Configure

Create config.yaml:

control:
  url: "https://api.bytefreezer.com"
  api_key: "your-service-key"
  account_id: "your-account-id"

query:
  tenant_id: "your-tenant-id"
  dataset_id: "your-dataset-id"
  sql: >
    SELECT timestamp, source_ip, message
    FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
    WHERE severity >= 4
    LIMIT 1000

destination:
  type: stdout
  config: {}

3. Run

# Interactive mode (web UI)
./bytefreezer-connector --config config.yaml

# Batch mode (run once)
./bytefreezer-connector --config config.yaml --mode batch

# Watch mode (continuous)
./bytefreezer-connector --config config.yaml --mode watch

SQL Queries

Use PARQUET_PATH as a placeholder in your SQL. The connector replaces it with the actual S3 glob path for the dataset.

-- All records (limited)
SELECT * FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
LIMIT 100

-- Filter by time partition
SELECT * FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
WHERE year = 2026 AND month = 3 AND day = 5

-- Specific fields only (reduces data transfer)
SELECT timestamp, source_ip, message
FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
WHERE severity >= 4

-- Aggregation
SELECT source_ip, COUNT(*) as count
FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
GROUP BY source_ip
ORDER BY count DESC
LIMIT 50

Destination Examples

Elasticsearch

destination:
  type: elasticsearch
  config:
    url: "http://localhost:9200"
    index: "security-alerts"
    username: "elastic"
    password: "changeme"

Webhook

destination:
  type: webhook
  config:
    url: "https://your-endpoint.com/ingest"
    method: "POST"
    headers:
      Authorization: "Bearer your-token"
      Content-Type: "application/json"

Adding Custom Destinations

The connector is designed as a starter project. Fork it and add your own destinations.

Create destinations/your_dest.go:

package destinations

import (
    "context"
    "github.com/bytefreezer/connector/connector"
)

func init() {
    connector.RegisterDestination("your_dest", func() connector.Destination {
        return &YourDestination{}
    })
}

type YourDestination struct{}

func (d *YourDestination) Name() string                                       { return "your_dest" }
func (d *YourDestination) Init(config map[string]interface{}) error           { return nil }
func (d *YourDestination) Send(ctx context.Context, batch connector.Batch) error { return nil }
func (d *YourDestination) Close() error                                       { return nil }

The init() function auto-registers the destination. No other changes needed.

Configuration Reference

logging:
  level: info                    # debug, info, warn, error

server:
  port: 8090                     # HTTP port for interactive mode web UI

control:
  url: "https://api.bytefreezer.com"
  api_key: ""                    # API key or service key
  account_id: ""                 # Your account ID

health_reporting:
  enabled: true
  report_interval: 30            # Seconds between health reports

query:
  tenant_id: ""                  # Required for batch/watch
  dataset_id: ""                 # Required for batch/watch
  sql: ""                        # SQL query with PARQUET_PATH placeholder

destination:
  type: stdout                   # stdout, elasticsearch, webhook
  config: {}                     # Destination-specific config

cursor:
  file: cursor.json              # Tracks export progress

schedule:
  interval_seconds: 60           # Watch mode poll interval
  batch_size: 1000               # Records per batch sent to destination

Health Reporting

The connector registers with the ByteFreezer control plane as bytefreezer-connector. In watch and interactive modes, it reports health every 30 seconds and appears on the Health page in the UI.

In batch mode, the process exits before the first health report cycle, so it won't appear on the Health page. This is expected.

MCP Tools

Use these ByteFreezer MCP tools to discover datasets and schema before writing queries:

Tool                      Purpose
bf_whoami                 Get your account_id
bf_list_tenants           List tenants for your account
bf_list_datasets          List datasets for a tenant
bf_dataset_parquet_files  List parquet files for a dataset
bf_transformation_schema  Get field names and types
bf_dataset_statistics     Get record counts and sizes

Source Code

github.com/bytefreezer/connector