Skip to content

telefonicaid/kafnus-connect

Repository files navigation

FIWARE Incubating Coverage Status

Coverage badge scope: custom Java SMT unit tests only (HeaderRouter and MongoNamespacePrefix).

🛰️ Kafnus Connect

Kafnus Connect is the persistence layer of the Kafnus ecosystem — a modern, Kafka-based replacement for Cygnus in FIWARE smart city environments.

It provides ready-to-use Kafka Connect images with custom Single Message Transforms (SMTs) and pre-integrated sink connectors for PostGIS, MongoDB, and HTTP endpoints.

This project is part of FIWARE. For more information check the FIWARE Catalogue entry for the Core Context Management.

🐳 Docker Hub

⚙️ Overview

Kafnus Connect consumes processed NGSI events from Kafka topics (produced by Kafnus NGSI) and persists them into target datastores or APIs.

Supported sinks

  • 🗺️ PostGIS (via custom JDBC connector)
    • Forked and extended to handle GeoJSON geometries and NGSI-specific data structures.
  • 📦 MongoDB
    • Official MongoDB Kafka connector for JSON document storage.
  • 🌐 HTTP

🧱 Architecture

Kafka (processed topics)
       │
  Header Router (datamodels)
       │ 
       ▼
  Kafnus Connect (Kafka Connect)
   ├─ JDBC Sink (PostGIS)
   ├─ MongoDB Sink
   └─ HTTP Sink

The HeaderRouter SMT is responsible for dynamically resolving the destination database schema and table name at runtime based on NGSI headers and a configurable SQL datamodel, removing any SQL layout logic from upstream producers.

Each connector can be independently configured via environment variables or connect-distributed.properties.
Custom SMTs can be chained to transform headers or message formats before persistence.


🚀 Usage

Build locally

docker build -t telefonicaiot/kafnus-connect:latest .

Run example

docker run -d   --name kafnus-connect   -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092   -e CONNECT_GROUP_ID=kafnus-connect   -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs   -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets   -e CONNECT_STATUS_STORAGE_TOPIC=connect-status   telefonicaiot/kafnus-connect:latest

For complete examples, see the tests_end2end folder in the main Kafnus repository.


🧪 Testing

Integration and end-to-end testing are performed from the Kafnus NGSI repository, where complete data flow scenarios are executed using Testcontainers.

This repository also includes his own python tests (similar to Kafnus tests) and unit tests for the custom Java SMTs in src/kafnus-connect-smt/src/test/java, executed with Maven and JUnit 5.

Coverage is generated with JaCoCo for this SMT module and published to Coveralls from CI. This coverage reflects SMT unit tests (Java), while functional validation of the complete pipeline remains in the Python E2E suite.


🧰 Configuration & Extensions

  • Custom SMTs are available in src/kafnus-connect-smt/ covering JDBC and MongoDB sinks:
    • HeaderRouter: dynamic SQL routing for JDBC (schema/table resolution)
    • MongoNamespacePrefix: MongoDB database/collection prefixing
  • New sinks can be added by extending the base image and adding plugins under /usr/share/java/.
  • Monitoring via Prometheus JMX Exporter is supported out of the box.

For deeper technical details about how Kafnus Connect is configured, built, and extended — including:

  • Environment setup and logging configuration
  • Plugin management and sink registration
  • Supported sinks and custom SMTs
  • Usage of EnvVarConfigProvider for connector configuration

👉 See Technical Configuration Guide


🛠️ Logging

There is a way to change log level of kafnus-connect using logger API.

For example to change from default (info) to debug some clases related with http connector like HttpSinkTask, AbstractHttpSender, BasicAuthHttpSender you can use:

curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/admin/loggers/io.aiven.kafka.connect.http.HttpSinkTask \
  -d '{"level":"DEBUG"}' | jq

curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/admin/loggers/io.aiven.kafka.connect.http.sender.AbstractHttpSender \
  -d '{"level":"DEBUG"}' | jq

curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/admin/loggers/io.aiven.kafka.connect.http.sender.BasicAuthHttpSender \
  -d '{"level":"DEBUG"}' | jq

📚 Documentation


🧭 Project structure note

This repository is part of the Kafnus ecosystem:


👥 Contributors

The list of contributors to the Kafnus-Connect project can be found in CONTRIBUTORS.md.

About

Connector component used by Kafnus

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors