Event-driven streaming workers that replace ChRIS CUBE's polling-based job observability with a push/event-driven architecture. This repository implements three core long-running Kafka worker processes — Event Forwarder, Status Consumer, and Log Consumer — and the surrounding infrastructure to demonstrate them working within proper event and log pipelines.
In the current ChRIS architecture, CUBE polls pfcon every 5 seconds for every active plugin instance to check status and retrieve logs. This generates O(N) API calls per poll cycle (where N = active jobs), producing excessive database queries, enqueued Celery tasks, and heavy pressure on the Kubernetes/Docker API.
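The scale of that load is easy to quantify. A back-of-the-envelope sketch (illustrative numbers; only the 5-second interval comes from the description above):

```python
# Cost of the polling design: one status/log round-trip per active job,
# every poll cycle. The job count here is a hypothetical example.
POLL_INTERVAL_S = 5
active_jobs = 200  # hypothetical cluster load

polls_per_minute = 60 // POLL_INTERVAL_S          # 12 poll cycles/minute
api_calls_per_minute = polls_per_minute * active_jobs

print(api_calls_per_minute)  # 2400 round-trips per minute at 200 jobs
```

Each of those round-trips fans out into database queries, Celery tasks, and container-runtime API calls, which is the pressure the push-based design removes.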
This repository introduces a push-based alternative with two separate pipelines and event-driven workflow orchestration:
Event Pipeline (status changes):

```
Docker/K8s Runtime → Event Forwarder → Kafka [job-status-events] → Status Consumer → Celery → Celery Worker → PostgreSQL + Redis Pub/Sub
```

Log Pipeline (container output):

```
Docker/K8s Runtime → Fluent Bit → Kafka [job-logs] → Log Consumer → OpenSearch + Redis Pub/Sub
Event Forwarder → (delayed EOS marker) → Kafka [job-logs] → Log Consumer → Redis logs_flushed key
```
Workflow Orchestration (job lifecycle):

```
UI → POST /api/jobs/{id}/run → SSE Service → Celery start_workflow → pfcon (copy)
        ↓
Docker events → Event Forwarder → Kafka → Status Consumer → Celery process_job_status
        ↓
Workflow state machine: copy → plugin → upload → delete → cleanup → completed
        ↓
cleanup_containers waits for logs_flushed → pfcon DELETE
```
Both pipelines feed into a real-time streaming layer with historical replay:

```
Redis Pub/Sub → SSE Service → Browser (EventSource)
PostgreSQL + OpenSearch → SSE Service → Browser (historical replay on connect)
```
- Event Forwarder (`compute-event-forwarder`) — Async daemon that watches Docker daemon events (or the Kubernetes Job API) for ChRIS job containers, maps native container states to pfcon's `JobStatus` enum (`notStarted`, `started`, `finishedSuccessfully`, `finishedWithError`, `undefined`), and produces structured status events to Kafka. For terminal events, it also schedules delayed EOS (End-of-Stream) markers to the `job-logs` topic as a best-effort hint that Fluent Bit has flushed all logs for a container (see the EOS section in ARCHITECTURE.md — cleanup does not depend on this alone). Stateless, idempotent, and restart-safe with auto-reconnect.
- Status Consumer (`compute-status-consumer`) — Kafka consumer that reads status events and schedules Celery tasks for DB persistence, Redis Pub/Sub publishing, terminal status confirmation, and workflow advancement. Failed messages go to a dead-letter topic after configurable retries.
- Log Consumer (`compute-logs-consumer`) — Batched Kafka consumer that reads log events (produced by Fluent Bit, plus EOS markers from the Event Forwarder), bulk-writes them to OpenSearch for durable storage and search, and publishes to Redis Pub/Sub for real-time log streaming. When an EOS marker is received, it flushes the current batch and sets a Redis key (`job:{id}:{type}:logs_flushed`) to signal that all logs have been written to OpenSearch. Batch size and flush interval are configurable.
The repository also includes supporting infrastructure and a pilot test environment to demonstrate the full pipeline end-to-end:
- Kafka (KRaft mode) with SASL/PLAIN authentication, per-service users, and ACLs
- Fluent Bit reading Docker container logs, filtering by ChRIS labels, and forwarding to Kafka
- OpenSearch for log storage and historical replay
- Redis for Pub/Sub fan-out and Celery broker
- PostgreSQL for durable status tracking (written by the Celery Worker)
- SSE Service (FastAPI app) that streams events to browsers via SSE, replays historical events from PostgreSQL/OpenSearch on connect, and exposes REST endpoints for workflow submission and status queries
- Celery Worker that processes status confirmations, orchestrates the workflow state machine (copy → plugin → upload → delete → cleanup), calls pfcon to advance steps, honours pfcon's `requires_copy_job`/`requires_upload_job` flags to skip optional steps, and waits for log flush (or terminal-status quiescence) before container cleanup. At cleanup time it computes the overall workflow status (`finishedSuccessfully`, `finishedWithError`, or `failed`) from the recorded per-step outcomes.
- pfcon (`ghcr.io/fnndsc/pfcon:latest`, which includes `org.chrisproject.job_type` labels) as the job control plane
- Test UI for submitting jobs via the SSE service and watching status + logs stream in real-time
For a detailed view of all data flows, message schemas, Kafka topic design, resilience properties, and the confirmed status flow, see ARCHITECTURE.md.
```
chris_streaming_workers/
├── pyproject.toml                  # Python deps: aiokafka, aiodocker, pydantic, fastapi, etc.
├── docker-compose.yml              # 14 services
├── .env                            # Credentials & config
├── .gitignore
├── .dockerignore
├── README.md
│
├── Dockerfile.event_forwarder      # Image: localhost/fnndsc/compute-event-forwarder
├── Dockerfile.log_consumer         # Image: localhost/fnndsc/compute-logs-consumer
├── Dockerfile.status_consumer      # Image: localhost/fnndsc/compute-status-consumer
├── Dockerfile.sse_service          # SSE + Celery worker image
│
├── chris_streaming/                # Python package root
│   ├── __init__.py
│   │
│   ├── common/                     # Shared modules (high code reuse)
│   │   ├── __init__.py
│   │   ├── kafka.py                # Async Kafka producer/consumer factory
│   │   ├── schemas.py              # Pydantic models: StatusEvent, LogEvent, JobStatus
│   │   ├── settings.py             # pydantic-settings for env var parsing
│   │   ├── pfcon_status.py         # Docker/K8s state → pfcon JobStatus mapping
│   │   └── container_naming.py     # job_id ↔ container_name parsing
│   │
│   ├── event_forwarder/            # Produces to job-status-events + EOS markers
│   │   ├── __init__.py
│   │   ├── __main__.py             # python -m chris_streaming.event_forwarder
│   │   ├── watcher.py              # Abstract async watcher protocol
│   │   ├── docker_watcher.py       # Docker event stream → StatusEvent
│   │   ├── k8s_watcher.py          # K8s Job watch API → StatusEvent
│   │   ├── producer.py             # Kafka producer with idempotence + dedup
│   │   └── eos_producer.py         # Delayed EOS markers → Kafka job-logs
│   │
│   ├── status_consumer/            # Consumes job-status-events
│   │   ├── __init__.py
│   │   ├── __main__.py             # python -m chris_streaming.status_consumer
│   │   ├── consumer.py             # Kafka consumer with retry + DLQ
│   │   └── notifier.py             # Redis Pub/Sub + Celery task scheduling
│   │
│   ├── log_consumer/               # Consumes job-logs
│   │   ├── __init__.py
│   │   ├── __main__.py             # python -m chris_streaming.log_consumer
│   │   ├── consumer.py             # Batched Kafka consumer
│   │   ├── opensearch_writer.py    # Bulk writes with daily index rotation
│   │   └── redis_publisher.py      # Per-job Pub/Sub fan-out
│   │
│   └── sse_service/                # FastAPI SSE service + Celery tasks
│       ├── __init__.py
│       ├── __main__.py             # python -m chris_streaming.sse_service
│       ├── app.py                  # FastAPI app with CORS
│       ├── routes.py               # SSE + REST endpoints (run, workflow, history)
│       ├── redis_subscriber.py     # Async Redis subscriber with historical replay
│       ├── pfcon_client.py         # Synchronous HTTP client for pfcon REST API
│       └── tasks.py                # Celery tasks: process_job_status, start_workflow, cleanup_containers
│
├── config/
│   ├── kafka/
│   │   ├── server.properties       # KRaft broker config (SASL/PLAIN, ACLs)
│   │   ├── kafka_server_jaas.conf  # SASL/PLAIN user credentials
│   │   ├── init-kafka.sh           # Create topics and ACLs (run-once)
│   │   └── admin.properties        # SASL config for kafka CLI tools
│   ├── fluent-bit/
│   │   ├── fluent-bit.conf         # Input, filter, output pipeline
│   │   ├── parsers.conf            # Docker JSON log parser
│   │   └── enrich.lua              # Lua filter: metadata + schema reshaping
│   ├── opensearch/
│   │   └── index-template.json     # job-logs-* index mapping
│   └── init-test-data.sh           # Create sample files in storeBase
│
├── test_ui/
│   ├── Dockerfile                  # nginx serving static + reverse proxy
│   ├── nginx.conf                  # Proxy /pfcon/ → pfcon, /sse/ → SSE service
│   └── static/
│       ├── index.html              # Job submission + status + log viewer
│       └── app.js                  # SSE service client + EventSource SSE client
│
├── tests/
│   └── __init__.py
│
├── kubernetes/                     # Kubernetes manifests (parallel to docker-compose)
│   ├── README.md                   # K8s deployment and testing guide
│   ├── kustomization.yaml          # Kustomize entrypoint
│   ├── 00-namespace.yaml
│   ├── 20-infra/                   # Kafka, Redis, OpenSearch, Postgres, Fluent Bit
│   ├── 30-pfcon/                   # pfcon with KubernetesManager
│   ├── 40-workers/                 # Event forwarder, status/log consumers, SSE, celery
│   ├── 50-ui/                      # Test UI (nginx)
│   └── tests/                      # Unit/integration/e2e test Jobs + integration stack
│
└── justfile                        # Task runner: `just up`, `just k8s-up`, `just run all-tests`, etc.
```
All 14 services run on a single `streaming` Docker network.
| # | Service | Image | Role | Exposed Ports |
|---|---|---|---|---|
| 1 | kafka | `apache/kafka:3.9.0` | KRaft broker with SASL/PLAIN | 9092 |
| 2 | kafka-init | `apache/kafka:3.9.0` | Creates topics and ACLs (run-once) | — |
| 3 | opensearch | `opensearchproject/opensearch:2.18.0` | Log storage and search | 9200 |
| 4 | redis | `redis:7-alpine` | Pub/Sub fan-out + Celery broker | 6379 |
| 5 | postgres | `postgres:16-alpine` | Celery worker DB | 5433 |
| 6 | fluent-bit | `fluent/fluent-bit:3.2` | Docker log files → Kafka `job-logs` | 2020 (metrics) |
| 7 | pfcon | `ghcr.io/fnndsc/pfcon:latest` | Job control plane (fslink mode) | 30005 |
| 8 | init-test-data | `alpine:latest` | Creates sample fslink test data (run-once) | — |
| 9 | event-forwarder | `localhost/fnndsc/compute-event-forwarder` | Docker events → Kafka `job-status-events` | — |
| 10 | status-consumer | `localhost/fnndsc/compute-status-consumer` | Kafka → Celery | — |
| 11 | log-consumer | `localhost/fnndsc/compute-logs-consumer` | Kafka → OpenSearch + Redis | — |
| 12 | sse-service | Built from `Dockerfile.sse_service` | FastAPI SSE streaming | 8080 |
| 13 | celery-worker | Built from `Dockerfile.sse_service` | Celery status processing + PostgreSQL | — |
| 14 | test-ui | Built from `test_ui/Dockerfile` | nginx + static HTML/JS test app | 8888 |
```
init-test-data ──→ pfcon
kafka ──→ kafka-init ──→ event-forwarder
              ├──→ status-consumer
              ├──→ log-consumer (also needs opensearch, redis)
              └──→ fluent-bit
redis + postgres + pfcon ──→ celery-worker
redis + postgres ──→ sse-service
sse-service ──→ test-ui
```
| Topic | Partitions | Retention | Key | Purpose |
|---|---|---|---|---|
| `job-status-events` | 12 | 3 days | `job_id` | Status transitions from Event Forwarder |
| `job-logs` | 12 | 3 days | `job_id` | Log lines from Fluent Bit |
| `job-status-events-dlq` | 3 | 7 days | `job_id` | Failed status messages |
| `job-logs-dlq` | 3 | 7 days | `job_id` | Failed log messages |

Partitioning by `job_id` guarantees ordering of all events for a single job.
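The ordering guarantee follows from keyed partitioning: every message with the same key hashes to the same partition, and each partition is a totally ordered log. A sketch of the idea, using a deterministic stand-in hash (real Kafka clients use murmur2, not md5):

```python
# Illustration of key → partition routing. md5 here is only a
# deterministic stand-in; it is NOT the hash Kafka clients use.
import hashlib

NUM_PARTITIONS = 12  # matches the job-status-events / job-logs topics

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every event keyed by the same job_id routes to the same partition,
# so a single consumer sees that job's events in production order.
partitions = {partition_for("my-job-1") for _ in range(5)}
print(len(partitions))  # 1
```

Events for different jobs may interleave across partitions, which is fine: only per-job order matters to the workflow state machine.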
SASL/PLAIN users (defined in `kafka_server_jaas.conf`): `event-forwarder` (write events + write EOS markers to `job-logs`), `log-producer` (Fluent Bit writes logs), `status-consumer` (read events), `log-consumer` (read logs). Each user has ACLs restricting them to only the operations they need.
A parallel Kubernetes pipeline is available under `kubernetes/`, wrapped by `just k8s-*` recipes. See `kubernetes/README.md` for the full guide (bring-up, teardown, in-cluster test runs). The docker-compose pipeline described below is unaffected.
- Docker and Docker Compose v2
- The `ghcr.io/fnndsc/pfcon:latest` image (includes `org.chrisproject.job_type` label support)

```shell
just up
```

or without `just`:

```shell
docker compose up --build -d
```

This builds the custom service images, pulls infrastructure images (including pfcon), initializes Kafka topics/users, creates test data, and launches all services.
| URL | Service |
|---|---|
| http://localhost:8888 | Test UI — submit jobs, watch status + logs |
| http://localhost:8080/health | SSE Service health check |
| http://localhost:8080/api/jobs/{job_id}/run | Submit a workflow (POST) |
| http://localhost:8080/api/jobs/{job_id}/workflow | Workflow status (GET) |
| http://localhost:8080/api/jobs/{job_id}/status/history | Status history (GET) |
| http://localhost:8080/events/{job_id}/status | SSE status stream (with historical replay) |
| http://localhost:8080/events/{job_id}/logs | SSE log stream (with historical replay) |
| http://localhost:8080/events/{job_id}/all | SSE combined stream (with historical replay) |
| http://localhost:8080/logs/{job_id}/history | Historical logs from OpenSearch |
| http://localhost:30005/api/v1/ | pfcon API (direct) |
| http://localhost:9200 | OpenSearch API |
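The `/events/...` endpoints above speak the standard `text/event-stream` wire format, so any HTTP client that yields response lines can consume them. A minimal frame parser (the `event:`/`data:` framing is standard SSE; the event names the service actually emits are not assumed here):

```python
# Minimal SSE frame parser: feed it an iterable of response lines and it
# yields (event, data) pairs. Usable against /events/{job_id}/all with,
# e.g., a streamed urllib or httpx response.
from typing import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from an iterable of SSE lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\n")
        if not line:                          # blank line terminates a frame
            if data:
                yield event, "\n".join(data)
            event, data = "message", []       # reset for the next frame
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())

frames = list(parse_sse([
    "event: status",
    'data: {"job_id": "my-job-1", "status": "started"}',
    "",
]))
print(frames[0][0])  # status
```

In the browser, `EventSource` does this parsing for you (as the test UI's `app.js` does); the sketch is mainly useful for scripting against the stream from Python.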
- Open http://localhost:8888
- The form is pre-filled with defaults for `pl-simpledsapp` against the test data
- Click Run Full Workflow — the UI will:
  - Submit a single `POST /sse/api/jobs/{job_id}/run` request to the SSE service
  - The SSE service schedules the workflow via Celery, which orchestrates copy → plugin → upload → delete → cleanup automatically
- Watch the Status Events panel for real-time SSE status updates from the event pipeline
- Watch the Container Logs panel for real-time log lines from the log pipeline
- Watch the Step Tracker for workflow progression
```shell
# Submit a workflow (single request — the SSE service orchestrates everything)
curl -s -X POST http://localhost:8080/api/jobs/my-job-1/run \
  -H 'Content-Type: application/json' \
  -d '{
    "image": "ghcr.io/fnndsc/pl-simpledsapp:2.1.0",
    "entrypoint": ["simpledsapp"],
    "type": "ds",
    "args": ["--dummyFloat", "3.5", "--sleepLength", "5"]
  }'

# Check workflow status
curl -s http://localhost:8080/api/jobs/my-job-1/workflow | jq

# Get status history
curl -s http://localhost:8080/api/jobs/my-job-1/status/history | jq

# Stream SSE events (including historical replay for late-connecting clients)
curl -N http://localhost:8080/events/my-job-1/all
```

```shell
# Check Kafka topics
docker compose exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --command-config /etc/kafka/admin.properties --list

# Read status events from Kafka
docker compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --consumer.config /etc/kafka/admin.properties \
  --topic job-status-events --from-beginning --max-messages 10

# Query PostgreSQL for job statuses
docker compose exec postgres psql -U chris chris_streaming \
  -c "SELECT job_id, job_type, status, updated_at FROM job_status ORDER BY updated_at;"

# Query OpenSearch for logs
curl -s 'http://localhost:9200/job-logs-*/_search?q=job_id:test-job-1&sort=timestamp:asc&size=20' | jq '.hits.hits[]._source.line'

# Check Fluent Bit metrics
curl -s http://localhost:2020/api/v1/metrics
```

```shell
just down   # stop services
just nuke   # stop and remove all volumes (full reset)
```

or without `just`:

```shell
docker compose down      # stop services
docker compose down -v   # stop and remove all volumes (full reset)
```

Tests are split into three levels — unit tests (no infrastructure, fast), integration tests (require Kafka, Redis, OpenSearch, PostgreSQL), and end-to-end tests (require the full application stack). All run entirely in Docker via `docker-compose.test.yml` and `Dockerfile.test`, using Docker Compose profiles to start only the services needed for each level.
Install `just`, then:

```shell
# Run unit tests (no infrastructure needed, ~1s)
just run unit-tests

# Run integration tests (spins up Kafka, Redis, OpenSearch, PostgreSQL)
just run integration-tests

# Run E2E tests (starts full application stack, submits real workflows)
just run e2e-tests

# Run all test levels sequentially (useful for CI/CD)
just run all-tests
```

or without `just`:

```shell
# Unit tests only
docker compose -f docker-compose.test.yml build unit-tests
docker compose -f docker-compose.test.yml run --rm unit-tests

# Integration tests (starts infrastructure, runs tests, stops infrastructure)
docker compose -f docker-compose.test.yml --profile integration build
docker compose -f docker-compose.test.yml --profile integration up -d --wait
docker compose -f docker-compose.test.yml run --rm integration-tests
docker compose -f docker-compose.test.yml --profile integration down

# E2E tests (requires the full stack from docker-compose.yml)
docker compose up --build -d
docker compose -f docker-compose.test.yml build e2e-tests
docker compose -f docker-compose.test.yml --profile e2e run --rm e2e-tests
docker compose down
```

```
tests/
├── conftest.py                   # Shared fixtures (sample events)
├── unit/                         # No external services required
│   ├── test_common/              # schemas, pfcon_status, container_naming, settings, kafka
│   ├── test_event_forwarder/     # producer, eos_producer, docker_watcher
│   ├── test_status_consumer/     # consumer, notifier
│   ├── test_log_consumer/        # consumer, opensearch_writer, redis_publisher
│   └── test_sse_service/         # app, routes, pfcon_client, tasks
├── integration/                  # Requires Docker Compose infrastructure
│   ├── test_kafka_roundtrip.py   # Produce/consume through real Kafka
│   ├── test_redis_pubsub.py      # Pub/Sub and key operations
│   ├── test_opensearch_writer.py # Bulk writes and queries
│   ├── test_postgres.py          # Schema creation and upsert logic
│   └── test_sse_health.py        # FastAPI health endpoint
└── e2e/                          # Requires full application stack
    └── test_workflow_e2e.py      # Submit workflows, verify SSE events + logs
```
- Unit tests use mocks exclusively — no network or container dependencies.
- Integration tests are marked with `@pytest.mark.integration` and test real service interactions (Kafka produce/consume, Redis Pub/Sub, OpenSearch bulk writes, PostgreSQL upserts).
- E2E tests are marked with `@pytest.mark.e2e` and exercise the full pipeline: submit a job via `POST /api/jobs/{id}/run`, verify status events arrive through the SSE stream, check workflow completion, and confirm logs appear in OpenSearch. Includes tests for successful workflows, failure scenarios (bad image), and historical replay for late-connecting SSE clients.
All services are configured via environment variables. The `.env` file provides defaults for the Docker Compose deployment.
Used by: Event Forwarder, Status Consumer, Log Consumer
| Variable | Default | Description |
|---|---|---|
| `KAFKA_BOOTSTRAP_SERVERS` | `kafka:9092` | Kafka broker address |
| `KAFKA_SECURITY_PROTOCOL` | `SASL_PLAINTEXT` | `SASL_PLAINTEXT` for dev, `SASL_SSL` for production |
| `KAFKA_SASL_MECHANISM` | `PLAIN` | SASL mechanism |
| `KAFKA_SASL_USERNAME` | (per service) | SASL/PLAIN username |
| `KAFKA_SASL_PASSWORD` | (per service) | SASL/PLAIN password |
| `KAFKA_TOPIC_STATUS` | `job-status-events` | Status events topic |
| `KAFKA_TOPIC_LOGS` | `job-logs` | Log events topic |
| `KAFKA_TOPIC_STATUS_DLQ` | `job-status-events-dlq` | Status dead-letter topic |
| `KAFKA_TOPIC_LOGS_DLQ` | `job-logs-dlq` | Logs dead-letter topic |
| Variable | Default | Description |
|---|---|---|
| `COMPUTE_ENV` | `docker` | `docker` or `kubernetes` |
| `DOCKER_LABEL_FILTER` | `org.chrisproject.miniChRIS` | Docker label key to filter containers |
| `DOCKER_LABEL_VALUE` | `plugininstance` | Expected value for the filter label |
| `K8S_NAMESPACE` | `default` | Kubernetes namespace (when `COMPUTE_ENV=kubernetes`) |
| `K8S_LABEL_SELECTOR` | `org.chrisproject.miniChRIS=plugininstance` | K8s label selector |
| `EMIT_INITIAL_STATE` | `true` | Emit current state of all containers on startup |
| `EOS_DELAY_SECONDS` | `10.0` | Delay before sending EOS marker to `job-logs` (seconds) |
| `KAFKA_SASL_USERNAME` | `event-forwarder` | Kafka SASL/PLAIN user |
| `KAFKA_SASL_PASSWORD` | `event-forwarder-secret` | Kafka SASL/PLAIN password |

Requires the Docker socket mounted at `/var/run/docker.sock` (Docker mode) or in-cluster K8s config (Kubernetes mode).
| Variable | Default | Description |
|---|---|---|
| `KAFKA_SASL_USERNAME` | `status-consumer` | Kafka SASL/PLAIN user |
| `KAFKA_SASL_PASSWORD` | `status-consumer-secret` | Kafka SASL/PLAIN password |
| `KAFKA_CONSUMER_GROUP` | `status-consumer-group` | Kafka consumer group ID |
| `CELERY_BROKER_URL` | `redis://redis:6379/0` | Celery broker URL |
| `MAX_RETRIES` | `3` | Retries before sending to DLQ |
| Variable | Default | Description |
|---|---|---|
| `KAFKA_SASL_USERNAME` | `log-consumer` | Kafka SASL/PLAIN user |
| `KAFKA_SASL_PASSWORD` | `log-consumer-secret` | Kafka SASL/PLAIN password |
| `KAFKA_CONSUMER_GROUP` | `log-consumer-group` | Kafka consumer group ID |
| `OPENSEARCH_URL` | `http://opensearch:9200` | OpenSearch endpoint |
| `REDIS_URL` | `redis://redis:6379/0` | Redis URL for Pub/Sub |
| `BATCH_MAX_SIZE` | `200` | Max messages per batch before flush |
| `BATCH_MAX_WAIT_SECONDS` | `2.0` | Max seconds before flushing a partial batch |
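The interplay of `BATCH_MAX_SIZE`, `BATCH_MAX_WAIT_SECONDS`, and EOS markers can be sketched as follows. This is a synchronous illustration with made-up class and method names; the real consumer is async and reads from Kafka.

```python
# Sketch of the Log Consumer's flush policy: flush on a full batch, on a
# stale partial batch, or immediately when an EOS marker arrives.
import time

BATCH_MAX_SIZE = 200
BATCH_MAX_WAIT_SECONDS = 2.0

class Batcher:
    def __init__(self, flush):
        self.flush, self.batch, self.first_at = flush, [], None

    def add(self, msg: dict):
        if msg.get("eos"):            # EOS marker: drain what we have now
            self._flush()
            return
        self.batch.append(msg)
        self.first_at = self.first_at or time.monotonic()
        if len(self.batch) >= BATCH_MAX_SIZE:
            self._flush()             # full batch

    def tick(self):                   # called periodically by the poll loop
        if self.batch and time.monotonic() - self.first_at >= BATCH_MAX_WAIT_SECONDS:
            self._flush()             # stale partial batch

    def _flush(self):
        if self.batch:
            self.flush(self.batch)    # e.g. OpenSearch _bulk + Redis publish
        self.batch, self.first_at = [], None

flushed = []
b = Batcher(flushed.append)
for i in range(3):
    b.add({"line": f"log {i}"})
b.add({"eos": True})                  # forces the partial batch out
print(len(flushed[0]))  # 3
```

After the EOS-triggered flush, the real consumer additionally sets the `job:{id}:{type}:logs_flushed` key described earlier.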
| Variable | Default | Description |
|---|---|---|
| `HOST` | `0.0.0.0` | Bind address |
| `PORT` | `8080` | Bind port |
| `REDIS_URL` | `redis://redis:6379/0` | Redis URL for Pub/Sub subscriptions |
| `OPENSEARCH_URL` | `http://opensearch:9200` | OpenSearch for historical log queries |
| `CELERY_BROKER_URL` | `redis://redis:6379/0` | Celery broker |
| `DB_DSN` | `postgresql://chris:chris1234@postgres:5432/chris_streaming` | PostgreSQL connection string |
| `PFCON_URL` | `http://pfcon:30005` | pfcon API base URL |
| `PFCON_USER` | `pfcon` | pfcon API username |
| `PFCON_PASSWORD` | `pfcon1234` | pfcon API password |
| `EOS_QUIESCENCE_SECONDS` | `10.0` | Fallback window used by `cleanup_containers`: if no `logs_flushed` key appears, a step whose terminal status has been stable for this long is treated as drained. EOS is a hint, not a hard gate. |
Uses the same image as the SSE Service. Runs with:

```shell
celery -A chris_streaming.sse_service.tasks worker -l info -Q status-processing -c 2
```

| Variable | Default | Description |
|---|---|---|
| `REDIS_URL` | `redis://redis:6379/0` | Redis URL for publishing confirmed statuses and checking `logs_flushed` keys |
| `CELERY_BROKER_URL` | `redis://redis:6379/0` | Celery broker |
| `DB_DSN` | `postgresql://chris:chris1234@postgres:5432/chris_streaming` | PostgreSQL connection string |
| `PFCON_URL` | `http://pfcon:30005` | pfcon API base URL |
| `PFCON_USER` | `pfcon` | pfcon API username |
| `PFCON_PASSWORD` | `pfcon1234` | pfcon API password |
| Variable | Default | Description |
|---|---|---|
| `KAFKA_BROKERS` | `kafka:9092` | Kafka broker address |
| `KAFKA_SASL_USERNAME` | `log-producer` | Kafka SASL/PLAIN user |
| `KAFKA_SASL_PASSWORD` | `log-producer-secret` | Kafka SASL/PLAIN password |

Requires `/var/lib/docker/containers` and `/var/run/docker.sock` mounted read-only.
| Variable | Default | Description |
|---|---|---|
| `APPLICATION_MODE` | `development` | Enables DevConfig with hardcoded test credentials |
| `PFCON_INNETWORK` | `true` | In-network mode (containers share a volume) |
| `STORAGE_ENV` | `fslink` | Filesystem with ChRIS link expansion |
| `CONTAINER_ENV` | `docker` | Schedule containers via the Docker API |
| `JOB_LABELS` | `org.chrisproject.miniChRIS=plugininstance` | Labels applied to all job containers |
| `REMOVE_JOBS` | `yes` | Remove containers on DELETE |
| Variable | Default | Description |
|---|---|---|
| `KAFKA_ADMIN_PASSWORD` | `admin-secret` | SASL/PLAIN password for the admin super-user |
| Variable | Default | Description |
|---|---|---|
| `POSTGRES_DB` | `chris_streaming` | Database name (used by the Celery Worker) |
| `POSTGRES_USER` | `chris` | Database user |
| `POSTGRES_PASSWORD` | `chris1234` | Database password |
The dev environment uses `SASL_PLAINTEXT` with SASL/PLAIN (credentials in cleartext over the wire). For production:

- Switch authentication from `SASL/PLAIN` to `SASL/SCRAM-SHA-512` (hashed credentials)
- Generate CA, broker, and client certificates
- Configure the Kafka broker with `ssl.keystore.location`, `ssl.truststore.location`
- Change `KAFKA_SECURITY_PROTOCOL` to `SASL_SSL` and `KAFKA_SASL_MECHANISM` to `SCRAM-SHA-512` on all clients
- Distribute client keystores/truststores to each service container
- Update Fluent Bit `rdkafka.security.protocol` to `SASL_SSL`, `rdkafka.sasl.mechanism` to `SCRAM-SHA-512`, and add `rdkafka.ssl.ca.location`
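For the Python workers, the client-side half of that checklist boils down to a handful of constructor arguments. A sketch (parameter names follow aiokafka's conventions and are an assumption here; the password would come from a secret, and `ca_path` loading is elided):

```python
# Hypothetical production client settings, as a plain dict of kwargs.
# Verify parameter names against your Kafka client's documentation.
import ssl

def production_kafka_kwargs(username: str, password: str, ca_path: str) -> dict:
    ctx = ssl.create_default_context()
    # A real deployment would trust the cluster CA instead of system CAs:
    #   ctx.load_verify_locations(cafile=ca_path)
    return {
        "bootstrap_servers": "kafka:9092",
        "security_protocol": "SASL_SSL",    # was SASL_PLAINTEXT in dev
        "sasl_mechanism": "SCRAM-SHA-512",  # was PLAIN in dev
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "ssl_context": ctx,
    }

cfg = production_kafka_kwargs("event-forwarder", "from-a-secret", "/etc/kafka/ca.pem")
print(cfg["security_protocol"])  # SASL_SSL
```

Because the workers already read `KAFKA_SECURITY_PROTOCOL` and `KAFKA_SASL_MECHANISM` from the environment, the switch is mostly a matter of changing those variables and mounting the CA material.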