Reader Reader

yaml
type: "io.kestra.plugin.pulsar.Reader"

Read messages from Pulsar topic(s) without subscription.

Examples

yaml
id: pulsar_reader
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON

Properties

deserializer

  • Type: object
  • Dynamic:
  • Required: ✔️

pollDuration

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 2.000000000
  • Format: duration

Duration waiting for record to be polled.

If no records are available, the maximum wait to wait for a new record.

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Pulsar topic(s) where to consume messages from.

Can be a string or a list of strings to consume from multiple topics.

uri

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Connection URLs.

You need to specify a Pulsar protocol URL.

  • Example of localhost: pulsar://localhost:6650
  • If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651

authenticationToken

  • Type: string
  • Dynamic: ✔️
  • Required:

Authentication token.

Authentication token that can be required by some providers such as Clever Cloud.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration waiting for new record.

It's not a hard limit and is evaluated every second.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The maximum number of records to fetch before stopping.

It's not a hard limit and is evaluated every second.

messageId

  • Type: string
  • Dynamic: ✔️
  • Required:

Position the reader on a particular message.

The first message read will be the one immediately after the specified message. If no since or messageId are provided, we start at the beginning of the topic.

schemaString

  • Type: string
  • Dynamic: ✔️
  • Required:

JSON string of the topic's schema

Required for connecting with topics with a defined schema and strict schema checking

schemaType

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: NONE
  • Possible Values:
    • NONE
    • AVRO
    • JSON

The schema type of the topic

Can be one of NONE, AVRO or JSON. None means there will be no schema enforced.

since

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Format: duration

The initial reader positioning can be set at specific timestamp by providing total rollback duration.

So, broker can find a latest message that was published before given duration. eg: since set to 5 minutes (PT5M) indicates that broker should find message published 5 minutes in the past, and set the initial position to that messageId.

tlsOptions

TLS authentication options.

You need to use "pulsar+ssl://" in serviceUrl to enable TLS support.

Outputs

messagesCount

  • Type: integer
  • Required:

Number of messages consumed.

uri

  • Type: string
  • Required:
  • Format: uri

URI of a Kestra internal storage file containing the consumed messages.

Definitions

io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions

Properties

ca
  • Type: string
  • Dynamic:
  • Required:

The ca certificate.

Must be a base64-encoded pem file.

cert
  • Type: string
  • Dynamic:
  • Required:

The client certificate.

Must be a base64-encoded pem file.

key
  • Type: string
  • Dynamic:
  • Required:

The key certificate.

Must be a base64-encoded pem file.

Was this page helpful?