Kafka Exporter
A high-performance Kafka message exporter that captures Kafka topic messages to Parquet files with Hive partitioning.
Features
- Multi-topic support: consume from multiple Kafka topics simultaneously
- Buffered writes: in-memory buffering with configurable size and age limits
- Atomic operations: safe file operations with temp file + rename pattern
- Hive partitioning: organized output by year/month/day
- Compression: Snappy-compressed Parquet files
- Metrics: Prometheus metrics for monitoring
- Replay support: HTTP API to reset consumer offsets and replay topics
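The temp file + rename pattern named under "Atomic operations" can be sketched with the standard library alone. This is a minimal illustration, not the exporter's actual code: the function name `write_atomically`, the `.tmp` suffix, and the demo paths are all hypothetical. It also assumes the working directory and the destination sit on the same filesystem, since a rename is only atomic in that case.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Write `bytes` to a temp file in `work_dir`, then rename it to `final_path`.
/// Readers never observe a partially written file at `final_path`.
/// Hypothetical helper; assumes both paths are on the same filesystem.
fn write_atomically(work_dir: &Path, final_path: &Path, bytes: &[u8]) -> io::Result<()> {
    fs::create_dir_all(work_dir)?;
    if let Some(parent) = final_path.parent() {
        fs::create_dir_all(parent)?;
    }

    let file_name = final_path
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "final path has no file name"))?;
    let mut tmp_name = file_name.to_os_string();
    tmp_name.push(".tmp"); // illustrative suffix, not taken from the project
    let tmp_path = work_dir.join(tmp_name);

    // Write everything and flush it to disk before the file becomes visible.
    let mut file = File::create(&tmp_path)?;
    file.write_all(bytes)?;
    file.sync_all()?;
    drop(file);

    // Publish the finished file in a single step.
    fs::rename(&tmp_path, final_path)
}

fn main() -> io::Result<()> {
    // Demo locations under the system temp dir, for illustration only.
    let export_dir = std::env::temp_dir().join("kafka-exporter-demo");
    let work_dir = export_dir.join(".tmp");
    let final_path = export_dir.join("sensor-readings").join("example.parquet");

    write_atomically(&work_dir, &final_path, b"placeholder bytes, not real Parquet")?;
    println!("wrote {}", final_path.display());
    Ok(())
}
```

Keeping the temp directory inside the export root is one way to satisfy the same-filesystem assumption.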
Quick Start
docker build -t kafka-exporter .
docker run \
  -e KAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
  -e TOPICS=sensor-readings,other-topic \
  -e EXPORT_DIR=/data/exports \
  -v data:/data \
  kafka-exporter
Configuration
Environment variables:
- KAFKA_BOOTSTRAP_SERVERS - Kafka bootstrap servers (default: localhost:9092)
- TOPICS - Comma-separated list of topic names to consume (required)
- EXPORT_DIR - Root export directory (default: /mnt/safepool/data/feeder-kafka-exporter)
- WORKDIR - Working directory for temp files (default: {EXPORT_DIR}/.tmp)
- MAX_FILE_SIZE_MB - Max buffer size before flush (default: 256)
- FLUSH_INTERVAL_SECS - Max age before flush (default: 3600)
- HOST - HTTP server bind address (default: 127.0.0.1)
- PORT - HTTP server port (default: 8090)
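To make the defaults and the two flush limits concrete, here is a rough sketch of how these variables could be read and applied. The `Config` struct, `from_lookup`, `parse_or`, and `should_flush` are hypothetical names, not the exporter's real types. The sketch assumes 1 MB means 1024 * 1024 bytes and that a flush triggers when either limit is reached; the README does not state either detail.

```rust
use std::str::FromStr;
use std::time::Duration;

/// Hypothetical view of the settings listed above.
#[derive(Debug, Clone, PartialEq)]
struct Config {
    bootstrap_servers: String,
    topics: Vec<String>,
    export_dir: String,
    workdir: String,
    max_file_size_bytes: u64,
    flush_interval: Duration,
    host: String,
    port: u16,
}

/// Parse an optional raw value, falling back to `default` when it is unset.
fn parse_or<T: FromStr>(raw: Option<String>, name: &str, default: T) -> Result<T, String> {
    match raw {
        None => Ok(default),
        Some(s) => s
            .trim()
            .parse::<T>()
            .map_err(|_| format!("{} has an invalid value: {:?}", name, s)),
    }
}

impl Config {
    /// Build a config from any key lookup (the process environment in `main`).
    fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Result<Config, String> {
        let topics: Vec<String> = get("TOPICS")
            .ok_or("TOPICS is required")?
            .split(',')
            .map(str::trim)
            .filter(|t| !t.is_empty())
            .map(String::from)
            .collect();
        if topics.is_empty() {
            return Err("TOPICS must name at least one topic".to_string());
        }

        let export_dir = get("EXPORT_DIR")
            .unwrap_or_else(|| "/mnt/safepool/data/feeder-kafka-exporter".to_string());
        let workdir = get("WORKDIR").unwrap_or_else(|| format!("{}/.tmp", export_dir));
        let max_mb: u64 = parse_or(get("MAX_FILE_SIZE_MB"), "MAX_FILE_SIZE_MB", 256)?;
        let flush_secs: u64 = parse_or(get("FLUSH_INTERVAL_SECS"), "FLUSH_INTERVAL_SECS", 3600)?;

        Ok(Config {
            bootstrap_servers: get("KAFKA_BOOTSTRAP_SERVERS")
                .unwrap_or_else(|| "localhost:9092".to_string()),
            topics,
            export_dir,
            workdir,
            max_file_size_bytes: max_mb * 1024 * 1024, // assumption: binary megabytes
            flush_interval: Duration::from_secs(flush_secs),
            host: get("HOST").unwrap_or_else(|| "127.0.0.1".to_string()),
            port: parse_or(get("PORT"), "PORT", 8090)?,
        })
    }

    /// Assumed policy: flush when either the size or the age limit is reached.
    fn should_flush(&self, buffered_bytes: u64, buffer_age: Duration) -> bool {
        buffered_bytes >= self.max_file_size_bytes || buffer_age >= self.flush_interval
    }
}

fn main() {
    match Config::from_lookup(|key| std::env::var(key).ok()) {
        Ok(cfg) => println!("{:#?}", cfg),
        Err(err) => eprintln!("configuration error: {}", err),
    }
}
```

Taking a lookup closure instead of calling `std::env::var` directly keeps the parsing logic testable without touching the process environment.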
Output Format
Messages are written to Hive-partitioned Parquet files:
{EXPORT_DIR}/{topic}/year=YYYY/month=MM/day=DD/{uuid}.parquet
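As an illustration of the layout above, the following sketch assembles one output path from its parts. The function `partition_path` and its parameters are hypothetical. Zero-padding of month and day is inferred from the MM/DD placeholders, and the README does not say which timestamp (broker or consumer) selects the partition, so the date is passed in explicitly.

```rust
use std::path::{Path, PathBuf};

/// Build {EXPORT_DIR}/{topic}/year=YYYY/month=MM/day=DD/{file_id}.parquet.
/// `file_id` stands in for the UUID; generating it is out of scope here.
fn partition_path(
    export_dir: &Path,
    topic: &str,
    year: i32,
    month: u32,
    day: u32,
    file_id: &str,
) -> PathBuf {
    export_dir
        .join(topic)
        .join(format!("year={:04}", year))
        .join(format!("month={:02}", month))
        .join(format!("day={:02}", day))
        .join(format!("{}.parquet", file_id))
}

fn main() {
    let path = partition_path(
        Path::new("/data/exports"),
        "sensor-readings",
        2024,
        3,
        7,
        "example-id", // placeholder, a real file would use a UUID
    );
    println!("{}", path.display());
}
```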
Parquet schema:
- partition (int32) - Kafka partition number
- offset (int64) - Kafka offset
- key (binary, nullable) - Message key
- kafka_timestamp (timestamp[ms], nullable) - Broker timestamp
- received_at (timestamp[ms]) - Consumer receive time
- payload (large_binary) - Raw message payload
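One way to picture a row of this schema on the Rust side is a plain struct whose nullable columns become `Option`. The type `ExportedRecord` and the `approx_size` helper are hypothetical and only illustrate how a buffer might track its size; timestamps are assumed to be milliseconds since the Unix epoch, matching timestamp[ms].

```rust
/// Hypothetical in-memory mirror of one Parquet row.
#[derive(Debug, Clone, PartialEq)]
struct ExportedRecord {
    partition: i32,               // int32
    offset: i64,                  // int64
    key: Option<Vec<u8>>,         // binary, nullable
    kafka_timestamp: Option<i64>, // timestamp[ms], nullable
    received_at: i64,             // timestamp[ms]
    payload: Vec<u8>,             // large_binary
}

impl ExportedRecord {
    /// Rough size estimate for buffer accounting: fixed-width fields plus the
    /// variable-length key and payload. Not the encoded Parquet size.
    fn approx_size(&self) -> usize {
        const FIXED_BYTES: usize = 4 + 8 + 8 + 8; // partition, offset, two timestamps
        FIXED_BYTES + self.key.as_ref().map_or(0, |k| k.len()) + self.payload.len()
    }
}

fn main() {
    let record = ExportedRecord {
        partition: 0,
        offset: 42,
        key: Some(b"sensor-1".to_vec()),
        kafka_timestamp: None, // the broker timestamp may be missing
        received_at: 1_700_000_000_000,
        payload: b"{\"temp\": 21.5}".to_vec(),
    };
    println!("{:?} (~{} bytes)", record, record.approx_size());
}
```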
Querying
With DuckDB:
SELECT * FROM read_parquet('./exports/sensor-readings/**/*.parquet', hive_partitioning=true)
HTTP API
- GET /healthz - Liveness probe
- GET /status - Status of all topics
- POST /replay/{topic} - Reset offsets and replay topic
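The three endpoints map naturally onto a small routing function. The sketch below only shows how a method and path pair could be classified. The `Route` enum and `route` function are hypothetical, and the real server may use a web framework and handle edge cases differently.

```rust
/// Hypothetical classification of an incoming request.
#[derive(Debug, PartialEq)]
enum Route<'a> {
    Healthz,
    Status,
    Replay { topic: &'a str },
    NotFound,
}

/// Match a method and path against the endpoints listed above.
fn route<'a>(method: &str, path: &'a str) -> Route<'a> {
    match (method, path) {
        ("GET", "/healthz") => Route::Healthz,
        ("GET", "/status") => Route::Status,
        ("POST", p) => match p.strip_prefix("/replay/") {
            // Assumption: a topic is one non-empty path segment.
            Some(topic) if !topic.is_empty() && !topic.contains('/') => Route::Replay { topic },
            _ => Route::NotFound,
        },
        _ => Route::NotFound,
    }
}

fn main() {
    for (method, path) in [
        ("GET", "/healthz"),
        ("GET", "/status"),
        ("POST", "/replay/sensor-readings"),
        ("GET", "/replay/sensor-readings"),
    ] {
        println!("{} {} -> {:?}", method, path, route(method, path));
    }
}
```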
Building
cargo build --release