# -*- coding: utf-8 -*-"""AWS Kinesis Data Streams producer implementation."""importtypingasTimportdataclassesfromfunc_args.apiimportREQfromunistream.abstractionimportT_RECORDfromunistream.abstractionimportT_BUFFERfromunistream.producerimportBaseProducerfromunistream.producerimportRetryConfigifT.TYPE_CHECKING:frommypy_boto3_kinesis.clientimportKinesisClient
@dataclasses.dataclass
class AwsKinesisStreamProducer(BaseProducer):
    """
    Producer that delivers batches of records to an AWS Kinesis Data Stream.

    Every batch is written with one ``kinesis_client.put_records()`` call.

    :param kinesis_client: A boto3 Kinesis client.
    :param stream_name: The name of the Kinesis stream.

    .. note::

        The ``PutRecords`` API accepts at most **500 records per request**
        and **5 MB per request**, with a per-shard write limit of
        **1,000 records/sec** and **1 MB/sec** (see `Kinesis Quotas
        <https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html>`_).
        When constructing the ``buffer``, make sure ``max_records <= 500``
        and ``max_bytes`` does not exceed 5 MB (5_000_000) so that a batch
        always fits in a single ``PutRecords`` call.
    """

    # NOTE(review): ``REQ`` (from ``func_args.api``) is presumably a
    # "required argument" sentinel used because these fields follow the
    # fields inherited from ``BaseProducer`` -- confirm against func_args.
    kinesis_client: "KinesisClient" = dataclasses.field(default=REQ)
    stream_name: str = dataclasses.field(default=REQ)

    @classmethod
    def new(
        cls,
        buffer: T_BUFFER,
        retry_config: RetryConfig,
        kinesis_client: "KinesisClient",
        stream_name: str,
    ):
        """
        Build an :class:`AwsKinesisStreamProducer` from explicit arguments.

        :param buffer: Buffer for batching records.
        :param retry_config: Retry configuration for send failures.
        :param kinesis_client: A boto3 Kinesis client.
        :param stream_name: The name of the Kinesis stream.

        :return: A new instance of ``cls`` holding the four arguments.
        """
        init_kwargs = {
            "buffer": buffer,
            "retry_config": retry_config,
            "kinesis_client": kinesis_client,
            "stream_name": stream_name,
        }
        return cls(**init_kwargs)

    def send(self, records: list[T_RECORD]):
        """
        Send records to AWS Kinesis Data Stream via ``put_records``.

        Each record contributes one request entry whose ``Data`` comes from
        ``record.to_put_record_data()`` and whose ``PartitionKey`` comes
        from ``record.partition_key``.

        :param records: The batch of records to write.

        :return: Whatever ``kinesis_client.put_records()`` returns; the
            response is passed through untouched.

        .. note::

            NOTE(review): per the AWS documentation, ``PutRecords`` can
            report per-record failures through ``FailedRecordCount`` in the
            response instead of raising. This method does not inspect the
            response -- confirm that the caller does.
        """
        put_records = self.kinesis_client.put_records
        entries = [
            {
                "Data": record.to_put_record_data(),
                "PartitionKey": record.partition_key,
            }
            for record in records
        ]
        return put_records(Records=entries, StreamName=self.stream_name)