Quick Start

This guide walks you through producing and consuming records on a real AWS Kinesis Data Stream using unistream_aws_kinesis.

Prerequisites

  • Python 3.10+

  • An AWS account with permissions to create, delete, write to, and read from Kinesis streams.

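  • The AWS_PROFILE environment variable set to a valid profile name. You can keep it in the .env file at the project root, but the scripts read it from the process environment (os.environ), so make sure that file is loaded into your shell — or export the variable directly — before running them.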

Project Layout

The Quick Start consists of five scripts:

File           Purpose
-------------  ------------------------------------------------------------------------------------------
shared.py      AWS session, stream configuration, shared data model, and setup() / teardown() helpers.
setup.py       Creates the Kinesis stream (provisioned, 1 shard).
teardown.py    Deletes the Kinesis stream.
producer.py    Sends one record every 5 seconds.
consumer.py    Polls the stream every 1 second and prints each record.

Step 1 - Shared Settings and Data Model

shared.py defines everything that both the producer and the consumer need: the boto3 session, the stream name, the record class, and convenience functions to create / destroy the stream.

shared.py
# -*- coding: utf-8 -*-

"""
Shared settings, data model, and setup/teardown helpers for the Quick Start demo.

Prerequisites:

- Set the ``AWS_PROFILE`` environment variable (or put it in ``.env``).
- The profile must have permission to create/delete Kinesis streams.
"""

import os
import uuid
import dataclasses

import boto3

from unistream_aws_kinesis.api import KinesisRecord

# --- AWS session ---
# The profile name is read from the process environment only. Nothing in
# this module loads a ``.env`` file, so the variable must already be exported
# (or loaded by your shell/tooling) before any Quick Start script starts.
# Fail early with an actionable message instead of a bare KeyError.
try:
    aws_profile = os.environ["AWS_PROFILE"]
except KeyError:
    raise RuntimeError(
        "The AWS_PROFILE environment variable is not set. Export it "
        "(e.g. `export AWS_PROFILE=my-profile`) before running the "
        "Quick Start scripts."
    ) from None
boto_ses = boto3.Session(profile_name=aws_profile)
kinesis_client = boto_ses.client("kinesis")

# --- Stream config ---
STREAM_NAME = "unistream_aws_kinesis_quick_start"
SHARD_COUNT = 1


# --- Data model ---
@dataclasses.dataclass(frozen=True)
class MyRecord(KinesisRecord):
    """
    Immutable demo record shared by the producer and the consumer.

    On top of the fields inherited from ``KinesisRecord`` it carries a
    sequence number and a short random label.
    """

    # Sequence number; the producer assigns 1, 2, 3, ... (defaults to 0).
    ith: int = 0
    # Random label: the first 8 hex characters of a freshly generated UUID4,
    # computed per instance via default_factory.
    tag: str = dataclasses.field(
        default_factory=lambda: uuid.uuid4().hex[:8],
    )


# --- Setup / Teardown ---
def setup():
    """
    Create the Kinesis stream in PROVISIONED mode.

    Blocks until the stream becomes ACTIVE. The function is safe to re-run.
    If a stream named ``STREAM_NAME`` already exists (``ResourceInUseException``),
    creation is skipped and the function only waits for that stream to be ACTIVE.
    """
    print(f"Creating stream {STREAM_NAME!r} (shards={SHARD_COUNT}) ...")
    try:
        kinesis_client.create_stream(
            StreamName=STREAM_NAME,
            ShardCount=SHARD_COUNT,
            StreamModeDetails={"StreamMode": "PROVISIONED"},
        )
    except kinesis_client.exceptions.ResourceInUseException:
        # The stream is already there, e.g. setup.py ran twice or teardown.py
        # was skipped. Reuse it instead of crashing. An existing stream keeps
        # whatever shard count / mode it was created with.
        # NOTE(review): presumably also raised while a same-named stream is
        # still being deleted; in that case let teardown.py finish first.
        print(f"Stream {STREAM_NAME!r} already exists, reusing it.")
    waiter = kinesis_client.get_waiter("stream_exists")
    waiter.wait(StreamName=STREAM_NAME)
    print("Stream is ACTIVE.")


def teardown():
    """
    Delete the Kinesis stream and wait until it is gone.

    The function is safe to re-run. If the stream does not exist
    (``ResourceNotFoundException``), it reports that and returns without waiting.
    """
    print(f"Deleting stream {STREAM_NAME!r} ...")
    try:
        kinesis_client.delete_stream(
            StreamName=STREAM_NAME,
            # Delete even if consumers are still registered on the stream.
            EnforceConsumerDeletion=True,
        )
    except kinesis_client.exceptions.ResourceNotFoundException:
        # Already deleted (or never created); nothing left to clean up.
        print(f"Stream {STREAM_NAME!r} does not exist, nothing to delete.")
        return
    waiter = kinesis_client.get_waiter("stream_not_exists")
    waiter.wait(StreamName=STREAM_NAME)
    print("Stream deleted.")

Step 2 - Create the Stream

Run setup.py to create a single-shard, provisioned Kinesis stream. The script blocks until the stream reaches the ACTIVE state.

setup.py
# -*- coding: utf-8 -*-

"""
Create the Kinesis stream used by the Quick Start demo.

Usage::

    python docs/source/01-Quick-Start/setup.py
"""

from shared import setup

def main() -> None:
    """Script entry point: create the Quick Start stream and wait for it."""
    setup()


if __name__ == "__main__":
    main()
$ python setup.py
Creating stream 'unistream_aws_kinesis_quick_start' (shards=1) ...
Stream is ACTIVE.

Step 3 - Start the Consumer

Open Terminal 1 and start the consumer. It discovers the shard automatically, creates a local file-based checkpoint, and begins polling.

AWS Kinesis allows up to 5 GetRecords calls per second per shard, so the 1-second polling interval is well within the rate limit.

consumer.py
# -*- coding: utf-8 -*-

"""
Quick Start — Consumer

Polls the Kinesis stream every 1 second and prints each record.

AWS Kinesis allows up to 5 ``GetRecords`` calls per second per shard,
so a 1-second interval is well within the limit.

Usage::

    python docs/source/01-Quick-Start/consumer.py

Press Ctrl-C to stop.
"""

import time
import shutil
import dataclasses
from pathlib import Path

from unistream.checkpoints.simple import SimpleCheckpoint

from unistream_aws_kinesis.api import KinesisStreamShard
from unistream_aws_kinesis.api import AwsKinesisStreamConsumer

from shared import MyRecord
from shared import STREAM_NAME
from shared import kinesis_client

# --- Data directory ---
# Every run starts clean: drop checkpoint/record files left by a previous
# run, then recreate the empty directory next to this script.
dir_demo = Path(__file__).absolute().parent / ".consumer_data"
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(parents=True, exist_ok=True)


def _first_shard_id(stream_name: str) -> str:
    """Return the id of the first shard listed by ``ListShards``."""
    listing = kinesis_client.list_shards(StreamName=stream_name)
    return KinesisStreamShard.from_list_shards_response(listing)[0].ShardId


def _latest_iterator(stream_name: str, shard: str) -> str:
    """Return a ``LATEST`` iterator, i.e. only records put from now on."""
    response = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard,
        ShardIteratorType="LATEST",
    )
    return response["ShardIterator"]


# --- Discover shard ---
# The demo stream has a single shard, so the first one listed is the one.
shard_id = _first_shard_id(STREAM_NAME)
consumer_id = f"{STREAM_NAME}-{shard_id}"
print(f"Consuming from shard: {shard_id}")

# --- Checkpoint ---
# The fresh LATEST iterator is used as both the initial and start pointer.
shard_iterator = _latest_iterator(STREAM_NAME, shard_id)
checkpoint = SimpleCheckpoint.load(
    checkpoint_file=str(dir_demo / f"{consumer_id}.checkpoint.json"),
    records_file=str(dir_demo / f"{consumer_id}.records.json"),
    lock_expire=900,
    max_attempts=3,
    initial_pointer=shard_iterator,
    start_pointer=shard_iterator,
)


# --- Consumer ---
@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    """
    Quick Start consumer that echoes every record it handles to stdout.

    Both hooks serialize the record first and return that serialized string.
    They differ only in the label printed before the record's fields.
    """

    def process_record(self, record: MyRecord) -> str:
        """Print a normally delivered record and return its serialized form."""
        payload = record.serialize()
        print(f"  received: ith={record.ith}  id={record.id}  tag={record.tag}")
        return payload

    def process_failed_record(self, record: MyRecord) -> str:
        """
        Print a record the base consumer hands over as failed (labelled
        ``[DLQ]``) and return its serialized form.
        """
        payload = record.serialize()
        print(f"  [DLQ] ith={record.ith}  id={record.id}  tag={record.tag}")
        return payload


# Bind the consumer to the single shard and the checkpoint created above.
# `limit` presumably caps records per poll and `delay` is the 1-second pause
# between polls, which keeps us well under Kinesis' 5 reads/sec/shard limit.
consumer = MyConsumer.new(
    kinesis_client=kinesis_client,
    stream_name=STREAM_NAME,
    shard_id=shard_id,
    consumer_id=consumer_id,
    record_class=MyRecord,
    checkpoint=checkpoint,
    limit=100,
    delay=1,
)

# --- Run ---
print("Consumer started. Polling every 1 second. Press Ctrl-C to stop.\n")
try:
    consumer.run(verbose=True)
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop the demo; exit without a traceback.
    print("\nConsumer stopped.")
$ python consumer.py
Consuming from shard: shardId-000000000000
Consumer started. Polling every 1 second. Press Ctrl-C to stop.

... (waiting for records)

Step 4 - Start the Producer

Open Terminal 2 and start the producer. It sends one record every 5 seconds. Records are buffered locally (write-ahead log) and flushed to Kinesis in batches of 3.

producer.py
# -*- coding: utf-8 -*-

"""
Quick Start — Producer

Sends one record every 5 seconds to the Kinesis stream, indefinitely.
Each record carries a sequential ``ith`` counter and a random ``tag``.

Usage::

    python docs/source/01-Quick-Start/producer.py

Press Ctrl-C to stop.
"""

import time
import shutil
from pathlib import Path

from unistream.buffers.file_buffer import FileBuffer
from unistream.producer import RetryConfig

from unistream_aws_kinesis.api import AwsKinesisStreamProducer

from shared import MyRecord
from shared import STREAM_NAME
from shared import kinesis_client

# --- Buffer directory ---
# Start from an empty directory on every run.
# NOTE(review): this also deletes any write-ahead log left from a previous
# run, so records still buffered when that run was stopped are discarded.
dir_demo = Path(__file__).absolute().parent / ".producer_data"
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(parents=True, exist_ok=True)
path_wal = dir_demo / "buffer.log"

# --- Create producer ---
# File-backed buffer holding at most 3 records before they are sent as a batch.
buffer = FileBuffer.new(
    record_class=MyRecord,
    path_wal=path_wal,
    max_records=3,
)
# Presumably the wait (seconds) before each successive retry of a failed send.
retry_config = RetryConfig(exp_backoff=[1, 2, 4, 8])
producer = AwsKinesisStreamProducer.new(
    buffer=buffer,
    retry_config=retry_config,
    kinesis_client=kinesis_client,
    stream_name=STREAM_NAME,
)

# --- Run ---
def _send_one(ith: int) -> None:
    """Build record number *ith*, hand it to the producer and echo it."""
    record = MyRecord(ith=ith)
    producer.put(record, verbose=True)
    print(f"  [{ith}] put record id={record.id} tag={record.tag}")


print("Producer started. Sending one record every 5 seconds. Press Ctrl-C to stop.\n")
ith = 0  # number of records handed to the producer so far
try:
    while True:
        ith += 1
        _send_one(ith)
        time.sleep(5)
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop the demo.
    print(f"\nStopped after {ith} records.")
$ python producer.py
Producer started. Sending one record every 5 seconds. Press Ctrl-C to stop.

  [1] put record id=a1b2c3d4-... tag=e5f6a7b8
  [2] put record id=d9e0f1a2-... tag=c3d4e5f6
  [3] put record id=b7c8d9e0-... tag=a1b2c3d4
  [4] put record id=f1a2b3c4-... tag=d5e6f7a8
  [5] put record id=e9f0a1b2-... tag=c3d4e5f6
  [6] put record id=a3b4c5d6-... tag=e7f8a9b0
^C
Stopped after 6 records.

Meanwhile, Terminal 1 (consumer) will start printing received records:

... (consumer output continues)
  received: ith=1  id=a1b2c3d4-...  tag=e5f6a7b8
  received: ith=2  id=d9e0f1a2-...  tag=c3d4e5f6
  received: ith=3  id=b7c8d9e0-...  tag=a1b2c3d4
  received: ith=4  id=f1a2b3c4-...  tag=d5e6f7a8
  received: ith=5  id=e9f0a1b2-...  tag=c3d4e5f6
  received: ith=6  id=a3b4c5d6-...  tag=e7f8a9b0
^C
Consumer stopped.

Note

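Records arrive at the consumer only after the producer’s buffer flushes, which happens every 3 records because producer.py sets max_records=3. At one record every 5 seconds, expect roughly 10 seconds (plus up to 1 second of consumer polling) before the first batch appears; after that, records arrive in groups of three.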

Step 5 - Tear Down

Once you are done, delete the stream to avoid ongoing charges.

teardown.py
# -*- coding: utf-8 -*-

"""
Delete the Kinesis stream used by the Quick Start demo.

Usage::

    python docs/source/01-Quick-Start/teardown.py
"""

from shared import teardown

def main() -> None:
    """Script entry point: delete the Quick Start stream and wait for it."""
    teardown()


if __name__ == "__main__":
    main()
$ python teardown.py
Deleting stream 'unistream_aws_kinesis_quick_start' ...
Stream deleted.