diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-pulsar.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-pulsar.md
index 4672698dd85..93ad540f539 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-pulsar.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-pulsar.md
@@ -84,8 +84,9 @@ The above example uses secrets as plain strings. It is recommended to use a [sec
 | batchingMaxPublishDelay | N | batchingMaxPublishDelay set the time period within which the messages sent will be batched,if batch messages are enabled. If set to a non zero value, messages will be queued until this time interval or batchingMaxMessages (see below) or batchingMaxSize (see below). There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default: `"10ms"` | `"10ms"`, `"10"`|
 | batchingMaxMessages | N | batchingMaxMessages set the maximum number of messages permitted in a batch.If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxSize (see below) has been reached or the batch interval has elapsed. Default: `"1000"` | `"1000"`|
 | batchingMaxSize | N | batchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxMessages (see above) has been reached or the batch interval has elapsed. Default: `"128KB"` | `"131072"`|
-| .jsonschema | N | Enforces JSON schema validation for the configured topic. |
-| .avroschema | N | Enforces Avro schema validation for the configured topic. |
+| .jsonschema | N | Enforces JSON schema validation for the configured topic. When CloudEvents wrapping is enabled (the default), the schema registered with the Pulsar Schema Registry is a CloudEvents envelope JSON schema containing the provided schema as the `data` field. See [Publishing & subscribing messages with CloudEvents]({{% ref pubsub-cloudevents.md %}}). | |
+| .avroschema | N | Enforces Avro schema validation for the configured topic. When CloudEvents wrapping is enabled (the default), the schema registered with the Pulsar Schema Registry is a CloudEvents envelope Avro schema containing the provided schema as the `data` field. See [Publishing & subscribing messages with CloudEvents]({{% ref pubsub-cloudevents.md %}}). | |
+| .rawschema | N | When set to `"true"`, registers the raw message schema (Avro or JSON) directly with the Pulsar Schema Registry instead of wrapping it in a CloudEvents envelope. Use this for topics that exclusively receive raw payloads. Callers must also set the publisher request metadata `rawPayload=true`. See [Publishing & subscribing messages without CloudEvents]({{% ref pubsub-raw.md %}}). Default: `"false"` | `"true"`, `"false"` |
 | publicKey | N | A public key to be used for publisher and consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value |
 | privateKey | N | A private key to be used for consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value |
 | keys | N | A comma delimited string containing names of [Pulsar session keys](https://pulsar.apache.org/docs/3.0.x/security-encryption/#how-it-works-in-pulsar). Used in conjunction with `publicKey` for publisher encryption |
@@ -241,9 +242,77 @@ spec:
     value: "openid,profile,email"
 ```
 
+### Use JSON/Avro schema validation
+
+Dapr allows you to perform schema validation on Avro and JSON message schemas using the `.avroschema` and `.jsonschema` metadata properties.
+When using this feature with CloudEvents enabled (the [default]({{% ref pubsub-cloudevents.md %}})), Dapr automatically wraps your schema inside a CloudEvents envelope before registering it with the Pulsar Schema Registry. When using this feature with [raw messages]({{% ref pubsub-raw.md %}}) and `.rawschema=true`, Dapr registers your raw message schema directly with the Pulsar Schema Registry.
+
+For Avro schemas, the envelope follows the [CloudEvents Avro format spec](https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/avro-format.md). For JSON schemas, the envelope follows the [CloudEvents JSON format spec](https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md). This ensures the registered Pulsar schema matches the actual wire format of published messages.
+
+The following table summarizes the behavior (applies to both `.avroschema` and `.jsonschema`):
+
+| Component configuration | CloudEvents-wrapped messages (default) | Raw messages (`rawPayload=true`) |
+|---|---|---|
+| **No schema validation configured** | Dapr wraps the message in a CloudEvents envelope (JSON) and sends the original payload as raw bytes in the `data` field. No schema is registered with Pulsar. | Dapr sends the message as raw bytes, without a CloudEvents envelope or schema validation. |
+| **Schema validation with `.rawschema=false`** | Dapr wraps the message in a CloudEvents envelope. The publisher registers the CloudEvents envelope schema (JSON or Avro) and validates the message against the CloudEvents codec. | Rejected with an error. The registered publisher schema is a CloudEvents envelope, so a raw payload fails validation. |
+| **Schema validation with `.rawschema=true`** | Rejected with an error. The publisher registers the raw payload schema with Pulsar, but Dapr wraps the message in a CloudEvents envelope, so it fails validation. | The publisher registers the raw data schema with Pulsar and validates the payload against the inner codec. |
+
+{{% alert title="Important" color="warning" %}}
+Because Dapr wraps messages in a CloudEvents envelope by default, never use `.rawschema=true` on a topic that will receive CloudEvent-wrapped messages.
+Use the `.rawschema=true` metadata property only together with the publisher setting `rawPayload=true`, on a dedicated topic, so that the registered inner data schema always matches the actual wire format. Read [Publishing & subscribing messages without CloudEvents]({{% ref pubsub-raw.md %}}) for how to use `rawPayload=true`.
+{{% /alert %}}
+
+Since Pulsar enforces a single schema per topic, raw messages cannot be sent to a topic that uses a CloudEvents envelope schema. To handle both CloudEvent-wrapped and raw payloads, use separate topics.
+
+#### Schema validation with CloudEvents
+
+```yaml
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: pulsar-pubsub
+spec:
+  type: pubsub.pulsar
+  version: v1
+  metadata:
+  - name: host
+    value: "localhost:6650"
+  # CloudEvent-wrapped topic (default behavior): Avro example
+  - name: orders.avroschema
+    value: '{"type":"record","name":"Order","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}'
+```
+
+Publishing with `rawPayload=true` to a CloudEvent-wrapped topic returns the following error:
+
+> `rawPayload=true is not compatible with schema topics using CloudEvents envelope; use a separate topic for raw payloads`
+
+#### Schema validation with raw messages
+
+```yaml
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: pulsar-pubsub
+spec:
+  type: pubsub.pulsar
+  version: v1
+  metadata:
+  - name: host
+    value: "localhost:6650"
+  # Raw-payload-only topic (skips CloudEvent wrapping): Avro example
+  - name: orders-raw.avroschema
+    value: '{"type":"record","name":"Order","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}'
+  - name: orders-raw.rawschema
+    value: "true"
+  # Raw-payload-only topic: JSON schema example
+  - name: events-raw.jsonschema
+    value: '{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}'
+  - name: events-raw.rawschema
+    value: "true"
+```
+
 ### Enabling message delivery retries
 
-The Pulsar pub/sub component has no built-in support for retry strategies. This means that sidecar sends a message to the service only once and is not retried in case of failures. To make Dapr use more spohisticated retry policies, you can apply a [retry resiliency policy]({{% ref "retries-overview.md" %}}) to the Pulsar pub/sub component. Note that it will be the same Dapr sidecar retrying the redelivery the message to the same app instance and not other instances.
+The Pulsar pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once and does not retry it in case of failure. To make Dapr use more sophisticated retry policies, you can apply a [retry resiliency policy]({{% ref "retries-overview.md" %}}) to the Pulsar pub/sub component. Note that the same Dapr sidecar retries the redelivery of the message to the same app instance, not to other instances.
 
 ### Delay queue