Produce
Produce
type: "io.kestra.plugin.pulsar.Produce"
Produce message to a Pulsar topic.
Examples
Read a CSV file, transform it to the right format, and publish it to Pulsar topic.
id: produce
namespace: company.team
inputs:
- type: FILE
id: file
tasks:
- id: csv_reader
type: io.kestra.plugin.serdes.csv.CsvToIon
from: "{{ inputs.file }}"
- id: file_transform
type: io.kestra.plugin.scripts.nashorn.FileTransform
from: {{ outputs.csv_reader.uri }}"
script: |
var result = {
"key": row.id,
"value": {
"username": row.username,
"tweet": row.tweet
},
"eventTime": row.timestamp,
"properties": {
"key": "value"
}
};
row = result
- id: produce
type: io.kestra.plugin.pulsar.Produce
from: "{{ outputs.file_transform.uri }}"
uri: pulsar://localhost:26650
serializer: JSON
topic: test_kestra
Properties
from
- Type: object
- Dynamic: ✔️
- Required: ✔️
Source of the sent message.
Can be a Kestra internal storage URI, a map or a list in the following format:
key
,value
,eventTime
,properties
,deliverAt
,deliverAfter
andsequenceId
.
serializer
- Type: object
- Dynamic: ❓
- Required: ✔️
topic
- Type: string
- Dynamic: ✔️
- Required: ✔️
Pulsar topic to send a message to.
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
accessMode
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values:
Shared
Exclusive
ExclusiveWithFencing
WaitForExclusive
Configure the type of access mode that the producer requires on the topic.
Possible values are:
Shared
: By default, multiple producers can publish to a topic.Exclusive
: Require exclusive access for producer. Fail immediately if there's already a producer connected.WaitForExclusive
: Producer creation is pending until it can acquire exclusive access.
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
compressionType
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values:
NONE
LZ4
ZLIB
ZSTD
SNAPPY
Set the compression type for the producer.
By default, message payloads are not compressed. Supported compression types are:
NONE
: No compression (Default).LZ4
: Compress with LZ4 algorithm. Faster but lower compression than ZLib.ZLIB
: Standard ZLib compression.ZSTD
Compress with Zstandard codec. Since Pulsar 2.3.SNAPPY
Compress with Snappy codec. Since Pulsar 2.4.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add public encryption key, used by producer to encrypt the data key.
producerName
- Type: string
- Dynamic: ✔️
- Required: ❌
Specify a name for the producer.
producerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the producer.
schemaString
- Type: string
- Dynamic: ✔️
- Required: ❌
JSON string of the topic's schema
Required for connecting with topics with a defined schema and strict schema checking
schemaType
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
NONE
- Possible Values:
NONE
AVRO
JSON
The schema type of the topic
Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.
tlsOptions
- Type: AbstractPulsarConnection-TlsOptions
- Dynamic: ❌
- Required: ❌
TLS authentication options.
You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.
Outputs
messagesCount
- Type: integer
- Required: ❌
Number of messages produced.
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
Properties
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The ca certificate.
Must be a base64-encoded pem file.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate.
Must be a base64-encoded pem file.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate.
Must be a base64-encoded pem file.
Was this page helpful?