Coverage badge scope: custom Java SMT unit tests only (HeaderRouter and MongoNamespacePrefix).
Kafnus Connect is the persistence layer of the Kafnus ecosystem — a modern, Kafka-based replacement for Cygnus in FIWARE smart city environments.
It provides ready-to-use Kafka Connect images with custom Single Message Transforms (SMTs) and pre-integrated sink connectors for PostGIS, MongoDB, and HTTP endpoints.
This project is part of FIWARE. For more information check the FIWARE Catalogue entry for the Core Context Management.
| 🐳 Docker Hub |
|---|
Kafnus Connect consumes processed NGSI events from Kafka topics (produced by Kafnus NGSI) and persists them into target datastores or APIs.
- 🗺️ PostGIS (via custom JDBC connector)
  - Forked and extended to handle GeoJSON geometries and NGSI-specific data structures.
- 📦 MongoDB
  - Official MongoDB Kafka connector for JSON document storage.
- 🌐 HTTP
  - Aiven Open HTTP Connector for forwarding events to REST endpoints.
  - Forked to handle HTTP 200 responses that contain errors.
Kafka (processed topics)
│
Header Router (datamodels)
│
▼
Kafnus Connect (Kafka Connect)
├─ JDBC Sink (PostGIS)
├─ MongoDB Sink
└─ HTTP Sink
The HeaderRouter SMT is responsible for dynamically resolving the destination database schema and table name at runtime based on NGSI headers and a configurable SQL datamodel, removing any SQL layout logic from upstream producers.
Each connector can be independently configured via environment variables or connect-distributed.properties.
Custom SMTs can be chained to transform headers or message formats before persistence.
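As a minimal sketch of SMT chaining, the script below builds a MongoDB sink definition with two transforms applied in order. It uses Kafka's built-in `InsertField` and `ReplaceField` transforms as stand-ins, because the configuration keys of the Kafnus SMTs are not described here. The connector name, topic, database, collection, and hosts are hypothetical placeholders.

```shell
#!/usr/bin/env bash
# Sketch: a sink connector definition with two chained SMTs.
# All names (connector, topic, database, hosts) are hypothetical placeholders.

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the connector definition as JSON.
# Transforms run in the order listed in "transforms": addOrigin inserts a
# static field, then renameOrigin renames the field that was just added.
# The quoted 'EOF' keeps "$Value" literal instead of letting the shell expand it.
build_connector_json() {
  cat <<'EOF'
{
  "name": "example-mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "example_processed_topic",
    "connection.uri": "mongodb://mongo:27017",
    "database": "example_db",
    "collection": "example_collection",
    "transforms": "addOrigin,renameOrigin",
    "transforms.addOrigin.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addOrigin.static.field": "origin",
    "transforms.addOrigin.static.value": "kafnus",
    "transforms.renameOrigin.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameOrigin.renames": "origin:source"
  }
}
EOF
}

# Register the connector through the Kafka Connect REST API.
register_connector() {
  build_connector_json | curl -s -X POST -H "Content-Type: application/json" \
    -d @- "${CONNECT_URL}/connectors"
}
```

Calling `register_connector` once the worker is reachable submits the definition. A custom SMT would presumably be chained the same way, by adding its alias to `transforms` and its class to the matching `transforms.<alias>.type` key.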
docker build -t telefonicaiot/kafnus-connect:latest .
docker run -d --name kafnus-connect \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_GROUP_ID=kafnus-connect \
  -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
  telefonicaiot/kafnus-connect:latest
For complete examples, see the tests_end2end folder in the main Kafnus repository.
Integration and end-to-end testing are performed from the Kafnus NGSI repository, where complete data flow scenarios are executed using Testcontainers.
This repository also includes its own Python tests (similar to the Kafnus tests) and unit tests for the custom Java SMTs in src/kafnus-connect-smt/src/test/java, executed with Maven and JUnit 5.
Coverage is generated with JaCoCo for this SMT module and published to Coveralls from CI. This coverage reflects SMT unit tests (Java), while functional validation of the complete pipeline remains in the Python E2E suite.
- Custom SMTs are available in src/kafnus-connect-smt/, covering JDBC and MongoDB sinks:
  - HeaderRouter: dynamic SQL routing for JDBC (schema/table resolution)
  - MongoNamespacePrefix: MongoDB database/collection prefixing
- New sinks can be added by extending the base image and adding plugins under /usr/share/java/.
- Monitoring via Prometheus JMX Exporter is supported out of the box.
For deeper technical details about how Kafnus Connect is configured, built, and extended — including:
- Environment setup and logging configuration
- Plugin management and sink registration
- Supported sinks and custom SMTs
- Usage of EnvVarConfigProvider for connector configuration
👉 See Technical Configuration Guide
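The EnvVarConfigProvider item above can be illustrated with a short sketch. It assumes the name refers to Apache Kafka's built-in `org.apache.kafka.common.config.provider.EnvVarConfigProvider` and that the worker registers it under the alias `env`. Both are assumptions, and the Technical Configuration Guide remains the authority. The variable name `MONGO_URI`, the topic, and the database are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: reference a value from the worker's environment instead of hardcoding it.
# Assumes the worker configuration registers the provider under the alias "env":
#   config.providers=env
#   config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider

# Print a connector config fragment. The quoted 'EOF' is essential: it stops the
# shell from expanding ${env:MONGO_URI}, so the placeholder reaches Kafka Connect
# verbatim and is resolved by the worker from its own environment.
build_mongo_config() {
  cat <<'EOF'
{
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics": "example_processed_topic",
  "connection.uri": "${env:MONGO_URI}",
  "database": "example_db"
}
EOF
}
```

With this approach the connection string lives only in the container environment (for example, passed with `-e MONGO_URI=...` at `docker run`), and the stored connector configuration carries just the placeholder.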
The log level of kafnus-connect can be changed at runtime through the Kafka Connect logger API.
For example, to switch some classes related to the HTTP connector (HttpSinkTask, AbstractHttpSender, BasicAuthHttpSender) from the default level (INFO) to DEBUG, you can use:
curl -s -X PUT -H "Content-Type: application/json" \
http://localhost:8083/admin/loggers/io.aiven.kafka.connect.http.HttpSinkTask \
-d '{"level":"DEBUG"}' | jq
curl -s -X PUT -H "Content-Type: application/json" \
http://localhost:8083/admin/loggers/io.aiven.kafka.connect.http.sender.AbstractHttpSender \
-d '{"level":"DEBUG"}' | jq
curl -s -X PUT -H "Content-Type: application/json" \
http://localhost:8083/admin/loggers/io.aiven.kafka.connect.http.sender.BasicAuthHttpSender \
-d '{"level":"DEBUG"}' | jq
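When several loggers need the same level, the repeated calls above can be wrapped in a small helper. This is a sketch only: the function names `logger_url` and `set_log_level` and the `CONNECT_URL` variable are hypothetical conveniences, not part of Kafnus Connect.

```shell
#!/usr/bin/env bash
# Sketch: set one log level on several loggers through the Kafka Connect REST API.

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the admin endpoint URL for a given logger name.
logger_url() {
  printf '%s/admin/loggers/%s' "$CONNECT_URL" "$1"
}

# Usage: set_log_level LEVEL LOGGER [LOGGER...]
set_log_level() {
  local level="$1"
  shift
  local name
  for name in "$@"; do
    curl -s -X PUT -H "Content-Type: application/json" \
      -d "{\"level\":\"${level}\"}" "$(logger_url "$name")"
    echo  # newline between responses
  done
}
```

For example, `set_log_level DEBUG io.aiven.kafka.connect.http.HttpSinkTask io.aiven.kafka.connect.http.sender.AbstractHttpSender` raises both loggers to DEBUG, and the same call with `INFO` restores the default. The levels currently in effect can be listed with `curl -s "$CONNECT_URL/admin/loggers" | jq`.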
- Kafnus ecosystem overview
- PostGIS connector fork
- MongoDB connector docs
- Aiven HTTP Connector
- Aiven HTTP Connector fork
🧭 Project structure note
This repository is part of the Kafnus ecosystem:
The list of contributors to the Kafnus-Connect project can be found in CONTRIBUTORS.md.