# -*- coding: utf-8 -*-"""Kinesis-specific record types for producing to and consuming fromAWS Kinesis Data Streams."""importsysimportbase64importdataclassesifsys.version_info>=(3,11):fromtypingimportSelfelse:fromtyping_extensionsimportSelffromunistream.apiimportDataClassRecord
[docs]@dataclasses.dataclass(frozen=True)classKinesisRecord(DataClassRecord):""" A record designed for AWS Kinesis Data Streams. Extends :class:`~unistream.records.dataclass.DataClassRecord` with Kinesis-specific binary encoding methods. :param partition_key: Kinesis partition key. Defaults to ``record.id``. """
[docs]defto_put_record_data(self)->bytes:""" Convert the record to binary data for the ``put_records`` API. """returnbase64.b64encode(self.serialize().encode("utf-8"))
[docs]@classmethoddeffrom_get_record_data(cls,data:bytes,)->Self:""" Convert ``get_records`` API response data to a record instance. """returncls.deserialize(base64.b64decode(data).decode("utf-8"))
@propertydefpartition_key(self)->str:""" Kinesis partition key. Override this property for custom partitioning. Defaults to ``self.id``. """returnself.id
[docs]@dataclasses.dataclassclassKinesisGetRecordsResponseRecord:""" Deserializes the ``Records`` part of the ``get_records`` API response. """sequence_number:str=dataclasses.field()approximate_arrival_timestamp:str=dataclasses.field()data:bytes=dataclasses.field()partition_key:str=dataclasses.field()encryption_type:str|None=dataclasses.field()
[docs]@classmethoddeffrom_get_records_response(cls,res:dict,)->list[Self]:""" Parse the ``Records`` part of the ``get_records`` API response. """records=list()forrecordinres.get("Records",[]):records.append(cls(sequence_number=record["SequenceNumber"],approximate_arrival_timestamp=record["ApproximateArrivalTimestamp"],data=record["Data"],partition_key=record["PartitionKey"],encryption_type=record.get("EncryptionType"),))returnrecords