Enterprise Drivers
Kafka Driver
Introduction
Note: This driver is only available with Kipchak Enterprise.
The Kafka Driver provides a fully typed interface for producing and consuming messages via Apache Kafka. It wraps the native php-rdkafka extension — the industry-standard PHP binding for librdkafka — offering production-grade throughput and reliability.
Key features include:
- Multiple Connections: Configure separate clients for different Kafka clusters or environments.
- Typed API: Dedicated message objects and enums — your IDE guides every call.
- Producer: Publish single messages or batches, with configurable compression, acknowledgements, and delivery timeouts.
- Consumer: Subscribe to topics with explicit consumer group management, configurable offset reset, and manual commit support.
- Security: SASL PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and SSL/TLS — suitable for managed Kafka services (Confluent Cloud, MSK, Aiven, Redpanda, etc.).
- FrankenPHP Worker-Safe: The underlying producer is lazily created and reused across requests within the same worker process.
Requirements
- PHP 8.4+
- The ext-rdkafka PHP extension (wraps librdkafka)
- librdkafka >= 2.0 installed on the host system
Installation
To install this driver, you need access to the Enterprise Composer repository at https://php.pkgs.1x.ax/.
If you have an enterprise licence, please contact your account representative for access.
```shell
composer require kipchak/driver-kafka
```
Initialise the Driver
Add the following line to your drivers/drivers.php file:
```php
\Kipchak\Driver\Kafka\Kafka::initialise($container);
```
Configuration
Create a config file named kipchak.kafka.php in your project's config/ directory.
Connection Settings
Define connections in the connections array. Each key (e.g. default, events) is a connection name.
| Key | Required | Description |
|---|---|---|
| brokers | Yes | Comma-separated list of host:port broker addresses |
| security | No | Security settings (see below) |
| producer | No | Producer-specific tuning (see below) |
| consumer | No | Consumer defaults (see below) |
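For instance, two independently configured clusters could be declared side by side. This is a sketch only: the events connection name and the KAFKA_EVENTS_BROKERS variable are illustrative, not part of the driver.

```php
<?php

use function Kipchak\Core\env;

return [
    'connections' => [
        // Primary cluster, used when no name is passed to Kafka::get()
        'default' => [
            'brokers' => env('KAFKA_BROKERS', 'kafka:9092'),
        ],
        // A second, separately configured cluster
        'events' => [
            'brokers' => env('KAFKA_EVENTS_BROKERS', 'events-kafka:9092'),
        ],
    ],
];
```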
Security Settings
Configure security when connecting to a secured cluster:
| Key | Description |
|---|---|
| security.protocol | PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL |
| security.sasl.mechanism | PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 |
| security.sasl.username | SASL username |
| security.sasl.password | SASL password |
| security.ssl.ca_location | Path to the CA certificate file |
| security.ssl.certificate_location | Path to the client certificate file (mutual TLS) |
| security.ssl.key_location | Path to the client private key file (mutual TLS) |
| security.ssl.key_password | Passphrase for the private key |
| security.ssl.verify_hostname | true to enable hostname verification (default), false to disable |
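As a sketch, a mutual-TLS connection (SSL protocol, no SASL) combines the certificate and key settings like this. The file paths are placeholders for your own certificate material:

```php
'security' => [
    'protocol' => 'SSL',
    'ssl' => [
        'ca_location' => '/etc/kafka/ca.pem',
        // Client certificate and key enable mutual TLS
        'certificate_location' => '/etc/kafka/client.pem',
        'key_location' => '/etc/kafka/client.key',
        'key_password' => env('KAFKA_KEY_PASSWORD', ''),
        'verify_hostname' => true,
    ],
],
```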
Producer Settings
| Key | Default | Description |
|---|---|---|
| producer.acks | all | Acknowledgement level: all (strongest), 0 (fire and forget), 1 (leader only) |
| producer.compression | none | Compression codec: none, gzip, snappy, lz4, or zstd |
| producer.linger_ms | — | Milliseconds to wait for batching additional messages |
| producer.batch_size | — | Maximum batch size in bytes |
| producer.delivery_timeout_ms | 30000 | Maximum time to wait for delivery confirmation before raising an error |
Consumer Settings
Consumer settings define defaults applied when calling consumer(). The group_id is always supplied at call time.
| Key | Default | Description |
|---|---|---|
| consumer.auto_offset_reset | earliest | Where to start consuming when no committed offset exists: earliest, latest, or error |
| consumer.enable_auto_commit | false | Whether to commit offsets automatically. Recommended: false for manual commit control |
| consumer.session_timeout_ms | 30000 | Consumer group session timeout in milliseconds |
| consumer.max_poll_interval_ms | 300000 | Maximum time between polls before the consumer is considered failed |
Example Configuration
```php
<?php

use function Kipchak\Core\env;

return [
    'connections' => [
        'default' => [
            'brokers' => env('KAFKA_BROKERS', 'kafka:9092'),
            'security' => [
                'protocol' => env('KAFKA_SECURITY_PROTOCOL', 'SASL_SSL'),
                'sasl' => [
                    'mechanism' => env('KAFKA_SASL_MECHANISM', 'SCRAM-SHA-256'),
                    'username' => env('KAFKA_USERNAME', ''),
                    'password' => env('KAFKA_PASSWORD', ''),
                ],
                'ssl' => [
                    'ca_location' => env('KAFKA_CA_LOCATION', '/etc/ssl/certs/ca-certificates.crt'),
                    'verify_hostname' => true,
                ],
            ],
            'producer' => [
                'acks' => 'all',
                'compression' => 'lz4',
                'linger_ms' => 5,
                'delivery_timeout_ms' => 30000,
            ],
            'consumer' => [
                'auto_offset_reset' => 'earliest',
                'enable_auto_commit' => false,
                'session_timeout_ms' => 30000,
                'max_poll_interval_ms' => 300000,
            ],
        ],
    ],
];
```
Environment Variables
```shell
KAFKA_BROKERS=broker1:9092,broker2:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=SCRAM-SHA-256
KAFKA_USERNAME=your-kafka-username
KAFKA_PASSWORD=your-kafka-password
KAFKA_CA_LOCATION=/etc/ssl/certs/ca-certificates.crt
```
Usage
Retrieve a connection by name. If no name is given, default is used.
```php
use Kipchak\Driver\Kafka\Kafka;

$kafka = Kafka::get();         // 'default' connection
$kafka = Kafka::get('events'); // named connection
```
Publishing a Single Message
```php
// Publish with automatic partition assignment
$kafka->publish(
    topic: 'orders',
    payload: json_encode(['id' => 42, 'status' => 'placed']),
    key: '42',
);

// Publish with custom headers
$kafka->publish(
    topic: 'orders',
    payload: json_encode(['id' => 43, 'status' => 'shipped']),
    key: '43',
    headers: ['trace-id' => 'abc-123', 'source' => 'api'],
);
```
Publishing a Batch
Batching minimises round-trips to the broker. All messages are enqueued before a single flush.
```php
use Kipchak\Driver\Kafka\Message\ProducerMessage;

$kafka->publishBatch([
    new ProducerMessage(topic: 'orders', payload: json_encode(['id' => 1]), key: '1'),
    new ProducerMessage(topic: 'orders', payload: json_encode(['id' => 2]), key: '2'),
    new ProducerMessage(topic: 'invoices', payload: json_encode(['order_id' => 1])),
]);
```
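Batches are often built programmatically. A sketch, assuming $orders is an array of associative rows with an id field:

```php
use Kipchak\Driver\Kafka\Message\ProducerMessage;

$messages = [];

foreach ($orders as $order) {
    $messages[] = new ProducerMessage(
        topic: 'orders',
        payload: json_encode($order),
        // Same key => same partition => per-order ordering is preserved
        key: (string) $order['id'],
    );
}

// One flush for the whole batch
$kafka->publishBatch($messages);
```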
Consuming Messages
Consuming with the driver is designed for use in dedicated background worker processes, not inside HTTP request handlers. Create a consumer, poll in a loop, and commit offsets manually.
```php
$consumer = $kafka->consumer(
    groupId: 'order-processor',
    topics: ['orders'],
);

try {
    while (true) {
        $message = $consumer->consume(timeoutMs: 1000);

        if ($message === null) {
            // Timed out or end of partition — poll again
            continue;
        }

        // Process the message
        $data = json_decode($message->payload, true);
        processOrder($data);

        // Commit the offset after successful processing
        $consumer->commit();
    }
} finally {
    // Always close the consumer to release group membership
    $consumer->close();
}
```
Type Reference
Enums
| Enum | Values |
|---|---|
| SecurityProtocol | Plaintext, Ssl, SaslPlaintext, SaslSsl |
| SaslMechanism | Plain, ScramSha256, ScramSha512 |
| CompressionType | None, Gzip, Snappy, Lz4, Zstd |
| AutoOffsetReset | Earliest, Latest, Error |
ProducerMessage Properties
| Property | Type | Default | Description |
|---|---|---|---|
| topic | string | required | Target topic name |
| payload | string | required | Raw message payload |
| key | string\|null | null | Routing key for partition assignment |
| headers | array | [] | Key/value message headers |
| partition | int | -1 | Target partition (-1 = automatic) |
KafkaMessage Properties
| Property | Type | Description |
|---|---|---|
| topic | string | Source topic name |
| payload | string | Raw message payload |
| key | string\|null | Message key |
| partition | int | Partition the message was read from |
| offset | int | Offset within the partition |
| timestampMs | int | Message timestamp in milliseconds (0 if not set by the producer) |
| headers | array | Key/value message headers |
Error Handling
publish() and publishBatch() throw \RuntimeException when the producer fails to deliver messages within delivery_timeout_ms:
```
[Kafka] Producer flush failed: Local: Message timed out
```
KafkaConsumer::consume() throws \RuntimeException on consumer errors other than timeouts and end-of-partition signals:
```
[Kafka] Consumer error: Broker: Unknown topic or partition
```
An \InvalidArgumentException is thrown at construction time when brokers is missing from the connection config.
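A minimal sketch of handling a delivery failure around publish(), assuming you want to log and rethrow rather than swallow the error:

```php
try {
    $kafka->publish(
        topic: 'orders',
        payload: json_encode(['id' => 42, 'status' => 'placed']),
        key: '42',
    );
} catch (\RuntimeException $e) {
    // e.g. "[Kafka] Producer flush failed: Local: Message timed out"
    error_log($e->getMessage());

    throw $e; // or hand off to a retry / dead-letter mechanism
}
```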
Source
The source code for this driver is hosted internally.