Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ The above example uses secrets as plain strings. It is recommended to use a [sec
| batchingMaxPublishDelay | N | batchingMaxPublishDelay set the time period within which the messages sent will be batched,if batch messages are enabled. If set to a non zero value, messages will be queued until this time interval or batchingMaxMessages (see below) or batchingMaxSize (see below). There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default: `"10ms"` | `"10ms"`, `"10"`|
| batchingMaxMessages | N | batchingMaxMessages set the maximum number of messages permitted in a batch.If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxSize (see below) has been reached or the batch interval has elapsed. Default: `"1000"` | `"1000"`|
| batchingMaxSize | N | batchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxMessages (see above) has been reached or the batch interval has elapsed. Default: `"128KB"` | `"131072"`|
| <topic-name>.jsonschema | N | Enforces JSON schema validation for the configured topic. |
| <topic-name>.avroschema | N | Enforces Avro schema validation for the configured topic. |
| <topic-name>.jsonschema | N | Enforces JSON schema validation for the configured topic. When CloudEvents wrapping is enabled (the default), the schema registered with the Pulsar Schema Registry is a CloudEvents envelope JSON schema containing the provided schema as the `data` field. See [Publishing & subscribing messages with Cloudevents]({{% ref pubsub-cloudevents.md %}})| |
| <topic-name>.avroschema | N | Enforces Avro schema validation for the configured topic. When CloudEvents wrapping is enabled (the default), the schema registered with the Pulsar Schema Registry is a CloudEvents envelope Avro schema containing the provided schema as the `data` field. See [Publishing & subscribing messages with Cloudevents]({{% ref pubsub-cloudevents.md %}}) | |
| <topic-name>.rawschema | N | When set to `"true"`, registers the raw message schema (Avro or JSON) directly with the Pulsar Schema Registry instead of wrapping it in a CloudEvents envelope. Use this for topics that exclusively receive raw payloads. Callers must also set the publisher request metadata `rawPayload=true`. See [Publishing & subscribing messages without CloudEvents]({{% ref pubsub-raw.md %}}). Default: `"false"` | `"true"`, `"false"` |
| publicKey | N | A public key to be used for publisher and consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value |
| privateKey | N | A private key to be used for consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value |
| keys | N | A comma delimited string containing names of [Pulsar session keys](https://pulsar.apache.org/docs/3.0.x/security-encryption/#how-it-works-in-pulsar). Used in conjunction with `publicKey` for publisher encryption |
Expand Down Expand Up @@ -241,9 +242,77 @@ spec:
value: "openid,profile,email"
```

### Use JSON/Avro schema validation

Dapr allows you to perform schema validation on Avro and JSON message schema using the `<topic-name>.avroschema` and `<topic-name>.jsonschema` metadata properties. When using this feature with CloudEvents enabled (the [default]({{% ref pubsub-cloudevents.md %}})), Dapr automatically wraps your schema inside a CloudEvents envelope before registering it with the Pulsar Schema Registry. When using this feature with [raw messages]({{% ref pubsub-raw.md %}}), Dapr registers your raw message schema with the Pulsar Schema Registry.

For Avro schemas, the envelope follows the [CloudEvents Avro format spec](https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/avro-format.md). For JSON schemas, the envelope follows the [CloudEvents JSON format spec](https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md). This ensures the registered Pulsar schema matches the actual wire format of published messages.

The following table summarizes the behavior (applies to both `.avroschema` and `.jsonschema`):

| | CloudEvents wrapper (default) | Raw messages |
|---|---|---|
| **No schema validation configured** | Dapr wraps the message in a CloudEvents envelope (JSON). Message sent as raw bytes as the `data` field. No schema registered with Pulsar. | Dapr sends the message as raw bytes without schema validation or CloudEvents envelope. |
| **Schema validation with `<topic-name>.rawschema=false`** | Dapr wraps message in CloudEvents envelope. Publisher registers CloudEvents envelope schema (either JSON or Avro) and validates against CloudEvents codec. | Rejected with error. Publisher schema is a CloudEvents envelope and so raw payload fails validation. |
| **Schema validation with `<topic-name>.rawschema=true`** | Rejected with error. Publisher registers raw payload with Pulsar but Dapr wraps it in a CloudEvents envelope and so fails validation. | Publisher registers raw data with Pulsar and validates it against inner codec. |

{{% alert title="Important" color="warning" %}}
Because Dapr wraps messages in a CloudEvents envelope by default, never use `<topic-name>.rawschema=true` on a topic that will receive CloudEvent wrapped messages. The `<topic-name>.rawschema=true` metadata property must only be used together with the publisher setting`rawPayload=true`, on a dedicated topic, so the registered inner data schema always matches the actual wire format. Read [Publishing & subscribing messages without CloudEvents]({{% ref pubsub-raw.md %}}) for how to use `rawPayload=true`.
{{% /alert %}}

Since Pulsar enforces a single schema per topic, raw messages cannot be sent to a topic using a CloudEvents envelope schema. To handle both CloudEvent-wrapped and raw payloads, use separate topics.

#### Schema validation with CloudEvents

```yaml
Comment thread
javier-aliaga marked this conversation as resolved.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pulsar-pubsub
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
# CloudEvent-wrapped topic (default behavior) — Avro example
- name: orders.avroschema
value: '{"type":"record","name":"Order","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}'
```

Comment thread
javier-aliaga marked this conversation as resolved.
Publishing with `rawPayload=true` to a CloudEvent-wrapped topic returns the error:
> `rawPayload=true is not compatible with schema topics using CloudEvents envelope; use a separate topic for raw payloads`

#### Schema validation with raw messages

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pulsar-pubsub
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
# Raw-payload-only topic (skips CloudEvent wrapping) — Avro example
- name: orders-raw.avroschema
value: '{"type":"record","name":"Order","fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}'
- name: orders-raw.rawschema
value: "true"
# Raw-payload-only topic — JSON schema example
- name: events-raw.jsonschema
value: '{"type":"object","properties":{"id":{"type":"integer"},"name":{"type":"string"}},"required":["id","name"]}'
- name: events-raw.rawschema
value: "true"
```



### Enabling message delivery retries

The Pulsar pub/sub component has no built-in support for retry strategies. This means that sidecar sends a message to the service only once and is not retried in case of failures. To make Dapr use more spohisticated retry policies, you can apply a [retry resiliency policy]({{% ref "retries-overview.md" %}}) to the Pulsar pub/sub component. Note that it will be the same Dapr sidecar retrying the redelivery the message to the same app instance and not other instances.
The Pulsar pub/sub component has no built-in support for retry strategies. This means that sidecar sends a message to the service only once and is not retried in case of failures. To make Dapr use more sophisticated retry policies, you can apply a [retry resiliency policy]({{% ref "retries-overview.md" %}}) to the Pulsar pub/sub component. Note that it will be the same Dapr sidecar retrying the redelivery the message to the same app instance and not other instances.

### Delay queue

Expand Down
Loading