Source code for unistream_aws_kinesis.producer

# -*- coding: utf-8 -*-

"""
AWS Kinesis Data Streams producer implementation.
"""

import typing as T
import dataclasses

from func_args.api import REQ

from unistream.abstraction import T_RECORD
from unistream.abstraction import T_BUFFER
from unistream.producer import BaseProducer
from unistream.producer import RetryConfig

if T.TYPE_CHECKING:
    from mypy_boto3_kinesis.client import KinesisClient


@dataclasses.dataclass
class AwsKinesisStreamProducer(BaseProducer):
    """
    Deliver buffered records to an AWS Kinesis Data Stream.

    Every flushed batch is written with a single
    ``kinesis_client.put_records()`` call.

    :param kinesis_client: boto3 client for the Kinesis service.
    :param stream_name: Name of the target Kinesis data stream.

    .. note::

        A ``PutRecords`` request is limited to **500 records** and
        **5 MB** in total, and each shard accepts at most
        **1,000 records/sec** and **1 MB/sec** of writes
        (see `Kinesis Quotas <https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html>`_).
        Configure the ``buffer`` with ``max_records <= 500`` and a
        ``max_bytes`` no larger than 5 MB (5_000_000) so that one batch
        always fits into one ``PutRecords`` call.
    """

    # Both fields default to the ``REQ`` sentinel from ``func_args``;
    # presumably that marks them as required at construction — TODO confirm.
    kinesis_client: "KinesisClient" = dataclasses.field(default=REQ)
    stream_name: str = dataclasses.field(default=REQ)

    @classmethod
    def new(
        cls,
        buffer: T_BUFFER,
        retry_config: RetryConfig,
        kinesis_client: "KinesisClient",
        stream_name: str,
    ):
        """
        Build an :class:`AwsKinesisStreamProducer` from its components.

        :param buffer: Buffer that batches records before they are sent.
        :param retry_config: Retry settings applied to send failures.
        :param kinesis_client: boto3 client for the Kinesis service.
        :param stream_name: Name of the target Kinesis data stream.
        """
        init_kwargs = dict(
            buffer=buffer,
            retry_config=retry_config,
            kinesis_client=kinesis_client,
            stream_name=stream_name,
        )
        return cls(**init_kwargs)

    def send(self, records: list[T_RECORD]):
        """
        Write ``records`` to the Kinesis stream with one ``put_records`` call.

        Each record becomes one request entry. Its ``Data`` comes from
        ``record.to_put_record_data()`` and its ``PartitionKey`` comes from
        ``record.partition_key``. Entries keep the order of ``records``.

        :param records: Records to write to the stream.
        :returns: The raw ``put_records`` response, returned unchanged.

        .. note::

            NOTE(review): the ``PutRecords`` API reports per-entry failures
            through ``FailedRecordCount`` and each entry's ``ErrorCode``
            without raising. This method does not inspect the response.
            Confirm that a caller checks it; otherwise failed entries may go
            unnoticed.
        """
        # Resolve the client method first, then build the request entries,
        # matching the original call's evaluation order.
        put_records = self.kinesis_client.put_records
        entries = [
            {
                "Data": record.to_put_record_data(),
                "PartitionKey": record.partition_key,
            }
            for record in records
        ]
        return put_records(Records=entries, StreamName=self.stream_name)