Gathering detailed insights and metrics for kinesis-local
Gathering detailed insights and metrics for kinesis-local
Gathering detailed insights and metrics for kinesis-local
Gathering detailed insights and metrics for kinesis-local
@aws-sdk/client-kinesis
AWS SDK for JavaScript Kinesis Client for Node.js, Browser and React Native
amazon-kinesis-video-streams-webrtc
Amazon Kinesis Video Streams WebRTC SDK for JavaScript.
@aws-sdk/client-kinesis-video
AWS SDK for JavaScript Kinesis Video Client for Node.js, Browser and React Native
aws-kinesis-agg
Node.js module to simplify working with Amazon Kinesis Records using Protcol Buffers encoding
npm install kinesis-local
Module System
Min. Node Version
Typescript Support
Node Version
NPM Version
56 Stars
970 Commits
17 Forks
2 Watching
17 Branches
16 Contributors
Updated on 27 Nov 2024
Scala (99.83%)
Shell (0.14%)
Dockerfile (0.03%)
Cumulative downloads
Total Downloads
Last day
-14%
121,657
Compared to previous day
Last week
-18.2%
619,072
Compared to previous week
Last month
1.7%
3,143,523
Compared to previous month
Last year
3,625.4%
15,470,005
Compared to previous year
1
A mock for the Kinesis API, intended for local testing.
There are a few ways to start kinesis-mock.
It is available as a docker image in the GitHub Container Registry:
1docker pull ghcr.io/etspaceman/kinesis-mock:0.4.1 2docker run -p 4567:4567 -p 4568:4568 ghcr.io/etspaceman/kinesis-mock:0.4.1
It is available on NPM as an executable service.
1npm i kinesis-local 2npx kinesis-local
You can also leverage the following executable options in the release assets:
File | Description | Launching |
---|---|---|
main.js | Executable NodeJS file that can be run in any NodeJS enabled environment | node ./main.js |
main.js.map | Source mappings for main.js | |
server.json | self-signed certificate for TLS. Should be included in the same area as main.js |
Below is the available configuration for the service. Note that it is not recommended to edit the ports in the docker environment (rather you can map these ports to a local one).
Variable | Data Type | Default Value | Notes |
---|---|---|---|
INITIALIZE_STREAMS | String | A comma-delimited string of stream names, its optional corresponding shard count and an optional region to initialize during startup. If the shard count is not provided, the default shard count of 4 is used. If the region is not provided, the default region is used. For example: "my-first-stream:1,my-other-stream::us-west-2,my-last-stream:1" | |
KINESIS_MOCK_TLS_PORT | Int | 4567 | Https Only |
KINESIS_MOCK_PLAIN_PORT | Int | 4568 | Http Only |
KINESIS_MOCK_KEYSTORE_PASSWORD | Int | Password for the JKS KeyStore (only for JVM, not JS) | |
KINESIS_MOCK_KEYMANAGER_PASSWORD | Int | Password for the JKS KeyManager (only for JVM, not JS) | |
KINESIS_MOCK_CERT_PASSWORD | Int | Password used for self-signed certificate (only for JS, not JVM) | |
KINESIS_MOCK_CERT_PATH | Int | server.json | Path to certificate file (only for JS, not JVM) |
CREATE_STREAM_DURATION | Duration | 500ms | |
DELETE_STREAM_DURATION | Duration | 500ms | |
REGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
START_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
STOP_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
DEREGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
MERGE_SHARDS_DURATION | Duration | 500ms | |
SPLIT_SHARD_DURATION | Duration | 500ms | |
UPDATE_SHARD_COUNT_DURATION | Duration | 500ms | |
UPDATE_STREAM_MODE_DURATION | Duration | 500ms | |
SHARD_LIMIT | Int | 50 | |
ON_DEMAND_STREAM_COUNT_LIMIT | Int | 10 | |
AWS_ACCOUNT_ID | String | "000000000000" | |
AWS_REGION | String | "us-east-1" | Default region in use for operations. E.g. if a region is not provided by the INITIALIZE_STREAMS values. |
LOG_LEVEL | String | "INFO" | Sets the log-level for kinesis-mock specific logs |
ROOT_LOG_LEVEL | String | "ERROR" | Sets the log-level for all dependencies |
LOAD_DATA_IF_EXISTS | Boolean | true | Loads data from the configured persisted data file if it exists |
SHOULD_PERSIST_DATA | Boolean | false | Persists data to disk. Used to keep data during restarts of the service |
PERSIST_PATH | String | "data" | Path to persist data to. If it doesn't start with "/", the path is considered relative to the present working directory. |
PERSIST_FILE_NAME | String | "kinesis-data.json" | File name for persisted data |
PERSIST_INTERVAL | Duration | 5s | Delay between data persistence |
You can configure the LOG_LEVEL
of the mock with the following levels in mind:
ERROR
- Unhandled errors in the serviceWARN
- Handled errors in the service (e.g. bad requests)INFO
- High-level, low-noise informational messages (default)DEBUG
- Low-level, high-noise informational messagesTRACE
- Log data bodies going in / out of the serviceThe image exposes 2 ports for interactions:
For an example docker-compose setup which uses this image, check out the docker-compose.yml file.
There are examples configuring the KPL, KCL and AWS SDK to use this mock in the integration tests.
1import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider} 2import software.amazon.awssdk.http.SdkHttpConfigurationOption 3import software.amazon.awssdk.http.async.SdkAsyncHttpClient 4import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient 5import software.amazon.awssdk.regions.Region 6import software.amazon.awssdk.services.kinesis.KinesisAsyncClient 7import software.amazon.awssdk.services.kinesis.model._ 8import software.amazon.awssdk.utils.AttributeMap 9 10object MyApp { 11 // A mock credentials provider 12 final case class AwsCreds(accessKey: String, secretKey: String) 13 extends AwsCredentials 14 with AwsCredentialsProvider { 15 override def accessKeyId(): String = accessKey 16 override def secretAccessKey(): String = secretKey 17 override def resolveCredentials(): AwsCredentials = this 18 } 19 20 object AwsCreds { 21 val LocalCreds: AwsCreds = 22 AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey") 23 } 24 25 // The kinesis-mock uses a self-signed certificate 26 private val trustAllCertificates = 27 AttributeMap 28 .builder() 29 .put( 30 SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 31 java.lang.Boolean.TRUE 32 ) 33 .build() 34 35 def nettyClient: SdkAsyncHttpClient = 36 NettyNioAsyncHttpClient 37 .builder() 38 .buildWithDefaults(trustAllCertificates) 39 40 val kinesisClient: KinesisAsyncClient = 41 KinesisAsyncClient 42 .builder() 43 .httpClient(nettyClient) 44 .region(Region.US_EAST_1) 45 .credentialsProvider(AwsCreds.LocalCreds) 46 .endpointOverride(URI.create(s"https://localhost:4567")) 47 .build() 48}
1import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider} 2 3object MyApp { 4 // A mock credentials provider 5 final case class AwsCreds(accessKey: String, secretKey: String) 6 extends AWSCredentials 7 with AWSCredentialsProvider { 8 override def getAWSAccessKeyId: String = accessKey 9 override def getAWSSecretKey: String = secretKey 10 override def getCredentials: AWSCredentials = this 11 override def refresh(): Unit = () 12 } 13 14 object AwsCreds { 15 val LocalCreds: AwsCreds = 16 AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey") 17 } 18 19 val kplProducer = new KinesisProducer( 20 new KinesisProducerConfiguration() 21 .setCredentialsProvider(AwsCreds.LocalCreds) 22 .setRegion(Regions.US_EAST_1.getName) 23 .setKinesisEndpoint("localhost") 24 .setKinesisPort(4567L) 25 .setCloudwatchEndpoint("localhost") 26 .setCloudwatchPort(4566L) // Using localstack's Cloudwatch port 27 .setVerifyCertificate(false) 28 ) 29}
1import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider} 2import software.amazon.awssdk.http.SdkHttpConfigurationOption 3import software.amazon.awssdk.http.async.SdkAsyncHttpClient 4import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient 5import software.amazon.awssdk.regions.Region 6import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient 7import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient 8import software.amazon.awssdk.services.kinesis.KinesisAsyncClient 9import software.amazon.awssdk.services.kinesis.model._ 10import software.amazon.awssdk.utils.AttributeMap 11import software.amazon.kinesis.checkpoint.CheckpointConfig 12import software.amazon.kinesis.common._ 13import software.amazon.kinesis.coordinator.{CoordinatorConfig, Scheduler} 14import software.amazon.kinesis.leases.LeaseManagementConfig 15import software.amazon.kinesis.lifecycle.LifecycleConfig 16import software.amazon.kinesis.lifecycle.events._ 17import software.amazon.kinesis.metrics.MetricsConfig 18import software.amazon.kinesis.processor._ 19import software.amazon.kinesis.retrieval.polling.PollingConfig 20import software.amazon.kinesis.retrieval.RetrievalConfig 21 22object MyApp { 23 // A mock credentials provider 24 final case class AwsCreds(accessKey: String, secretKey: String) 25 extends AwsCredentials 26 with AwsCredentialsProvider { 27 override def accessKeyId(): String = accessKey 28 override def secretAccessKey(): String = secretKey 29 override def resolveCredentials(): AwsCredentials = this 30 } 31 32 object AwsCreds { 33 val LocalCreds: AwsCreds = 34 AwsCreds("mockKinesisAccessKey", "mockKinesisSecretKey") 35 } 36 37 // The kinesis-mock uses a self-signed certificate 38 private val trustAllCertificates = 39 AttributeMap 40 .builder() 41 .put( 42 SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, 43 java.lang.Boolean.TRUE 44 ) 45 .build() 46 47 def nettyClient: SdkAsyncHttpClient = 48 NettyNioAsyncHttpClient 49 .builder() 50 .buildWithDefaults(trustAllCertificates) 51 52 val kinesisClient: KinesisAsyncClient = 53 KinesisAsyncClient 54 .builder() 55 .httpClient(nettyClient) 56 .region(Region.US_EAST_1) 57 .credentialsProvider(AwsCreds.LocalCreds) 58 .endpointOverride(URI.create(s"https://localhost:4567")) 59 .build() 60 61 val cloudwatchClient: CloudWatchAsyncClient - 62 CloudWatchAsyncClient 63 .builder() 64 .httpClient(nettyClient) 65 .region(Region.US_EAST_1) 66 .credentialsProvider(AwsCreds.LocalCreds) 67 .endpointOverride(URI.create(s"https://localhost:4566")) // localstack port 68 .build() 69 70 val dynamoClient: DynamoDbAsyncClient - 71 DynamoDbAsyncClient 72 .builder() 73 .httpClient(nettyClient) 74 .region(Region.US_EAST_1) 75 .credentialsProvider(AwsCreds.LocalCreds) 76 .endpointOverride(URI.create(s"http://localhost:8000")) // dynamodb-local port 77 .build() 78 79 object KCLRecordProcessor extends ShardRecordProcessor { 80 override def initialize(x: InitializationInput): Unit = () 81 override def processRecords(x: ProcessRecordsInput): Unit = println(s"GOT RECORDS: $x") 82 override def leaseLost(x: LeaseLostInput): Unit = () 83 override def shardEnded(x: ShardEndedInput): Unit = () 84 override def shutdownRequested(x: ShutdownRequestedInput): Unit = () 85 } 86 87 object KCLRecordProcessorFactory extends ShardRecordProcessorFactory { 88 override def shardRecordProcessor(): ShardRecordProcessor = 89 KCLRecordProcessor 90 override def shardRecordProcessor( 91 streamIdentifier: StreamIdentifier 92 ): ShardRecordProcessor = KCLRecordProcessor 93 } 94 95 val appName = "some-app-name" 96 val workerId = "some-worker-id" 97 val streamName = "some-stream-name" 98 // kinesis-mock only supports polling consumers today 99 val retrievalSpecificConfig = new PollingConfig(streamName, kinesisClient) 100 101 // Consumer can be executed from this by running scheduler.run() 102 val scheduler = new Scheduler( 103 new CheckpointConfig(), 104 new CoordinatorConfig(appName) 105 .parentShardPollIntervalMillis(1000L), 106 new LeaseManagementConfig( 107 appName, 108 dynamoClient, 109 kinesisClient, 110 workerId 111 ).shardSyncIntervalMillis(1000L), 112 new LifecycleConfig(), 113 new MetricsConfig(cloudwatchClient, appName), 114 new ProcessorConfig(KCLRecordProcessorFactory), 115 new RetrievalConfig( 116 kinesisClient, 117 streamName, 118 appName 119 ).initialPositionInStreamExtended( 120 InitialPositionInStreamExtended.newInitialPosition( 121 InitialPositionInStream.TRIM_HORIZON 122 ) 123 ).retrievalSpecificConfig(retrievalSpecificConfig) 124 .retrievalFactory(retrievalSpecificConfig.retrievalFactory()) 125 ) 126}
No vulnerabilities found.
No security vulnerabilities found.