Gathering detailed insights and metrics for @meinestadt.de/glue-schema-registry
This is a SerDe library to interact with the AWS Glue Schema Registry. It makes it easy to encode and decode messages with Avro schemas and the AWS' wire format.
npm install @meinestadt.de/glue-schema-registry
TypeScript (97.38%)
JavaScript (2.62%)
Total Downloads
182,670
Last Day
761
Last Week
4,277
Last Month
17,584
Last Year
137,677
13 Stars
396 Commits
6 Forks
3 Watching
4 Branches
2 Contributors
Latest Version
2.1.0
Package Id
@meinestadt.de/glue-schema-registry@2.1.0
Unpacked Size
60.58 kB
Size
18.29 kB
File Count
6
NPM Version
8.19.4
Node Version
16.20.2
Published On
13 Jan 2025
Last day
-25%
761
Compared to previous day
Last week
-4.8%
4,277
Compared to previous week
Last month
38.7%
17,584
Compared to previous month
Last year
214.6%
137,677
Compared to previous year
This is a SerDe library to interact with the AWS Glue Schema Registry. It makes it easy to encode and decode messages with Avro schemas and the AWS wire format. It is primarily built to produce and consume messages with MSK, Kafka, Kinesis, SNS, and similar services. This enables the creation of JavaScript/TypeScript applications that are compatible with messages created with the AWS Glue Java SerDe libraries.
Apache Avro-encoded messages can be created and consumed using this library. Protobuf and JSON Schema are not currently supported. The library supports gzip compression, schema registration, and schema evolution. avsc (https://github.com/mtth/avsc) is used for Avro encoding/decoding.
This library works well with kafkajs (https://kafka.js.org).
The library has been rewritten to use the AWS SDK v3. This is a breaking change. The migration should be relatively easy, but there are some API changes that you need to be aware of.
- The client configuration type changed from Glue.ClientConfiguration to GlueClientConfig.
- The parameter of .updateGlueClient is now mandatory. The type changed from Glue.ClientConfiguration to GlueClientConfig.
- The result of .analyzeMessage has changed. The type of the schema field is now GetSchemaVersionResponse from the SDK v3.

Install with npm
npm i @meinestadt.de/glue-schema-registry
First, create an Avro schema and register it.

    import * as msglue from "@meinestadt.de/glue-schema-registry";
    import * as avro from "avsc";

    // Note: SchemaType and SchemaCompatibilityType are used unqualified below;
    // import them from wherever your version of the library exposes them.

    // avro schema
    const schema = avro.Type.forSchema({
      type: "record",
      name: "property",
      namespace: "de.meinestadt.demo",
      fields: [
        { name: "demo", type: "string", default: "Hello World" },
      ]
    });

    const registry = new msglue.GlueSchemaRegistry<SCHEMATYPE>("<GLUE REGISTRY NAME>", {
      region: "<AWS_REGION>",
    });

    // create a new schema
    // throws an error if the schema already exists
    try {
      const schemaId = await registry.createSchema({
        type: SchemaType.AVRO,
        schemaName: "Testschema",
        compatibility: SchemaCompatibilityType.BACKWARD,
        schema: JSON.stringify(schema),
      });
    } catch (error) {
      // ...
    }

    // or register a version of an existing schema
    // creates a new version or returns the id of an existing one, if a similar version already exists
    const schemaId = await registry.register({
      schemaName: "Testschema",
      type: SchemaType.AVRO,
      schema: JSON.stringify(schema),
    });

    // now you can encode an object
    const encodedmessage = await registry.encode(schemaId, object);
The encoded message can then be sent to Kafka or MSK, for example:

    // import * as kafka from "kafkajs"
    // ...
    // kafkajs requires a broker list; "<BROKER>" is a placeholder
    const kc = new kafka.Kafka({ brokers: ["<BROKER>"] });
    const producer = kc.producer();
    await producer.connect();
    await producer.send({
      topic: "<TOPICNAME>",
      messages: [{ value: encodedmessage }],
    });
To decode received messages:
    const schema = avro.Type.forSchema({
      type: "record",
      name: "property",
      namespace: "de.meinestadt.demo",
      fields: [
        { name: "demo", type: "string", default: "Hello World" },
      ]
    });
    const dataset = await registry.decode(message.value, schema);
Note: registry.decode expects the raw message as the first parameter, plus the target schema as the second parameter. If the message is encoded with a different version of the schema, the encoding schema gets loaded from Glue. The record is then converted to the target schema. If the schemas are not compatible an exception is thrown.
You can set the type of the object that you want to encode, and that you receive when you decode a message, as type parameter when you create the instance.
For example:
    interface Address {
      street: string;
      zip: string;
      city: string;
    }

    const registry = new msglue.GlueSchemaRegistry<Address>("<GLUE REGISTRY NAME>", {
      region: "<AWS_REGION>",
    });
The constructor takes a GlueClientConfig object as parameter.
Creates a new schema in the glue schema registry. Throws an error if the schema already exists.
    registry.createSchema({
      type: SchemaType, // currently only SchemaType.AVRO
      schemaName: string,
      compatibility: SchemaCompatibilityType, // NONE, BACKWARD, BACKWARD_ALL, DISABLED, FORWARD, FORWARD_ALL, FULL, FULL_ALL
      schema: string, // the schema as JSON string
    })
Registers a schema and makes it available for encoding/decoding.
Creates a new version if the schema does not exist yet.
Throws an exception if the Glue compatibility check fails.
Schemas are cached so that subsequent calls of register do not lead to multiple AWS Glue calls.
    const schemaId = await registry.register({
      schemaName: string,
      type: SchemaType, // currently only SchemaType.AVRO
      schema: string, // schema as JSON string
    });
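The caching behavior described above can be illustrated with a small, generic sketch. This is not the library's internal implementation, which is not shown here. The names RegisterFn, withCache, and fakeRegister are hypothetical and exist only for this example. The idea is to store the pending promise per (schema name, schema) pair, so that repeated calls reuse the first lookup.

```typescript
// Hypothetical stand-in for a remote registry call; not part of the library.
type RegisterFn = (schemaName: string, schema: string) => Promise<string>;

// Wraps a register function so that identical (name, schema) pairs
// reach the backend only once. Failed lookups are evicted so they can be retried.
function withCache(register: RegisterFn): RegisterFn {
  const cache = new Map<string, Promise<string>>();
  return (schemaName, schema) => {
    const key = `${schemaName}\u0000${schema}`;
    let pending = cache.get(key);
    if (pending === undefined) {
      pending = register(schemaName, schema).catch((err) => {
        cache.delete(key);
        throw err;
      });
      cache.set(key, pending);
    }
    return pending;
  };
}

// Usage with a fake backend that counts how often it is called.
let backendCalls = 0;
const fakeRegister: RegisterFn = async (schemaName) => {
  backendCalls += 1;
  return `id-for-${schemaName}`;
};
const cachedRegister = withCache(fakeRegister);
```

Caching the promise rather than the resolved value means that concurrent calls made before the first lookup finishes also share a single backend request.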
Encodes the object with the given glueSchemaId and returns a Buffer containing the binary data.

    async encode(glueSchemaId: string, object: T, props?: EncodeProps): Promise<Buffer>
Optional properties:
    {
      compress: boolean // default: true
    }
Decodes the binary message with the passed Avro schema. Reads the ID of the producer schema from the message and loads that schema from the Glue schema registry. Caches schemas for subsequent use. Converts the incoming message from the producer's schema into the consumer's schema. Throws an error if the producer schema does not exist or cannot be loaded from Glue, or if the producer and consumer schemas are not compatible.
Decodes both uncompressed and gzip compressed messages.
    async decode(message: Buffer, consumerschema: avro.Type): Promise<T>
If you need metadata, such as the schema id and the Glue schema, you can use

    async analyzeMessage(message: Buffer): Promise<AnalyzeMessageResult>

to get the metadata for a message without fully decoding it. The result is defined as follows:

    export type AnalyzeMessageResult = {
      /**
       * true if the message is valid
       */
      valid: boolean
      /**
       * the error code, if valid is false, otherwise undefined
       */
      error?: ERROR
      /**
       * the header version
       */
      headerversion?: number
      /**
       * the compression type, may be 0 (none) or 5 (gzip)
       */
      compression?: number
      /**
       * the uuid of the schema
       */
      schemaId?: string
      /**
       * the glue schema
       */
      schema?: GetSchemaVersionResponse
    }
You can refresh the internally cached Glue client by calling updateGlueClient.
This is particularly useful when credentials need to be updated.

    updateGlueClient(props: GlueClientConfig)
The following Lambda function consumes messages from Kafka/MSK and prints the deserialized object to the console. Incoming messages must be encoded with a compatible Avro schema.

    import * as avro from "avsc";
    import { GlueSchemaRegistry } from "@meinestadt.de/glue-schema-registry";

    // define the interface for the deserialized message
    interface IHelloWorld {
      message: string;
    }

    // the avro schema
    const schema = avro.Type.forSchema({
      type: "record",
      namespace: "de.meinestadt.glueschema.demo",
      name: "HelloWorld",
      fields: [
        {
          name: "message",
          type: "string",
          default: "",
        },
      ],
    });

    export const handler = async (event: any) => {
      // create the registry object
      // Note: the lambda function must have the permission to read schemas and
      // schema versions from AWS Glue
      const registry = new GlueSchemaRegistry<IHelloWorld>("MySchemas", {
        region: "eu-central-1",
      });
      // Iterate through keys
      for (const key in event.records) {
        console.log("Key: ", key);
        // Iterate through records
        const p = event.records[key].map((record: any) => {
          console.log(`Message key: ${record.key}`);
          // base64 decode the incoming message
          const msg = Buffer.from(record.value, "base64");
          // decode the avro encoded message
          return registry.decode(msg, schema).then((dataset) => {
            // print the deserialized object to the console
            console.log(dataset);
          });
        });
        await Promise.all(p);
      }
    };
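The handler above types the event as any. As a minimal sketch, the part of the event that the handler actually reads can be typed, and the base64 decoding can be pulled into a helper. The interface names KafkaRecord and KafkaEvent and the function extractPayloads are assumptions made for this example. They cover only the fields used above (records, key, value), not the full Lambda event.

```typescript
// Assumed minimal event shape, limited to the fields the handler reads.
interface KafkaRecord {
  key: string;
  value: string; // base64-encoded message payload
}

interface KafkaEvent {
  records: { [topicPartition: string]: KafkaRecord[] };
}

// Collects the raw (still Avro-encoded) payloads of all records in the event.
function extractPayloads(event: KafkaEvent): Buffer[] {
  const payloads: Buffer[] = [];
  for (const key of Object.keys(event.records)) {
    for (const record of event.records[key]) {
      payloads.push(Buffer.from(record.value, "base64"));
    }
  }
  return payloads;
}

// Usage with a hand-built sample event.
const sampleEvent: KafkaEvent = {
  records: {
    "demo-topic-0": [
      { key: "a2V5MQ==", value: Buffer.from("first").toString("base64") },
      { key: "a2V5Mg==", value: Buffer.from("second").toString("base64") },
    ],
  },
};
const payloads = extractPayloads(sampleEvent);
```

Each returned Buffer could then be passed to registry.decode together with the consumer schema.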
@meinestadt.de/glue-schema-registry uses the same simple wire protocol as the AWS Java libraries (https://github.com/awslabs/aws-glue-schema-registry). The protocol is defined as follows:
VERSION_HEADER (1 Byte) + COMPRESSION_BYTE (1 Byte) + SCHEMA_VERSION_ID (16 Byte) + CONTENT
The currently supported version is 3. Compression can be 0 (no compression) or 5 (zlib compression).
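The framing described above can be sketched with plain Node.js Buffers. This is an illustrative sketch, not the library's code. The function names parseMessage and frameMessage are hypothetical, and it is an assumption that the 16-byte schema version id is stored as the UUID's bytes in textual order. The sketch only splits and builds the header. It does not decompress or Avro-decode the content.

```typescript
const HEADER_VERSION = 3;
const COMPRESSION_NONE = 0;
const COMPRESSION_ZLIB = 5;
const HEADER_LENGTH = 18; // 1 byte version + 1 byte compression + 16 bytes schema version id

interface ParsedMessage {
  headerVersion: number;
  compression: number;
  schemaVersionId: string;
  content: Buffer;
}

// Formats 16 raw bytes as a UUID string (8-4-4-4-12 hex digits).
function formatUuid(bytes: Buffer): string {
  const hex = bytes.toString("hex");
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    hex.slice(12, 16),
    hex.slice(16, 20),
    hex.slice(20),
  ].join("-");
}

// Splits a framed message into header fields and the remaining content.
function parseMessage(message: Buffer): ParsedMessage {
  if (message.length < HEADER_LENGTH) {
    throw new Error("message is shorter than the 18-byte header");
  }
  const headerVersion = message.readUInt8(0);
  const compression = message.readUInt8(1);
  if (headerVersion !== HEADER_VERSION) {
    throw new Error(`unsupported header version ${headerVersion}`);
  }
  if (compression !== COMPRESSION_NONE && compression !== COMPRESSION_ZLIB) {
    throw new Error(`unsupported compression byte ${compression}`);
  }
  return {
    headerVersion,
    compression,
    schemaVersionId: formatUuid(message.subarray(2, HEADER_LENGTH)),
    content: message.subarray(HEADER_LENGTH),
  };
}

// Prepends the header to already-encoded content.
function frameMessage(
  schemaVersionId: string,
  content: Buffer,
  compression: number = COMPRESSION_NONE
): Buffer {
  const id = Buffer.from(schemaVersionId.replace(/-/g, ""), "hex");
  if (id.length !== 16) {
    throw new Error("schema version id must be a 16-byte UUID");
  }
  return Buffer.concat([Buffer.from([HEADER_VERSION, compression]), id, content]);
}
```

For example, frameMessage("123e4567-e89b-12d3-a456-426614174000", Buffer.from("payload")) yields a 25-byte buffer, and parseMessage on that buffer returns the same id together with the original content.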
No vulnerabilities found.