consumer

AWS Kinesis Data Streams consumer implementation.

class unistream_aws_kinesis.consumer.KinesisStreamShard(ShardId: str = None, ParentShardId: str | None = None, AdjacentParentShardId: str | None = None, HashKeyRange: dict | None = None, SequenceNumberRange: dict | None = None)[source]

Represents metadata of a Kinesis Stream Shard.

classmethod from_list_shards_response(res: dict) list[Self][source]

Create a list of shard objects from the list_shards API response.

class unistream_aws_kinesis.consumer.BaseAwsKinesisStreamConsumer(record_class: type[KinesisRecord] = REQ, limit: int = REQ, checkpoint: BaseCheckPoint = REQ, exp_backoff_multiplier: int = REQ, exp_backoff_base: int = REQ, exp_backoff_min: int = REQ, exp_backoff_max: int = REQ, skip_error: bool = REQ, delay: int | float = REQ, kinesis_client: KinesisClient = REQ, stream_name: str = REQ, shard_id: str = REQ)[source]

Base consumer that reads records from an AWS Kinesis Data Stream.

Parameters:
  • record_class – Record class used to deserialize received data.

  • kinesis_client – A boto3 Kinesis client.

  • stream_name – Kinesis Stream name.

  • shard_id – Shard ID to read from.

Note

The delay parameter in new() defaults to 1 second because the GetRecords API is limited to 5 transactions per second per shard (see Kinesis Quotas). A 1-second interval provides a comfortable margin (1 TPS vs 5 TPS limit). Set a smaller value (minimum ~0.2 s) for lower latency, or a larger value to reduce cost and throttling risk.

classmethod new(record_class: type[KinesisRecord], consumer_id: str, kinesis_client: KinesisClient, stream_name: str, shard_id: str, checkpoint: T_CHECK_POINT, limit: int = 1000, exp_backoff_multiplier: int = 1, exp_backoff_base: int = 2, exp_backoff_min: int = 1, exp_backoff_max: int = 60, skip_error: bool = True, delay: int | float = 1, additional_kwargs: dict[str, Any] | None = None)[source]

Factory method to create a consumer.

get_records(limit: int | None = None) tuple[list[KinesisRecord], str | int][source]

Call kinesis_client.get_records(...) API to get records.

class unistream_aws_kinesis.consumer.AwsKinesisStreamConsumer(record_class: type[KinesisRecord] = REQ, limit: int = REQ, checkpoint: BaseCheckPoint = REQ, exp_backoff_multiplier: int = REQ, exp_backoff_base: int = REQ, exp_backoff_min: int = REQ, exp_backoff_max: int = REQ, skip_error: bool = REQ, delay: int | float = REQ, kinesis_client: KinesisClient = REQ, stream_name: str = REQ, shard_id: str = REQ)[source]

Ready-to-use Kinesis consumer. Override process_record and process_failed_record to add your business logic.