# -*- coding: utf-8 -*-"""AWS Kinesis Data Streams consumer implementation."""importsysimporttypingasTimportdataclassesifsys.version_info>=(3,11):fromtypingimportSelfelse:fromtyping_extensionsimportSelffromfunc_args.apiimportREQfromunistream.abstractionimportT_CHECK_POINTfromunistream.checkpointimportT_POINTERfromunistream.consumerimportBaseConsumerfrom.recordsimportKinesisRecordfrom.recordsimportKinesisGetRecordsResponseRecordifT.TYPE_CHECKING:frommypy_boto3_kinesis.clientimportKinesisClient
[docs]@dataclasses.dataclassclassKinesisStreamShard:""" Represents metadata of a Kinesis Stream Shard. """ShardId:str=dataclasses.field(default=None)ParentShardId:str|None=dataclasses.field(default=None)AdjacentParentShardId:str|None=dataclasses.field(default=None)HashKeyRange:dict|None=dataclasses.field(default=None)SequenceNumberRange:dict|None=dataclasses.field(default=None)
[docs]@classmethoddeffrom_list_shards_response(cls,res:dict)->list[Self]:""" Create a list of shard objects from the ``list_shards`` API response. """shards=res.get("Shards",[])return[cls(ShardId=shard.get("ShardId"),ParentShardId=shard.get("ParentShardId"),AdjacentParentShardId=shard.get("AdjacentParentShardId"),HashKeyRange=shard.get("HashKeyRange"),SequenceNumberRange=shard.get("SequenceNumberRange"),)forshardinshards]
@dataclasses.dataclass
class BaseAwsKinesisStreamConsumer(BaseConsumer):
    """
    Base consumer that pulls records from a single shard of an AWS Kinesis
    Data Stream.

    :param record_class: Record class used to deserialize received data.
    :param kinesis_client: A boto3 Kinesis client.
    :param stream_name: Kinesis Stream name.
    :param shard_id: Shard ID to read from.

    .. note::

        The ``delay`` parameter in :meth:`new` defaults to **1 second** because
        the ``GetRecords`` API is limited to **5 transactions per second per
        shard** (see `Kinesis Quotas
        <https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html>`_).
        A 1-second interval leaves a comfortable margin (1 TPS vs. the 5 TPS
        limit). Use a smaller value (minimum ~0.2 s) for lower latency, or a
        larger one to reduce cost and throttling risk.
    """

    # Class whose ``from_get_record_data`` turns each raw payload into a record.
    record_class: type[KinesisRecord] = dataclasses.field(default=REQ)
    # Client used to issue the ``get_records`` calls.
    kinesis_client: "KinesisClient" = dataclasses.field(default=REQ)
    # Name of the Kinesis stream being consumed.
    stream_name: str = dataclasses.field(default=REQ)
    # Identifier of the shard this consumer reads.
    shard_id: str = dataclasses.field(default=REQ)

    def get_records(
        self,
        limit: int | None = None,
    ) -> tuple[list[KinesisRecord], T_POINTER]:
        """
        Fetch one batch of records via ``kinesis_client.get_records(...)``.

        The read starts at the shard iterator stored in
        ``self.checkpoint.start_pointer``.

        :param limit: Maximum number of records to request. ``None`` falls
            back to ``self.limit``.
        :returns: A ``(records, next_pointer)`` pair. ``records`` holds the
            decoded ``record_class`` instances, and ``next_pointer`` is the
            response's ``NextShardIterator`` (``None`` if that key is absent).
        """
        batch_size = self.limit if limit is None else limit
        response = self.kinesis_client.get_records(
            ShardIterator=self.checkpoint.start_pointer,
            Limit=batch_size,
        )
        raw_items = KinesisGetRecordsResponseRecord.from_get_records_response(
            response
        )
        decoded = [
            self.record_class.from_get_record_data(item.data)
            for item in raw_items
        ]
        return decoded, response.get("NextShardIterator")
@dataclasses.dataclass
class AwsKinesisStreamConsumer(BaseAwsKinesisStreamConsumer):
    """
    Ready-to-use Kinesis consumer.

    This class adds nothing beyond its base. To plug in business logic,
    override ``process_record`` and ``process_failed_record``.
    """