Enterprise Drivers

Kafka Driver

Introduction

Note: This driver is only available with Kipchak Enterprise.

The Kafka Driver provides a fully typed interface for producing and consuming messages via Apache Kafka. It wraps the native php-rdkafka extension — the industry-standard PHP binding for librdkafka — offering production-grade throughput and reliability.

Key features include:

  • Multiple Connections: Configure separate clients for different Kafka clusters or environments.
  • Typed API: Dedicated message objects and enums — your IDE guides every call.
  • Producer: Publish single messages or batches, with configurable compression, acknowledgements, and delivery timeouts.
  • Consumer: Subscribe to topics with explicit consumer group management, configurable offset reset, and manual commit support.
  • Security: SASL PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and SSL/TLS — suitable for managed Kafka services (Confluent Cloud, MSK, Aiven, Redpanda, etc.).
  • FrankenPHP Worker-Safe: The underlying producer is lazily created and reused across requests within the same worker process.

Requirements

  • PHP 8.4+
  • The ext-rdkafka PHP extension (wraps librdkafka)
  • librdkafka >= 2.0 installed on the host system

Installation

To install this driver, you need access to the Enterprise Composer repository at https://php.pkgs.1x.ax/.

If you have an enterprise licence, please contact your account representative for access.

composer require kipchak/driver-kafka
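Composer must know about the private repository before the require command will resolve the package. Assuming a standard Composer setup (credentials are supplied separately, e.g. via auth.json), the repositories entry in composer.json would look like:

```json
{
    "repositories": [
        {
            "type": "composer",
            "url": "https://php.pkgs.1x.ax/"
        }
    ]
}
```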

Initialise the Driver

Add the following line to your drivers/drivers.php file:

\Kipchak\Driver\Kafka\Kafka::initialise($container);

Configuration

Create a config file named kipchak.kafka.php in your project's config/ directory.

Connection Settings

Define connections in the connections array. Each key (e.g. default, events) is a connection name.

  • brokers (required): Comma-separated list of host:port broker addresses
  • security (optional): Security settings (see below)
  • producer (optional): Producer-specific tuning (see below)
  • consumer (optional): Consumer defaults (see below)

Security Settings

Configure security when connecting to a secured cluster:

  • security.protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL
  • security.sasl.mechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
  • security.sasl.username: SASL username
  • security.sasl.password: SASL password
  • security.ssl.ca_location: Path to the CA certificate file
  • security.ssl.certificate_location: Path to the client certificate file (mutual TLS)
  • security.ssl.key_location: Path to the client private key file (mutual TLS)
  • security.ssl.key_password: Passphrase for the private key
  • security.ssl.verify_hostname: true to enable hostname verification (default), false to disable

Producer Settings

  • producer.acks (default: all): Acknowledgement level: all (strongest), 0 (fire and forget), 1 (leader only)
  • producer.compression (default: none): Compression codec: none, gzip, snappy, lz4, or zstd
  • producer.linger_ms: Milliseconds to wait for batching additional messages
  • producer.batch_size: Maximum batch size in bytes
  • producer.delivery_timeout_ms (default: 30000): Maximum time to wait for delivery confirmation before raising an error

Consumer Settings

Consumer settings define defaults applied when calling consumer(). The group_id is always supplied at call time.

  • consumer.auto_offset_reset (default: earliest): Where to start consuming when no committed offset exists: earliest, latest, or error
  • consumer.enable_auto_commit (default: false): Whether to commit offsets automatically; false is recommended for manual commit control
  • consumer.session_timeout_ms (default: 30000): Consumer group session timeout in milliseconds
  • consumer.max_poll_interval_ms (default: 300000): Maximum time between polls before the consumer is considered failed

Example Configuration

<?php

use function Kipchak\Core\env;

return [
    'connections' => [
        'default' => [
            'brokers'  => env('KAFKA_BROKERS', 'kafka:9092'),
            'security' => [
                'protocol' => env('KAFKA_SECURITY_PROTOCOL', 'SASL_SSL'),
                'sasl'     => [
                    'mechanism' => env('KAFKA_SASL_MECHANISM', 'SCRAM-SHA-256'),
                    'username'  => env('KAFKA_USERNAME', ''),
                    'password'  => env('KAFKA_PASSWORD', ''),
                ],
                'ssl' => [
                    'ca_location'      => env('KAFKA_CA_LOCATION', '/etc/ssl/certs/ca-certificates.crt'),
                    'verify_hostname'  => true,
                ],
            ],
            'producer' => [
                'acks'                => 'all',
                'compression'         => 'lz4',
                'linger_ms'           => 5,
                'delivery_timeout_ms' => 30000,
            ],
            'consumer' => [
                'auto_offset_reset'    => 'earliest',
                'enable_auto_commit'   => false,
                'session_timeout_ms'   => 30000,
                'max_poll_interval_ms' => 300000,
            ],
        ],
    ],
];

Environment Variables

KAFKA_BROKERS=broker1:9092,broker2:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=SCRAM-SHA-256
KAFKA_USERNAME=your-kafka-username
KAFKA_PASSWORD=your-kafka-password
KAFKA_CA_LOCATION=/etc/ssl/certs/ca-certificates.crt

Usage

Retrieve a connection by name. If no name is given, default is used.

use Kipchak\Driver\Kafka\Kafka;

$kafka = Kafka::get();           // 'default' connection
$kafka = Kafka::get('events');   // named connection

Publishing a Single Message

// Publish with automatic partition assignment
$kafka->publish(
    topic:   'orders',
    payload: json_encode(['id' => 42, 'status' => 'placed']),
    key:     '42',
);

// Publish with custom headers
$kafka->publish(
    topic:   'orders',
    payload: json_encode(['id' => 43, 'status' => 'shipped']),
    key:     '43',
    headers: ['trace-id' => 'abc-123', 'source' => 'api'],
);

Publishing a Batch

Batching minimises round-trips to the broker. All messages are enqueued before a single flush.

use Kipchak\Driver\Kafka\Message\ProducerMessage;

$kafka->publishBatch([
    new ProducerMessage(topic: 'orders', payload: json_encode(['id' => 1]), key: '1'),
    new ProducerMessage(topic: 'orders', payload: json_encode(['id' => 2]), key: '2'),
    new ProducerMessage(topic: 'invoices', payload: json_encode(['order_id' => 1])),
]);

Consuming Messages

Consuming with the driver is designed for use in dedicated background worker processes, not inside HTTP request handlers. Create a consumer, poll in a loop, and commit offsets manually.

$consumer = $kafka->consumer(
    groupId: 'order-processor',
    topics:  ['orders'],
);

try {
    while (true) {
        $message = $consumer->consume(timeoutMs: 1000);

        if ($message === null) {
            // Timed out or end of partition — poll again
            continue;
        }

        // Process the message
        $data = json_decode($message->payload, true);
        processOrder($data);

        // Commit the offset after successful processing
        $consumer->commit();
    }
} finally {
    // Always close the consumer to release group membership
    $consumer->close();
}
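Because the loop above runs forever, a dedicated worker also needs a way to stop cleanly on SIGTERM or SIGINT. A minimal sketch using PHP's pcntl extension (the pcntl_* calls are standard PHP; the driver calls are as documented above, and processOrder() stands in for your own handler):

```php
<?php

use Kipchak\Driver\Kafka\Kafka;

$running = true;

// Flip the flag on the next signal so the loop exits between messages,
// never mid-processing.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use (&$running) { $running = false; });
pcntl_signal(SIGINT,  function () use (&$running) { $running = false; });

$consumer = Kafka::get()->consumer(
    groupId: 'order-processor',
    topics:  ['orders'],
);

try {
    while ($running) {
        $message = $consumer->consume(timeoutMs: 1000);

        if ($message === null) {
            continue; // timeout or end of partition
        }

        processOrder(json_decode($message->payload, true));
        $consumer->commit();
    }
} finally {
    // Leaves the consumer group promptly so partitions can rebalance.
    $consumer->close();
}
```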

Type Reference

Enums

  • SecurityProtocol: Plaintext, Ssl, SaslPlaintext, SaslSsl
  • SaslMechanism: Plain, ScramSha256, ScramSha512
  • CompressionType: None, Gzip, Snappy, Lz4, Zstd
  • AutoOffsetReset: Earliest, Latest, Error

ProducerMessage Properties

  • topic (string, required): Target topic name
  • payload (string, required): Raw message payload
  • key (string|null, default null): Routing key for partition assignment
  • headers (array, default []): Key/value message headers
  • partition (int, default -1): Target partition (-1 = automatic)
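Pinning a message to a specific partition is a matter of passing partition explicitly. A sketch, assuming the named-argument constructor shown in the batch example accepts all five properties:

```php
use Kipchak\Driver\Kafka\Message\ProducerMessage;

// Route to partition 2 regardless of the key.
$message = new ProducerMessage(
    topic:     'orders',
    payload:   json_encode(['id' => 7]),
    key:       '7',
    headers:   ['source' => 'backfill'],
    partition: 2,
);
```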

KafkaMessage Properties

  • topic (string): Source topic name
  • payload (string): Raw message payload
  • key (string|null): Message key
  • partition (int): Partition the message was read from
  • offset (int): Offset within the partition
  • timestampMs (int): Message timestamp in milliseconds (0 if not set by the producer)
  • headers (array): Key/value message headers

Error Handling

publish() and publishBatch() throw \RuntimeException when the producer fails to deliver messages within delivery_timeout_ms:

[Kafka] Producer flush failed: Local: Message timed out

KafkaConsumer::consume() throws \RuntimeException on consumer errors other than timeouts and end-of-partition signals:

[Kafka] Consumer error: Broker: Unknown topic or partition

An \InvalidArgumentException is thrown at construction time when brokers is missing from the connection config.
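In practice this means wrapping publish calls in a try/catch wherever a delivery failure must not abort the request, for example a best-effort audit trail. A sketch using only the exception type documented above:

```php
use Kipchak\Driver\Kafka\Kafka;

try {
    Kafka::get()->publish(
        topic:   'audit',
        payload: json_encode(['event' => 'login', 'user_id' => 42]),
    );
} catch (\RuntimeException $e) {
    // Delivery failed within delivery_timeout_ms; log and carry on.
    error_log('Audit publish failed: ' . $e->getMessage());
}
```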

Source

The source code for this driver is hosted internally.
