I’m using Lambda and Kinesis together to validate and insert batches of records.

Everything works great when I test with 100, 1000, or even 10000 records. Beyond that, I start getting a lot of duplicates, and I see errors like the following when uploading larger data sets:

ProvisionedThroughputExceededException: Rate exceeded for shard

I just ran 20000 records and ended up with almost 42000 rows in the database. I’m new to Kinesis, so I’m not sure what I should be looking for, or whether I’ve misconfigured something.

My working assumption is that Kinesis is triggering the consumer Lambda more than once for the same data, so the same records get inserted repeatedly.
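
One way to test this assumption would be to track the eventID of each Kinesis record the consumer receives and flag any that show up again. The sketch below is a hypothetical helper (findRedeliveredIds and RecordLike are illustrative names, not part of the code in this question). It assumes only that each record carries the eventID field that Lambda includes on Kinesis event records.

```typescript
// Minimal shape of the one field used here. The full type is
// KinesisStreamRecord from the "aws-lambda" type definitions.
interface RecordLike {
    eventID: string; // e.g. "shardId-000000000000:<sequenceNumber>"
}

// Hypothetical helper: returns the event IDs in `records` that were already
// present in `seen`, and adds every new ID to `seen` as a side effect.
const findRedeliveredIds = (seen: Set<string>, records: RecordLike[]): string[] => {
    const redelivered: string[] = [];
    for (const rec of records) {
        if (seen.has(rec.eventID)) {
            redelivered.push(rec.eventID);
        } else {
            seen.add(rec.eventID);
        }
    }
    return redelivered;
};
```

In the consumer, this could be called with a module-level Set and event.Records, logging whatever it returns. A module-level Set only survives while the same execution environment stays warm, so an empty result does not rule out redelivery to a different environment. Logging every eventID and counting repeats in the logs would be the more complete check.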

CDK Infra:

Kinesis

export class StreamStack extends Stack {
    public readonly myStream: Stream;

    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props);

        this.myStream = new Stream(this, "MyStream", {
            streamName: "my-stream",
            streamMode: StreamMode.ON_DEMAND
        });
    }
}

Event Source

interface EventSourceStackProps extends StackProps {
    myStream: Stream,
    myConsumerLambda: NodejsFunction
}

export class EventSourceStack extends Stack {
    constructor(scope: Construct, id: string, props: EventSourceStackProps) {
        super(scope, id, props);

        new EventSourceMapping(this, "ConsumerFunctionEvent", {
            target: props.myConsumerLambda,
            batchSize: 10,
            startingPosition: StartingPosition.LATEST,
            eventSourceArn: props.myStream.streamArn
        });
        props.myStream.grantRead(props.myConsumerLambda);
    }
}

Producer and Consumer Lambda functions in a larger LambdaStack

        this.myProducerLambda = new NodejsFunction(this, "MyProducerLambda", {
            functionName: "myProducerHandler",
            runtime: Runtime.NODEJS_18_X,
            handler: "myProducerHandler",
            entry: "./lambda/myProducerHandler.ts",
            memorySize: 256,
            timeout: Duration.minutes(5)
        });
        props.myStream.grantWrite(this.myProducerLambda);

        this.myConsumerLambda = new NodejsFunction(this, "MyConsumerLambda", {
            functionName: "myConsumerHandler",
            runtime: Runtime.NODEJS_18_X,
            handler: "myConsumerHandler",
            entry: "./lambda/myConsumerHandler.ts",
            memorySize: 1024,
            timeout: Duration.minutes(15)
        });

The producer sends records like so:

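import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";

// Sends a single item to the stream with one PutRecord call per item.
export const putStreamRecord = async (item: StreamItem): Promise<void> => {
    try {
        const client = new KinesisClient();
        const params = {
            // The whole item is serialized as JSON; the consumer reads item.payload.
            Data: Buffer.from(JSON.stringify(item)),
            PartitionKey: item.key,
            StreamName: process.env.STREAM_NAME,
        };
        await client.send(new PutRecordCommand(params));
    } catch (e) {
        // Log and rethrow so the caller sees the failure.
        console.error(e);
        throw e;
    }
};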

The consumer function looks something like this:

import type { KinesisStreamEvent } from "aws-lambda";

export const myConsumerHandler = async (event: KinesisStreamEvent): Promise<boolean> => {
    let batchSaveRes = false;
    try {
        // Extract batch records: base64 -> JSON string -> object -> payload
        const batchData = event.Records
            .map(rec => Buffer.from(rec.kinesis.data, "base64").toString())
            .map(buff => JSON.parse(buff))
            .map(item => item.payload);

        batchSaveRes = await consumeBatchStream(batchData);
        return batchSaveRes;
    } catch (e) {
        // Errors are logged and swallowed here, so the handler returns false
        // instead of throwing.
        console.error("ERROR:", e);
        return batchSaveRes;
    }
};
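
To rule out the encoding steps as a source of trouble, the producer's serialization and the consumer's decoding can be exercised together without Kinesis. This is only a sketch: the StreamItem shape is an assumption inferred from the item.key and item.payload accesses above, and encodeItem, asLambdaSeesIt, and decodePayload are hypothetical names.

```typescript
// Assumed shape, inferred from item.key (producer) and item.payload (consumer).
interface StreamItem {
    key: string;
    payload: Record<string, unknown>;
}

// Producer side: the bytes passed as Data to PutRecordCommand.
const encodeItem = (item: StreamItem): Buffer => Buffer.from(JSON.stringify(item));

// Lambda delivers rec.kinesis.data as a base64 string of those bytes.
const asLambdaSeesIt = (data: Buffer): string => data.toString("base64");

// Consumer side: the same three map steps, applied to a single record.
const decodePayload = (base64Data: string): Record<string, unknown> =>
    JSON.parse(Buffer.from(base64Data, "base64").toString()).payload;

const item: StreamItem = { key: "k-1", payload: { id: 1, name: "example" } };
const roundTripped = decodePayload(asLambdaSeesIt(encodeItem(item)));
console.log(roundTripped); // { id: 1, name: 'example' }
```

Each record decodes to exactly one payload, so if this round trip holds, any extra rows would have to come from records being written or delivered more than once rather than from the decoding itself.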

The consumeBatchStream function referenced there is straightforward. It just batch-inserts the data into Postgres.

export const consumeBatchStream = async (batchData: Record<string, unknown>[]): Promise<boolean> => {
    let client: Client | null = null;
    try {
        client = await getDbClient("my-secret");
        await client?.connect();
        const query = //...builds an insert query;
        await client.query(query);
        console.log("INSERTED:", batchData);
        return true;
    } catch (e) {
        console.error("Error consuming batch stream:", e);
        throw e;
    } finally {
        await client?.end();
    }
};