Gathering detailed insights and metrics for kinesis-local
Gathering detailed insights and metrics for kinesis-local
Gathering detailed insights and metrics for kinesis-local
Gathering detailed insights and metrics for kinesis-local
serverless-local-kinesis
Add the plugin to your project
@rabblerouser/local-kinesis-lambda-runner
A script for local development of lambda functions that listen to kinesis streams
serverless-offline-kinesis-streams
A Serverless Plugin that creates local Kinesis streams and then triggers your Serverless Lambda Functions upon receiving new records.
serverless-offline-kinesis
Emulate AWS λ and Kinesis locally when developing your Serverless project
npm install kinesis-local
Typescript
Module System
Node Version
NPM Version
Scala (99.83%)
Shell (0.14%)
Dockerfile (0.03%)
Total Downloads
0
Last Day
0
Last Week
0
Last Month
0
Last Year
0
MIT License
55 Stars
1,159 Commits
17 Forks
2 Watchers
16 Branches
16 Contributors
Updated on Jul 11, 2025
Latest Version
0.4.12
Package Id
kinesis-local@0.4.12
Unpacked Size
11.79 MB
Size
2.94 MB
File Count
6
NPM Version
10.8.2
Node Version
18.20.8
Published on
Apr 18, 2025
Cumulative downloads
Total Downloads
Last Day
0%
NaN
Compared to previous day
Last Week
0%
NaN
Compared to previous week
Last Month
0%
NaN
Compared to previous month
Last Year
0%
NaN
Compared to previous year
1
A mock for the Kinesis API, intended for local testing.
There are a few ways to start kinesis-mock.
It is available as a docker image in the GitHub Container Registry:
1docker pull ghcr.io/etspaceman/kinesis-mock:0.4.8 2docker run -p 4567:4567 -p 4568:4568 ghcr.io/etspaceman/kinesis-mock:0.4.8
It is available on NPM as an executable service.
1npm i kinesis-local 2npx kinesis-local
You can also leverage the following executable options in the release assets:
File | Description | Launching |
---|---|---|
main.js | Executable NodeJS file that can be run in any NodeJS enabled environment | node ./main.js |
main.js.map | Source mappings for main.js | |
server.json | self-signed certificate for TLS. Should be included in the same area as main.js |
Below is the available configuration for the service. Note that it is not recommended to edit the ports in the docker environment (rather you can map these ports to a local one).
Variable | Data Type | Default Value | Notes |
---|---|---|---|
INITIALIZE_STREAMS | String | A comma-delimited string of stream names, its optional corresponding shard count and an optional region to initialize during startup. If the shard count is not provided, the default shard count of 4 is used. If the region is not provided, the default region is used. For example: "my-first-stream:1,my-other-stream::us-west-2,my-last-stream:1" | |
KINESIS_MOCK_TLS_PORT | Int | 4567 | Https Only |
KINESIS_MOCK_PLAIN_PORT | Int | 4568 | Http Only |
KINESIS_MOCK_KEYSTORE_PASSWORD | Int | Password for the JKS KeyStore (only for JVM, not JS) | |
KINESIS_MOCK_KEYMANAGER_PASSWORD | Int | Password for the JKS KeyManager (only for JVM, not JS) | |
KINESIS_MOCK_CERT_PASSWORD | Int | Password used for self-signed certificate (only for JS, not JVM) | |
KINESIS_MOCK_CERT_PATH | Int | server.json | Path to certificate file (only for JS, not JVM) |
CREATE_STREAM_DURATION | Duration | 500ms | |
DELETE_STREAM_DURATION | Duration | 500ms | |
REGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
START_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
STOP_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
DEREGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
MERGE_SHARDS_DURATION | Duration | 500ms | |
SPLIT_SHARD_DURATION | Duration | 500ms | |
UPDATE_SHARD_COUNT_DURATION | Duration | 500ms | |
UPDATE_STREAM_MODE_DURATION | Duration | 500ms | |
SHARD_LIMIT | Int | 50 | |
ON_DEMAND_STREAM_COUNT_LIMIT | Int | 10 | |
AWS_ACCOUNT_ID | String | "000000000000" | |
AWS_REGION | String | "us-east-1" | Default region in use for operations. E.g. if a region is not provided by the INITIALIZE_STREAMS values. |
LOG_LEVEL | String | "INFO" | Sets the log-level for kinesis-mock specific logs |
ROOT_LOG_LEVEL | String | "ERROR" | Sets the log-level for all dependencies |
LOAD_DATA_IF_EXISTS | Boolean | true | Loads data from the configured persisted data file if it exists |
SHOULD_PERSIST_DATA | Boolean | false | Persists data to disk. Used to keep data during restarts of the service |
PERSIST_PATH | String | "data" | Path to persist data to. If it doesn't start with "/", the path is considered relative to the present working directory. |
PERSIST_FILE_NAME | String | "kinesis-data.json" | File name for persisted data |
PERSIST_INTERVAL | Duration | 5s | Delay between data persistence |
You can configure the LOG_LEVEL
of the mock with the following levels in mind:
ERROR
- Unhandled errors in the serviceWARN
- Handled errors in the service (e.g. bad requests)INFO
- High-level, low-noise informational messages (default)DEBUG
- Low-level, high-noise informational messagesTRACE
- Log data bodies going in / out of the serviceThe image exposes 2 ports for interactions:
For an example docker-compose setup which uses this image, check out the docker-compose.yml file.
There are examples configuring the KPL, KCL and AWS SDK to use this mock in the integration tests.
1import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider} 2import software.amazon.awssdk.http.SdkHttpConfigurationOption 3import software.amazon.awssdk.http.async.SdkAsyncHttpClient 4import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient 5import software.amazon.awssdk.regions.Region 6import software.amazon.awssdk.services.kinesis.KinesisAsyncClient 7import software.amazon.awssdk.services.kinesis.model._ 8import software.amazon.awssdk.utils.AttributeMap 9 10object MyApp { 11 // A mock credentials provider 12 final case class AwsCreds(accessKey: String, secretKey: String) 13 extends AwsCredentials 14 with AwsCredentialsProvider { 15 override def accessKeyId(): String = accessKey 16 override def secretAccessKey(): String = secretKey 17 override def resolveCredentials(): AwsCredentials = this 18 } 19 20 object AwsCreds { 21 val LocalCreds: AwsCreds = 22 AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey") 23 } 24 25 // The kinesis-mock uses a self-signed certificate 26 private val trustAllCertificates = 27 AttributeMap 28 .builder() 29 .put( 30 SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 31 java.lang.Boolean.TRUE 32 ) 33 .build() 34 35 def nettyClient: SdkAsyncHttpClient = 36 NettyNioAsyncHttpClient 37 .builder() 38 .buildWithDefaults(trustAllCertificates) 39 40 val kinesisClient: KinesisAsyncClient = 41 KinesisAsyncClient 42 .builder() 43 .httpClient(nettyClient) 44 .region(Region.US_EAST_1) 45 .credentialsProvider(AwsCreds.LocalCreds) 46 .endpointOverride(URI.create(s"https://localhost:4567")) 47 .build() 48}
1import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider} 2import software.amazon.awssdk.regions.Region 3import software.amazon.kinesis.producer._ 4 5object MyApp { 6 // A mock credentials provider 7 final case class AwsCreds(accessKey: String, secretKey: String) 8 extends AwsCredentials 9 with AwsCredentialsProvider { 10 override def accessKeyId(): String = accessKey 11 override def secretAccessKey(): String = secretKey 12 override def resolveCredentials(): AwsCredentials = this 13 } 14 15 object AwsCreds { 16 val LocalCreds: AwsCreds = 17 AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey") 18 } 19 20 val kplProducer = new KinesisProducer( 21 new KinesisProducerConfiguration() 22 .setCredentialsProvider(AwsCreds.LocalCreds) 23 .setRegion(Region.US_EAST_1.id()) 24 .setKinesisEndpoint("localhost") 25 .setKinesisPort(4567L) 26 .setCloudwatchEndpoint("localhost") 27 .setCloudwatchPort(4566L) // Using localstack's Cloudwatch port 28 .setVerifyCertificate(false) 29 ) 30}
1import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider} 2import software.amazon.awssdk.http.SdkHttpConfigurationOption 3import software.amazon.awssdk.http.async.SdkAsyncHttpClient 4import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient 5import software.amazon.awssdk.regions.Region 6import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient 7import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient 8import software.amazon.awssdk.services.kinesis.KinesisAsyncClient 9import software.amazon.awssdk.services.kinesis.model._ 10import software.amazon.awssdk.utils.AttributeMap 11import software.amazon.kinesis.checkpoint.CheckpointConfig 12import software.amazon.kinesis.common._ 13import software.amazon.kinesis.coordinator.{CoordinatorConfig, Scheduler} 14import software.amazon.kinesis.leases.LeaseManagementConfig 15import software.amazon.kinesis.leases.dynamodb.{ 16 DynamoDBLeaseManagementFactory, 17 DynamoDBLeaseSerializer 18} 19import software.amazon.kinesis.lifecycle.LifecycleConfig 20import software.amazon.kinesis.lifecycle.events._ 21import software.amazon.kinesis.metrics.MetricsConfig 22import software.amazon.kinesis.processor._ 23import software.amazon.kinesis.retrieval.polling.PollingConfig 24import software.amazon.kinesis.retrieval.RetrievalConfig 25 26object MyApp { 27 // A mock credentials provider 28 final case class AwsCreds(accessKey: String, secretKey: String) 29 extends AwsCredentials 30 with AwsCredentialsProvider { 31 override def accessKeyId(): String = accessKey 32 override def secretAccessKey(): String = secretKey 33 override def resolveCredentials(): AwsCredentials = this 34 } 35 36 object AwsCreds { 37 val LocalCreds: AwsCreds = 38 AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey") 39 } 40 41 // The kinesis-mock uses a self-signed certificate 42 private val trustAllCertificates = 43 AttributeMap 44 .builder() 45 .put( 46 SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 47 java.lang.Boolean.TRUE 48 ) 49 .build() 50 51 def nettyClient: SdkAsyncHttpClient = 52 NettyNioAsyncHttpClient 53 .builder() 54 .buildWithDefaults(trustAllCertificates) 55 56 val kinesisClient: KinesisAsyncClient = 57 KinesisAsyncClient 58 .builder() 59 .httpClient(nettyClient) 60 .region(Region.US_EAST_1) 61 .credentialsProvider(AwsCreds.LocalCreds) 62 .endpointOverride(URI.create(s"https://localhost:4567")) 63 .build() 64 65 val cloudwatchClient: CloudWatchAsyncClient - 66 CloudWatchAsyncClient 67 .builder() 68 .httpClient(nettyClient) 69 .region(Region.US_EAST_1) 70 .credentialsProvider(AwsCreds.LocalCreds) 71 .endpointOverride(URI.create(s"https://localhost:4566")) // localstack port 72 .build() 73 74 val dynamoClient: DynamoDbAsyncClient - 75 DynamoDbAsyncClient 76 .builder() 77 .httpClient(nettyClient) 78 .region(Region.US_EAST_1) 79 .credentialsProvider(AwsCreds.LocalCreds) 80 .endpointOverride(URI.create(s"http://localhost:8000")) // dynamodb-local port 81 .build() 82 83 object KCLRecordProcessor extends ShardRecordProcessor { 84 override def initialize(x: InitializationInput): Unit = () 85 override def processRecords(x: ProcessRecordsInput): Unit = println(s"GOT RECORDS: $x") 86 override def leaseLost(x: LeaseLostInput): Unit = () 87 override def shardEnded(x: ShardEndedInput): Unit = () 88 override def shutdownRequested(x: ShutdownRequestedInput): Unit = () 89 } 90 91 object KCLRecordProcessorFactory extends ShardRecordProcessorFactory { 92 override def shardRecordProcessor(): ShardRecordProcessor = 93 KCLRecordProcessor 94 override def shardRecordProcessor( 95 streamIdentifier: StreamIdentifier 96 ): ShardRecordProcessor = KCLRecordProcessor 97 } 98 99 val appName = "some-app-name" 100 val workerId = "some-worker-id" 101 val streamName = "some-stream-name" 102 // kinesis-mock only supports polling consumers today 103 val retrievalSpecificConfig = new PollingConfig(streamName, kinesisClient) 104 // Local setups require us to tweak the lease management configuration 105 val defaultLeaseManagement = new LeaseManagementConfig( 106 appName, 107 appName, 108 dynamoClient, 109 kinesisClient, 110 workerId 111 ).shardSyncIntervalMillis(1000L) 112 .failoverTimeMillis(1000L) 113 val leaseManagementConfig = defaultLeaseManagement.leaseManagementFactory( 114 new DynamoDBLeaseManagementFactory( 115 defaultLeaseManagement.kinesisClient(), 116 defaultLeaseManagement.dynamoDBClient(), 117 defaultLeaseManagement.tableName(), 118 defaultLeaseManagement.workerIdentifier(), 119 defaultLeaseManagement.executorService(), 120 defaultLeaseManagement.failoverTimeMillis(), 121 defaultLeaseManagement.enablePriorityLeaseAssignment(), 122 defaultLeaseManagement.epsilonMillis(), 123 defaultLeaseManagement.maxLeasesForWorker(), 124 defaultLeaseManagement.maxLeasesToStealAtOneTime(), 125 defaultLeaseManagement.maxLeaseRenewalThreads(), 126 defaultLeaseManagement.cleanupLeasesUponShardCompletion(), 127 defaultLeaseManagement.ignoreUnexpectedChildShards(), 128 defaultLeaseManagement.shardSyncIntervalMillis(), 129 defaultLeaseManagement.consistentReads(), 130 defaultLeaseManagement.listShardsBackoffTimeInMillis(), 131 defaultLeaseManagement.maxListShardsRetryAttempts(), 132 defaultLeaseManagement.maxCacheMissesBeforeReload(), 133 defaultLeaseManagement.listShardsCacheAllowedAgeInSeconds(), 134 defaultLeaseManagement.cacheMissWarningModulus(), 135 defaultLeaseManagement.initialLeaseTableReadCapacity().toLong, 136 defaultLeaseManagement.initialLeaseTableWriteCapacity().toLong, 137 defaultLeaseManagement.tableCreatorCallback(), 138 defaultLeaseManagement.dynamoDbRequestTimeout(), 139 defaultLeaseManagement.billingMode(), 140 defaultLeaseManagement.leaseTableDeletionProtectionEnabled(), 141 defaultLeaseManagement.leaseTablePitrEnabled(), 142 defaultLeaseManagement.tags(), 143 new DynamoDBLeaseSerializer(), 144 defaultLeaseManagement.customShardDetectorProvider(), 145 false, 146 LeaseCleanupConfig 147 .builder() 148 .completedLeaseCleanupIntervalMillis(500L) 149 .garbageLeaseCleanupIntervalMillis(500L) 150 .leaseCleanupIntervalMillis(10.seconds.toMillis) 151 .build(), 152 defaultLeaseManagement 153 .workerUtilizationAwareAssignmentConfig() 154 .disableWorkerMetrics(true), 155 defaultLeaseManagement.gracefulLeaseHandoffConfig() 156 ) 157 ) 158 159 // Consumer can be executed from this by running scheduler.run() 160 val scheduler = new Scheduler( 161 new CheckpointConfig(), 162 new CoordinatorConfig(appName) 163 .parentShardPollIntervalMillis(1000L), 164 leaseManagementConfig, 165 new LifecycleConfig(), 166 new MetricsConfig(cloudwatchClient, appName), 167 new ProcessorConfig(KCLRecordProcessorFactory), 168 new RetrievalConfig( 169 kinesisClient, 170 streamName, 171 appName 172 ).initialPositionInStreamExtended( 173 InitialPositionInStreamExtended.newInitialPosition( 174 InitialPositionInStream.TRIM_HORIZON 175 ) 176 ).retrievalSpecificConfig(retrievalSpecificConfig) 177 .retrievalFactory(retrievalSpecificConfig.retrievalFactory()) 178 ) 179}
No vulnerabilities found.
No security vulnerabilities found.