The Confluent Schema Registry API: A Comprehensive Guide

Introduction

ProgramMatek presents the Confluent Schema Registry API, a PHP 7.4+ library for consuming the Confluent Schema Registry REST API. It provides low-level functions that create PSR-7 compliant requests, plus high-level abstractions for a better developer experience.

Table of Contents

  • Requirements
    • Hard Dependencies
    • Optional Dependencies
  • Installation
  • Compatibility
  • Usage
    • Asynchronous API
    • Synchronous API
    • Caching
    • Low-Level API
  • Testing
    • Unit tests, Coding standards, and static analysis
    • Integration tests
  • Contributing

Requirements

Hard dependencies

Dependency           Version        Reason
php                  ~7.4           Anything lower has reached EOL
guzzlehttp/guzzle    ~7.0           Using Request to build PSR-7 RequestInterface
beberlei/assert      ~2.7 || ~3.0
flix-tech/avro-php   ^4.1           Maintained fork of the only Avro PHP implementation: rg/avro-php

Optional dependencies

Dependency         Version   Reason
doctrine/cache     ~1.3      If you want to use the DoctrineCacheAdapter
psr/cache          ^1.0      If you want to use the CacheItemPoolAdapter
psr/simple-cache   ^1.0      If you want to use the SimpleCacheAdapter

Installation

To install the Confluent Schema Registry API library, simply use Composer:

composer require "flix-tech/confluent-schema-registry-api=^7.4"

If you are using a version lower than 5.0.3, update immediately: earlier releases contain a critical bug in the exception handling.

Compatibility

This library strictly follows semantic versioning. Therefore, while minor and patch releases will be compatible, major version upgrades may introduce incompatibilities that will be documented in the UPGRADE.md file.
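
As a rough illustration of what a caret constraint such as ^7.4 admits under semantic versioning, the sketch below checks a version string against a caret range. satisfiesCaret is a hypothetical helper, not part of Composer or of this library, and it assumes stable versions with a major number of 1 or higher.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: true when $version lies in [$base, next major).
 * Assumes stable versions and a major number >= 1; pre-release suffixes
 * and 0.x ranges are not handled the way Composer handles them.
 */
function satisfiesCaret(string $version, string $base): bool
{
    $major = (int) explode('.', $base)[0];
    $upper = ($major + 1) . '.0.0';

    return version_compare($version, $base, '>=')
        && version_compare($version, $upper, '<');
}

$sameMinor = satisfiesCaret('7.4.0', '7.4.0'); // lower bound is inclusive
$newMinor  = satisfiesCaret('7.9.2', '7.4.0'); // minor and patch bumps are allowed
$nextMajor = satisfiesCaret('8.0.0', '7.4.0'); // next major is excluded
$tooOld    = satisfiesCaret('7.3.9', '7.4.0'); // below the lower bound
```

Composer performs this resolution itself; the helper only makes the range explicit.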

Usage

Asynchronous API

Interface declaration

Example: PromisingRegistry

<?php
use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;

$registry = new PromisingRegistry(new Client(['base_uri' => 'registry.example.com']));

// Register a schema with a subject
$schema = AvroSchema::parse('{"type": "string"}');

// The promise will either contain a schema ID as an integer when fulfilled,
// or a SchemaRegistryException instance when rejected.
// If the subject does not exist, it will be implicitly created.
$promise = $registry->register('test-subject', $schema);

// Resolving the promise yields either the value or a SchemaRegistryException instance,
// much like an Either monad: the exception is handed to you instead of being thrown,
// so the decision to throw it is left to the library's user.
// TODO: Maybe return an Either Monad instead
$promise = $promise->then(static function ($schemaIdOrSchemaRegistryException) {
    if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) {
        throw $schemaIdOrSchemaRegistryException;
    }
    return $schemaIdOrSchemaRegistryException;
});

// Resolve the promise
$schemaId = $promise->wait();

// Get a schema by schema ID
$promise = $registry->schemaForId($schemaId);

// Additional callbacks can be added to the promise
$schema = $promise->wait();

// Get the version of a schema for a given subject.
$version = $registry->schemaVersion('test-subject', $schema)->wait();

// You can also get a schema by subject and version
$schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait();

// Additionally, you can just query for the currently latest schema within a subject.
// *NOTE*: Once you requested this, it might not be the latest version anymore.
$latestSchema = $registry->latestVersion('test-subject')->wait();

// Sometimes you want to find out the global schema ID for a given schema.
$schemaId = $registry->schemaId('test-subject', $schema)->wait();
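
The "value or exception" convention used in the callback above can be isolated in a small helper. This is a minimal sketch in plain PHP with no dependency on the library: unwrapOrThrow is a hypothetical name, and RuntimeException stands in for SchemaRegistryException.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): returns the value unchanged,
 * or throws it when it is an exception instance.
 *
 * @param mixed $valueOrException
 * @return mixed
 */
function unwrapOrThrow($valueOrException)
{
    if ($valueOrException instanceof Throwable) {
        throw $valueOrException;
    }

    return $valueOrException;
}

// A plain value passes through untouched.
$schemaId = unwrapOrThrow(42);

// An exception instance is thrown instead of returned.
// RuntimeException is only a stand-in for SchemaRegistryException here.
$message = null;
try {
    unwrapOrThrow(new RuntimeException('Subject not found'));
} catch (RuntimeException $e) {
    $message = $e->getMessage();
}
```

A helper like this could be passed to a promise as $promise->then('unwrapOrThrow'), which keeps the decision to throw in one place.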

Synchronous API

Interface declaration

Example: BlockingRegistry

<?php
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$registry = new BlockingRegistry(new PromisingRegistry(new Client(['base_uri' => 'registry.example.com'])));

// The blocking registry resolves promises with `wait` and adds a throwing rejection callback.
$schema = AvroSchema::parse('{"type": "string"}');

// The returned value will be an integer, not a promise.
$schemaId = $registry->register('test-subject', $schema);

Caching

The library provides a CachedRegistry that accepts a CacheAdapter and a Registry. It supports both asynchronous and synchronous APIs.

NOTE:
Starting from version 4.x of this library, the API for the CacheAdapterInterface has been changed to allow caching of schema IDs based on the hash of a given schema.

Example

<?php
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;

$asyncApi = new PromisingRegistry(new Client(['base_uri' => 'registry.example.com']));
$syncApi = new BlockingRegistry($asyncApi);

// Cache in front of the asynchronous registry.
$doctrineCachedAsyncApi = new CachedRegistry($asyncApi, new DoctrineCacheAdapter(new ArrayCache()));

// All adapters support both APIs. For asynchronous APIs, additional fulfillment callbacks will be registered.
$avroObjectCachedSyncApi = new CachedRegistry($syncApi, new AvroObjectCacheAdapter());

// NEW in version 4.x: Custom hash function for caching schema IDs via the schema hash.
// By default, the following function is used internally:
$defaultHashFunction = static function (AvroSchema $schema) {
    return md5((string) $schema);
};

// You can also define your own hash callable.
$sha1HashFunction = static function (AvroSchema $schema) {
    return sha1((string) $schema);
};

// Pass the hash function as an optional third parameter to the CachedRegistry constructor.
$avroObjectCachedSyncApi = new CachedRegistry($syncApi, new AvroObjectCacheAdapter(), $sha1HashFunction);
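
To picture what caching schema IDs by schema hash amounts to, here is a minimal sketch in plain PHP. SchemaIdLookup is a hypothetical class written for illustration; it is not the library's CachedRegistry or any of its adapters, and it works on the schema's string form rather than on AvroSchema objects.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory lookup, for illustration only. The library's real
 * adapters and CachedRegistry have their own interfaces.
 */
final class SchemaIdLookup
{
    /** @var callable maps a schema string to a cache key */
    private $hash;

    /** @var array<string, int> schema IDs keyed by schema hash */
    private $idsByHash = [];

    public function __construct(callable $hash)
    {
        $this->hash = $hash;
    }

    public function remember(string $schemaJson, int $schemaId): void
    {
        $this->idsByHash[($this->hash)($schemaJson)] = $schemaId;
    }

    public function find(string $schemaJson): ?int
    {
        return $this->idsByHash[($this->hash)($schemaJson)] ?? null;
    }
}

// Any callable that turns a string into a key works; sha1 is used here.
$lookup = new SchemaIdLookup('sha1');
$lookup->remember('{"type":"string"}', 7);

$hit  = $lookup->find('{"type":"string"}'); // same text, same hash
$miss = $lookup->find('{"type":"int"}');    // never stored
```

Because the key is derived from the schema text, two strings that differ only in whitespace hash to different keys in this sketch.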

Low-Level API

The library provides a low-level API that offers simple functions returning PSR-7 request objects for the various registry endpoints. See Requests/Functions for more information. Additionally, there are requests for using the new DELETE API of the schema registry.
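
For orientation, the sketch below lists the parts such a request carries when registering a schema under a subject, following the Confluent Schema Registry REST API (POST /subjects/{subject}/versions). describeRegisterRequest is a hypothetical helper written in plain PHP; it is not one of the library's request functions and it returns an array rather than a PSR-7 object.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not one of the library's request functions: lists the
 * parts of a "register schema under subject" call as a plain array.
 *
 * @return array{method: string, uri: string, headers: array<string, string>, body: string}
 */
function describeRegisterRequest(string $subject, string $schemaJson): array
{
    return [
        'method'  => 'POST',
        'uri'     => '/subjects/' . rawurlencode($subject) . '/versions',
        'headers' => [
            'Content-Type' => 'application/vnd.schemaregistry.v1+json',
            'Accept'       => 'application/vnd.schemaregistry.v1+json',
        ],
        // The registry expects the schema as a JSON string inside a JSON object.
        'body'    => json_encode(['schema' => $schemaJson], JSON_THROW_ON_ERROR),
    ];
}

$request = describeRegisterRequest('test-subject', '{"type": "string"}');
```

With Guzzle installed, the same four parts map onto new GuzzleHttp\Psr7\Request($method, $uri, $headers, $body).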

Testing

To run the test suite, this library uses a Makefile, which requires Docker.

You can set the default variables by making a copy of variables.mk.dist and changing them according to your needs.

Build the local Docker image

PHP_VERSION=7.4 XDEBUG_VERSION=2.9.8 make docker

Unit tests, Coding standards, and static analysis

PHP_VERSION=7.4 make ci-local

Integration tests

This library employs a Docker Compose configuration to set up a schema registry for integration testing. Therefore, Docker Compose version 1.18.x or newer is required to run these tests.

Environment variables for controlling the platform

CONFLUENT_VERSION=latest
CONFLUENT_NETWORK_SUBNET=172.68.0.0/24
SCHEMA_REGISTRY_IPV4=172.68.0.103
KAFKA_BROKER_IPV4=172.68.0.102
ZOOKEEPER_IPV4=172.68.0.101

Build the Confluent platform with a specific version and run the integration tests

CONFLUENT_VERSION=5.2.3 make platform
make phpunit-integration
make clean
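
When pointing your own test code at the platform started this way, the registry address can be derived from the same environment variables. This is a minimal sketch: registryBaseUri is a hypothetical helper, and port 8081 is an assumption (it is Schema Registry's default listener port, but the Docker Compose configuration may expose a different one).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds the registry base URI from the environment
 * variables listed above. The port is an assumption, not taken from the
 * library's Docker Compose configuration.
 *
 * @param array<string, string> $env environment variables, e.g. from getenv()
 */
function registryBaseUri(array $env, int $port = 8081): string
{
    // Fall back to the default address documented above.
    $host = $env['SCHEMA_REGISTRY_IPV4'] ?? '172.68.0.103';

    return sprintf('http://%s:%d', $host, $port);
}

$default  = registryBaseUri([]);
$override = registryBaseUri(['SCHEMA_REGISTRY_IPV4' => '10.0.0.5']);
```

In practice the array would come from getenv(), and the result would be passed as the base_uri option of the Guzzle client.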

Contributing

To contribute to this library, follow this workflow:

  1. Fork the repository.
  2. Create a feature branch.
  3. Make your changes.
  4. Run tests to verify that everything is functioning correctly.
  5. Open a pull request to the upstream repository.
  6. Be proud of contributing to open source!
