Quick Start
This guide walks you through producing and consuming records on a real
AWS Kinesis Data Stream using unistream_aws_kinesis.
Prerequisites
Python 3.10+
An AWS account with permissions to create and delete Kinesis streams.
The AWS_PROFILE environment variable set to a valid profile name (you can put it in the .env file at the project root).
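How the project actually reads the .env file is not shown on this page. boto3 picks up AWS_PROFILE from the environment when it creates its session, so the only requirement is that the variable is set before the Kinesis client is created. A minimal standard-library sketch of loading it, assuming plain KEY=VALUE lines (load_env_file is a hypothetical helper, not part of this project or of python-dotenv):

```python
import os
from pathlib import Path


def load_env_file(path: Path) -> dict[str, str]:
    """Copy KEY=VALUE pairs from a .env file into os.environ.

    Hypothetical helper. Variables that are already set in the environment
    are left untouched, so an exported AWS_PROFILE wins over the file.
    """
    loaded: dict[str, str] = {}
    if not path.exists():
        return loaded
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and anything that is not KEY=VALUE.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip().strip("\"'")  # drop surrounding quotes
        if key not in os.environ:
            os.environ[key] = value
            loaded[key] = value
    return loaded
```

Calling load_env_file(Path(".env")) before boto3.client("kinesis") is enough for the profile to take effect.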
Project Layout
The Quick Start consists of five scripts:
| File | Purpose |
|---|---|
| shared.py | AWS session, stream configuration, shared data model, and the setup and teardown helpers used by setup.py and teardown.py. |
| setup.py | Creates the Kinesis stream (provisioned, 1 shard). |
| teardown.py | Deletes the Kinesis stream. |
| producer.py | Sends one record every 5 seconds. |
| consumer.py | Polls the stream every 1 second and prints each record. |
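shared.py itself is not listed on this page. From the other scripts, MyRecord is constructed as MyRecord(ith=...), exposes ith, id, and tag, and has a serialize() method, and the sample output shows a UUID-like id and an 8-character hex tag. A plain-dataclass sketch under those assumptions follows. MyRecordSketch and its deserialize() method are hypothetical; the real MyRecord must also satisfy whatever record interface unistream expects, which this sketch does not try to reproduce.

```python
import dataclasses
import json
import secrets
import uuid


@dataclasses.dataclass
class MyRecordSketch:
    """Hypothetical stand-in for shared.MyRecord (the real class is not shown)."""

    ith: int
    # Unique id per record; the sample output looks like a UUID.
    id: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
    # Random 8-hex-character tag, matching the shape of the sample output.
    tag: str = dataclasses.field(default_factory=lambda: secrets.token_hex(4))

    def serialize(self) -> str:
        # JSON keeps the payload human-readable on the wire.
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def deserialize(cls, data: str) -> "MyRecordSketch":
        return cls(**json.loads(data))
```

A round trip such as MyRecordSketch.deserialize(MyRecordSketch(ith=1).serialize()) yields an equal record, which is the property a producer/consumer pair relies on.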
Step 2 - Create the Stream
Run setup.py to create a single-shard, provisioned Kinesis stream.
The script blocks until the stream reaches the ACTIVE state.
# -*- coding: utf-8 -*-
"""
Create the Kinesis stream used by the Quick Start demo.

Usage::

    python docs/source/01-Quick-Start/setup.py
"""

from shared import setup

if __name__ == "__main__":
    setup()
$ python setup.py
Creating stream 'unistream_aws_kinesis_quick_start' (shards=1) ...
Stream is ACTIVE.
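setup.py only calls setup() from shared.py, whose body is not shown here. A minimal sketch of an equivalent function with the boto3 Kinesis client, assuming a provisioned stream and boto3's built-in stream_exists waiter to block until the stream is ACTIVE (create_stream_and_wait is a hypothetical name):

```python
from typing import Any


def create_stream_and_wait(kinesis_client: Any, stream_name: str, shard_count: int = 1) -> None:
    """Create a provisioned Kinesis stream and block until it is ACTIVE.

    Hypothetical sketch of what shared.setup() might do; kinesis_client is
    expected to be a boto3 Kinesis client, e.g. boto3.client("kinesis").
    """
    print(f"Creating stream {stream_name!r} (shards={shard_count}) ...")
    kinesis_client.create_stream(
        StreamName=stream_name,
        ShardCount=shard_count,
        StreamModeDetails={"StreamMode": "PROVISIONED"},
    )
    # The "stream_exists" waiter polls DescribeStream until StreamStatus is ACTIVE.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)
    print("Stream is ACTIVE.")
```

Called with boto3.client("kinesis") and "unistream_aws_kinesis_quick_start", it prints the same two lines as the transcript above. Blocking on the waiter matters because PutRecords and GetRecords fail while the stream is still CREATING.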
Step 3 - Start the Consumer
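Open Terminal 1 and start the consumer. It discovers the shard automatically, creates a local file-based checkpoint, and begins polling. Because it starts reading at the LATEST position, it only sees records written after it starts, so start it before the producer.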
AWS Kinesis allows up to 5 GetRecords calls per second per shard, so the
1-second polling interval is well within the rate limit.
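consumer.py takes shards[0] from a single list_shards call, which is enough for the one-shard demo stream. ListShards is paginated, though: on larger streams you follow NextToken, and StreamName must be omitted on the follow-up calls. A sketch that collects every shard id from the raw boto3 responses (list_shard_ids is a hypothetical helper and does not use unistream's KinesisStreamShard):

```python
from typing import Any


def list_shard_ids(kinesis_client: Any, stream_name: str) -> list[str]:
    """Return every shard id of a stream, following NextToken pagination.

    Hypothetical helper; kinesis_client is expected to be a boto3 Kinesis client.
    """
    shard_ids: list[str] = []
    res = kinesis_client.list_shards(StreamName=stream_name)
    while True:
        shard_ids.extend(shard["ShardId"] for shard in res["Shards"])
        token = res.get("NextToken")
        if not token:
            return shard_ids
        # ListShards rejects StreamName when NextToken is passed.
        res = kinesis_client.list_shards(NextToken=token)
```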
# -*- coding: utf-8 -*-
"""
Quick Start — Consumer

Polls the Kinesis stream every 1 second and prints each record.
AWS Kinesis allows up to 5 ``GetRecords`` calls per second per shard,
so a 1-second interval is well within the limit.

Usage::

    python docs/source/01-Quick-Start/consumer.py

Press Ctrl-C to stop.
"""

import time
import shutil
import dataclasses
from pathlib import Path

from unistream.checkpoints.simple import SimpleCheckpoint
from unistream_aws_kinesis.api import KinesisStreamShard
from unistream_aws_kinesis.api import AwsKinesisStreamConsumer

from shared import MyRecord
from shared import STREAM_NAME
from shared import kinesis_client

# --- Data directory ---
dir_demo = Path(__file__).absolute().parent / ".consumer_data"
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(parents=True, exist_ok=True)

# --- Discover shard ---
res = kinesis_client.list_shards(StreamName=STREAM_NAME)
shards = KinesisStreamShard.from_list_shards_response(res)
shard_id = shards[0].ShardId
consumer_id = f"{STREAM_NAME}-{shard_id}"
print(f"Consuming from shard: {shard_id}")

# --- Get shard iterator (LATEST = only new records from now on) ---
res = kinesis_client.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)
shard_iterator = res["ShardIterator"]

# --- Checkpoint ---
path_checkpoint = dir_demo / f"{consumer_id}.checkpoint.json"
path_records = dir_demo / f"{consumer_id}.records.json"
checkpoint = SimpleCheckpoint.load(
    checkpoint_file=str(path_checkpoint),
    records_file=str(path_records),
    lock_expire=900,
    max_attempts=3,
    initial_pointer=shard_iterator,
    start_pointer=shard_iterator,
)

# --- Consumer ---
@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    def process_record(self, record: MyRecord) -> str:
        s = record.serialize()
        print(f" received: ith={record.ith} id={record.id} tag={record.tag}")
        return s

    def process_failed_record(self, record: MyRecord) -> str:
        s = record.serialize()
        print(f" [DLQ] ith={record.ith} id={record.id} tag={record.tag}")
        return s

consumer = MyConsumer.new(
    record_class=MyRecord,
    consumer_id=consumer_id,
    kinesis_client=kinesis_client,
    stream_name=STREAM_NAME,
    shard_id=shard_id,
    checkpoint=checkpoint,
    limit=100,
    delay=1,
)

# --- Run ---
print("Consumer started. Polling every 1 second. Press Ctrl-C to stop.\n")
try:
    consumer.run(verbose=True)
except KeyboardInterrupt:
    print("\nConsumer stopped.")
$ python consumer.py
Consuming from shard: shardId-000000000000
Consumer started. Polling every 1 second. Press Ctrl-C to stop.
... (waiting for records)
Step 4 - Start the Producer
Open Terminal 2 and start the producer. It sends one record every 5 seconds. Records are buffered locally (write-ahead log) and flushed to Kinesis in batches of 3.
# -*- coding: utf-8 -*-
"""
Quick Start — Producer

Sends one record every 5 seconds to the Kinesis stream, indefinitely.
Each record carries a sequential ``ith`` counter and a random ``tag``.

Usage::

    python docs/source/01-Quick-Start/producer.py

Press Ctrl-C to stop.
"""

import time
import shutil
from pathlib import Path

from unistream.buffers.file_buffer import FileBuffer
from unistream.producer import RetryConfig
from unistream_aws_kinesis.api import AwsKinesisStreamProducer

from shared import MyRecord
from shared import STREAM_NAME
from shared import kinesis_client

# --- Buffer directory ---
dir_demo = Path(__file__).absolute().parent / ".producer_data"
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(parents=True, exist_ok=True)
path_wal = dir_demo / "buffer.log"

# --- Create producer ---
producer = AwsKinesisStreamProducer.new(
    buffer=FileBuffer.new(
        record_class=MyRecord,
        path_wal=path_wal,
        max_records=3,
    ),
    retry_config=RetryConfig(exp_backoff=[1, 2, 4, 8]),
    kinesis_client=kinesis_client,
    stream_name=STREAM_NAME,
)

# --- Run ---
print("Producer started. Sending one record every 5 seconds. Press Ctrl-C to stop.\n")
ith = 0
try:
    while True:
        ith += 1
        record = MyRecord(ith=ith)
        producer.put(record, verbose=True)
        print(f" [{ith}] put record id={record.id} tag={record.tag}")
        time.sleep(5)
except KeyboardInterrupt:
    print(f"\nStopped after {ith} records.")
$ python producer.py
Producer started. Sending one record every 5 seconds. Press Ctrl-C to stop.
[1] put record id=a1b2c3d4-... tag=e5f6a7b8
[2] put record id=d9e0f1a2-... tag=c3d4e5f6
[3] put record id=b7c8d9e0-... tag=a1b2c3d4
[4] put record id=f1a2b3c4-... tag=d5e6f7a8
[5] put record id=e9f0a1b2-... tag=c3d4e5f6
[6] put record id=a3b4c5d6-... tag=e7f8a9b0
^C
Stopped after 6 records.
Meanwhile, the consumer in Terminal 1 starts printing the records it receives:
... (consumer output continues)
received: ith=1 id=a1b2c3d4-... tag=e5f6a7b8
received: ith=2 id=d9e0f1a2-... tag=c3d4e5f6
received: ith=3 id=b7c8d9e0-... tag=a1b2c3d4
received: ith=4 id=f1a2b3c4-... tag=d5e6f7a8
received: ith=5 id=e9f0a1b2-... tag=c3d4e5f6
received: ith=6 id=a3b4c5d6-... tag=e7f8a9b0
^C
Consumer stopped.
Note
Records arrive at the consumer only after the producer’s buffer flushes, which happens every 3 records because producer.py sets max_records=3. With one record every 5 seconds, the first batch appears roughly 10 seconds after the producer starts.
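The buffering behavior described in the note can be pictured as a write-ahead log plus a batch flush. The sketch below appends each record to a local log, calls PutRecords once max_records are buffered, and retries only the entries Kinesis rejected, sleeping through delays shaped like the exp_backoff=[1, 2, 4, 8] in producer.py. WalBatchProducer, its dict records, and the choice of the record id as partition key are all assumptions for illustration; this is not how unistream's FileBuffer or AwsKinesisStreamProducer is implemented.

```python
import json
import time
from pathlib import Path
from typing import Any, Callable, Sequence


class WalBatchProducer:
    """Hypothetical write-ahead-log buffer that flushes to Kinesis in batches.

    An illustration of the pattern only; not unistream's FileBuffer or
    AwsKinesisStreamProducer. All names here are made up for the sketch.
    """

    def __init__(
        self,
        kinesis_client: Any,
        stream_name: str,
        path_wal: Path,
        max_records: int = 3,
        backoff: Sequence[float] = (1, 2, 4, 8),
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.client = kinesis_client
        self.stream_name = stream_name
        self.path_wal = path_wal
        self.max_records = max_records
        self.backoff = list(backoff)
        self.sleep = sleep
        self.buffer: list[dict] = []

    def put(self, record: dict) -> None:
        # Append to the WAL before buffering, so an unsent record survives a crash.
        with self.path_wal.open("a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
        self.buffer.append(record)
        if len(self.buffer) >= self.max_records:
            self.flush()

    def flush(self) -> None:
        entries = [
            {"Data": json.dumps(r).encode("utf-8"), "PartitionKey": str(r["id"])}
            for r in self.buffer
        ]
        # First attempt immediately, then one retry after each backoff delay.
        for delay in [0, *self.backoff]:
            if delay:
                self.sleep(delay)
            try:
                res = self.client.put_records(StreamName=self.stream_name, Records=entries)
            except Exception:
                continue  # e.g. throttling or a network error: retry the same entries
            # PutRecords can partially fail; results come back in request order.
            entries = [e for e, r in zip(entries, res["Records"]) if "ErrorCode" in r]
            if not entries:
                # Everything accepted: clear the buffer and truncate the WAL.
                self.buffer.clear()
                self.path_wal.write_text("", encoding="utf-8")
                return
        # Records stay in the buffer and WAL, so a later flush may resend some
        # that were already accepted (at-least-once delivery).
        raise RuntimeError("put_records still failing after all retries")
```

Retrying only the rejected entries avoids duplicating records that Kinesis already accepted within a single flush, but a flush that ultimately gives up can still cause duplicates on the next attempt, so consumers should tolerate at-least-once delivery.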
Step 5 - Tear Down
Once you are done, delete the stream to avoid ongoing charges.
# -*- coding: utf-8 -*-
"""
Delete the Kinesis stream used by the Quick Start demo.

Usage::

    python docs/source/01-Quick-Start/teardown.py
"""

from shared import teardown

if __name__ == "__main__":
    teardown()
$ python teardown.py
Deleting stream 'unistream_aws_kinesis_quick_start' ...
Stream deleted.
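Like setup.py, teardown.py delegates to a helper in shared.py that is not shown. Stream deletion is asynchronous (the stream passes through DELETING), so waiting until it is gone avoids a ResourceInUseException if you re-run setup.py with the same name right away. A minimal boto3 sketch, assuming the stream_not_exists waiter (delete_stream_and_wait is a hypothetical name):

```python
from typing import Any


def delete_stream_and_wait(kinesis_client: Any, stream_name: str) -> None:
    """Delete a Kinesis stream and block until it no longer exists.

    Hypothetical sketch of what shared.teardown() might do; kinesis_client is
    expected to be a boto3 Kinesis client.
    """
    print(f"Deleting stream {stream_name!r} ...")
    kinesis_client.delete_stream(StreamName=stream_name)
    # The "stream_not_exists" waiter polls DescribeStream until the stream is gone.
    kinesis_client.get_waiter("stream_not_exists").wait(StreamName=stream_name)
    print("Stream deleted.")
```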