Connector¶
The Connector exports data from ByteFreezer parquet files into external systems — Elasticsearch, Splunk, webhooks, or any custom destination.
Why Connector?¶
ByteFreezer stores all your data as Parquet in S3. That's the cheapest, most durable format for long-term retention. But you still need some data in your SIEM, search engine, or data warehouse for active investigation.
The Connector lets you export only what you need — filtered, transformed, on your schedule. Instead of sending everything to Elasticsearch at $3/GB/day, send 5% of your data and keep the rest in Parquet at $0.023/GB/month.
How It Works¶
- Connector fetches S3 credentials from the ByteFreezer control API (per-dataset)
- DuckDB queries parquet files directly over S3 using the httpfs extension
- You write a SQL query that selects exactly the data you want
- Results are batched and sent to the configured destination
- A cursor file tracks progress for at-least-once delivery
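Conceptually, the DuckDB session behind these steps looks something like the following sketch. The S3 glob and credential values are illustrative; the real ones are fetched per dataset from the control API and substituted in by the connector:

```sql
-- Load the httpfs extension so DuckDB can read s3:// URLs directly.
INSTALL httpfs;
LOAD httpfs;

-- Credentials come from the ByteFreezer control API (values illustrative).
SET s3_region = 'us-east-1';
SET s3_access_key_id = 'KEY_FROM_CONTROL_API';
SET s3_secret_access_key = 'SECRET_FROM_CONTROL_API';

-- Your query, with PARQUET_PATH expanded to the dataset's S3 glob
-- (bucket and prefix below are hypothetical).
SELECT timestamp, source_ip, message
FROM read_parquet('s3://bf-data/tenant-id/dataset-id/**/*.parquet',
                  hive_partitioning=true, union_by_name=true)
WHERE severity >= 4;
```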
Three Modes¶
| Mode | Description | Use Case |
|---|---|---|
| interactive | Web UI at http://localhost:8090 | Explore datasets, test queries, preview exports |
| batch | Run query once, export, exit | Scheduled exports via cron |
| watch | Poll for new data on a timer | Continuous streaming to SIEM |
Built-in Destinations¶
| Destination | Description |
|---|---|
| stdout | JSON lines to stdout (default, useful for testing/piping) |
| elasticsearch | Elasticsearch bulk API |
| webhook | Generic HTTP POST to any endpoint |
Quick Start¶
1. Get the binary¶
```shell
# Docker
docker pull ghcr.io/bytefreezer/bytefreezer-connector:latest

# Or build from source
git clone https://github.com/bytefreezer/connector.git
cd connector
go build -o bytefreezer-connector .
```
2. Configure¶
Create config.yaml:
```yaml
control:
  url: "https://api.bytefreezer.com"
  api_key: "your-service-key"
  account_id: "your-account-id"

query:
  tenant_id: "your-tenant-id"
  dataset_id: "your-dataset-id"
  sql: >
    SELECT timestamp, source_ip, message
    FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
    WHERE severity >= 4
    LIMIT 1000

destination:
  type: stdout
  config: {}
```
3. Run¶
```shell
# Interactive mode (web UI)
./bytefreezer-connector --config config.yaml

# Batch mode (run once)
./bytefreezer-connector --config config.yaml --mode batch

# Watch mode (continuous)
./bytefreezer-connector --config config.yaml --mode watch
```
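For scheduled exports, batch mode pairs naturally with cron. A sketch of a crontab entry (install paths, log location, and schedule are illustrative, not prescribed by the connector):

```
# Run an hourly export; adjust paths to match your install.
0 * * * * /opt/bytefreezer/bytefreezer-connector --config /opt/bytefreezer/config.yaml --mode batch >> /var/log/bytefreezer-connector.log 2>&1
```

Because the cursor file tracks progress between runs, each invocation picks up where the last one left off.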
SQL Queries¶
Use PARQUET_PATH as a placeholder in your SQL. The connector replaces it with the actual S3 glob path for the dataset.
```sql
-- All records (limited)
SELECT * FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
LIMIT 100

-- Filter by time partition
SELECT * FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
WHERE year = 2026 AND month = 3 AND day = 5

-- Specific fields only (reduces data transfer)
SELECT timestamp, source_ip, message
FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
WHERE severity >= 4

-- Aggregation
SELECT source_ip, COUNT(*) AS count
FROM read_parquet('PARQUET_PATH', hive_partitioning=true, union_by_name=true)
GROUP BY source_ip
ORDER BY count DESC
LIMIT 50
```
Destination Examples¶
Elasticsearch¶
```yaml
destination:
  type: elasticsearch
  config:
    url: "http://localhost:9200"
    index: "security-alerts"
    username: "elastic"
    password: "changeme"
```
Webhook¶
```yaml
destination:
  type: webhook
  config:
    url: "https://your-endpoint.com/ingest"
    method: "POST"
    headers:
      Authorization: "Bearer your-token"
      Content-Type: "application/json"
```
Adding Custom Destinations¶
The connector is designed as a starter project. Fork it and add your own destinations.
Create destinations/your_dest.go:
```go
package destinations

import (
	"context"

	"github.com/bytefreezer/connector/connector"
)

func init() {
	connector.RegisterDestination("your_dest", func() connector.Destination {
		return &YourDestination{}
	})
}

type YourDestination struct{}

func (d *YourDestination) Name() string                             { return "your_dest" }
func (d *YourDestination) Init(config map[string]interface{}) error { return nil }
func (d *YourDestination) Send(ctx context.Context, batch connector.Batch) error {
	return nil
}
func (d *YourDestination) Close() error { return nil }
```
The init() function auto-registers the destination. No other changes needed.
Configuration Reference¶
```yaml
logging:
  level: info              # debug, info, warn, error

server:
  port: 8090               # HTTP port for interactive mode web UI

control:
  url: "https://api.bytefreezer.com"
  api_key: ""              # API key or service key
  account_id: ""           # Your account ID

health_reporting:
  enabled: true
  report_interval: 30      # Seconds between health reports

query:
  tenant_id: ""            # Required for batch/watch
  dataset_id: ""           # Required for batch/watch
  sql: ""                  # SQL query with PARQUET_PATH placeholder

destination:
  type: stdout             # stdout, elasticsearch, webhook
  config: {}               # Destination-specific config

cursor:
  file: cursor.json        # Tracks export progress

schedule:
  interval_seconds: 60     # Watch mode poll interval
  batch_size: 1000         # Records per batch sent to destination
```
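The cursor file's exact schema is internal to the connector, but conceptually it records how far the export has progressed so a restarted run resumes rather than re-exporting everything. A purely hypothetical sketch (field names and path invented for illustration):

```json
{
  "dataset_id": "your-dataset-id",
  "last_exported_file": "s3://bf-data/tenant-id/dataset-id/year=2026/month=3/day=5/part-0001.parquet",
  "updated_at": "2026-03-05T12:00:00Z"
}
```

Deleting the cursor file resets progress, which with at-least-once delivery means previously exported records may be sent again.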
Health Reporting¶
The connector registers with the ByteFreezer control plane as bytefreezer-connector. In watch and interactive modes, it reports health every 30 seconds and appears on the Health page in the UI.
In batch mode, the process exits before the first health report cycle, so it won't appear on the Health page. This is expected.
MCP Tools¶
Use these ByteFreezer MCP tools to discover datasets and schema before writing queries:
| Tool | Purpose |
|---|---|
| bf_whoami | Get your account_id |
| bf_list_tenants | List tenants for your account |
| bf_list_datasets | List datasets for a tenant |
| bf_dataset_parquet_files | List parquet files for a dataset |
| bf_transformation_schema | Get field names and types |
| bf_dataset_statistics | Get record counts and sizes |