consumer¶
AWS Kinesis Data Streams consumer implementation.
- class unistream_aws_kinesis.consumer.KinesisStreamShard(ShardId: str = None, ParentShardId: str | None = None, AdjacentParentShardId: str | None = None, HashKeyRange: dict | None = None, SequenceNumberRange: dict | None = None)[source]¶
Represents metadata of a Kinesis Stream Shard.
- class unistream_aws_kinesis.consumer.BaseAwsKinesisStreamConsumer(record_class: type[KinesisRecord] = REQ, limit: int = REQ, checkpoint: BaseCheckPoint = REQ, exp_backoff_multiplier: int = REQ, exp_backoff_base: int = REQ, exp_backoff_min: int = REQ, exp_backoff_max: int = REQ, skip_error: bool = REQ, delay: int | float = REQ, kinesis_client: KinesisClient = REQ, stream_name: str = REQ, shard_id: str = REQ)[source]¶
Base consumer that reads records from an AWS Kinesis Data Stream.
- Parameters:
record_class – Record class used to deserialize received data.
kinesis_client – A boto3 Kinesis client.
stream_name – Kinesis Stream name.
shard_id – Shard ID to read from.
Note
The
delayparameter innew()defaults to 1 second because theGetRecordsAPI is limited to 5 transactions per second per shard (see Kinesis Quotas). A 1-second interval provides a comfortable margin (1 TPS vs 5 TPS limit). Set a smaller value (minimum ~0.2 s) for lower latency, or a larger value to reduce cost and throttling risk.- classmethod new(record_class: type[KinesisRecord], consumer_id: str, kinesis_client: KinesisClient, stream_name: str, shard_id: str, checkpoint: T_CHECK_POINT, limit: int = 1000, exp_backoff_multiplier: int = 1, exp_backoff_base: int = 2, exp_backoff_min: int = 1, exp_backoff_max: int = 60, skip_error: bool = True, delay: int | float = 1, additional_kwargs: dict[str, Any] | None = None)[source]¶
Factory method to create a consumer.
- class unistream_aws_kinesis.consumer.AwsKinesisStreamConsumer(record_class: type[KinesisRecord] = REQ, limit: int = REQ, checkpoint: BaseCheckPoint = REQ, exp_backoff_multiplier: int = REQ, exp_backoff_base: int = REQ, exp_backoff_min: int = REQ, exp_backoff_max: int = REQ, skip_error: bool = REQ, delay: int | float = REQ, kinesis_client: KinesisClient = REQ, stream_name: str = REQ, shard_id: str = REQ)[source]¶
Ready-to-use Kinesis consumer. Override
process_recordandprocess_failed_recordto add your business logic.