Send multiple records to AWS Kinesis Data Streams as a simple list of maps or from a JSON API payload
About this blueprint
This flow sends multiple records to AWS Kinesis Data Streams. The PutRecords task accepts either a simple list of maps (i.e., a list of dictionaries) or the URI of an internal storage file in ION format. When extracting data from an HTTP API, first convert the JSON payload to ION format using the JsonToIon task, then pass its output to the PutRecords task.
The Outputs tab will display all records sent to AWS Kinesis Data Streams, including the shard ID and sequence number. You can use these values to retrieve the records from the stream using the GetRecords AWS API call.
Note that the data field of each record must be a string value. If you want to send a JSON object to your Kinesis Data Stream, wrap it in a string. Passing the JSON object itself as the data value will not work.
id: aws_kinesis_json
namespace: company.team

tasks:
  - id: put_records_simple_map
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records:
      - data: sign-in
        partitionKey: user1
      - data: sign-out
        partitionKey: user1

  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/resolve/main/json/user_events.json

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.extract.uri }}"
    newLine: false

  - id: put_records
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records: "{{ outputs.json_to_ion.uri }}"
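
The note about the data value needing to be a string can be illustrated with a minimal sketch. This is not the example the blueprint originally linked to. The flow id, task id, and the JSON field names (user, event) are hypothetical and chosen only for illustration. Any connection settings are left out, as in the flow above.

```yaml
id: aws_kinesis_json_string      # hypothetical flow id
namespace: company.team

tasks:
  - id: put_json_as_string       # hypothetical task id
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records:
      # The JSON object is wrapped in single quotes, so YAML reads it
      # as one string rather than as a nested map.
      - data: '{"user": "user1", "event": "sign-in"}'
        partitionKey: user1
      # By contrast, an unquoted value such as
      #   data: {"user": "user1", "event": "sign-in"}
      # is parsed by YAML as a map, not a string, which is presumably
      # the kind of payload the note above says will not work.
```

Single quotes are used around the JSON so that the double quotes inside it need no escaping.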