APIs & Integrations
Message Brokers
Structr can connect to message brokers to send and receive messages asynchronously. This enables event-driven architectures, real-time data pipelines, and integration with external systems through industry-standard messaging protocols.
When to Use Message Brokers
Message brokers are useful when you need to:
- Decouple systems - Send data to other services without waiting for a response
- Process events asynchronously - Handle incoming events in the background
- Integrate with IoT devices - Receive sensor data or send commands via MQTT
- Build data pipelines - Stream data to analytics systems via Kafka or Pulsar
- Enable real-time communication - React to events from external systems immediately
If you only need to push updates to browsers, Server-Sent Events may be simpler. Message brokers are for system-to-system communication.
Supported Brokers
Structr supports three message broker protocols:
| Broker | Protocol | Typical Use Case |
|---|---|---|
| MQTT | Lightweight publish/subscribe | IoT, sensors, mobile apps |
| Kafka | Distributed streaming | High-throughput data pipelines, event sourcing |
| Pulsar | Cloud-native messaging | Multi-tenant messaging, geo-replication |
All three use the same programming model in Structr: create a client, configure subscribers, and process incoming messages with callbacks.
Core Concepts
Message Clients
A message client represents a connection to a broker. In Structr, clients are database objects - you create them like any other data object, either through the Admin UI or via $.create() in scripts. Each broker type has its own client type (MQTTClient, KafkaClient, PulsarClient) with broker-specific configuration properties, but they all share the same interface for sending messages and managing subscriptions.
When you enable a client, Structr establishes and maintains the connection in the background. The connection persists independently of HTTP requests or user sessions.
Message Subscribers
A MessageSubscriber is a database object that defines what happens when a message arrives. You create subscribers and link them to one or more clients. Each subscriber has:
- topic - Which topic to listen to (use
*for all topics) - callback - Code that runs when a message arrives (stored as a string property)
- clients - Which client(s) this subscriber is connected to (a relationship to MessageClient objects)
When a message arrives on a matching topic, Structr executes the callback code with two special variables available:
$.topic- The topic the message was published to$.message- The message content (typically a string or JSON)
The Basic Pattern
Message broker integration in Structr works through database objects. Clients and subscribers are regular Structr objects that you create, configure, and link - just like any other data in your application. This means you can create them through the Admin UI or programmatically via scripts.
Setting up via Admin UI:
- Open the Data area in the Admin UI
- Select the client type (
MQTTClient,KafkaClient, orPulsarClient) - Create a new object and fill in the connection properties
- Create a
MessageSubscriberobject with a topic and callback - Link the subscriber to the client by setting the
clientsproperty - Enable the client by checking
isEnabled(MQTT) orenabled(Kafka/Pulsar)
Setting up via Script:
The same steps work programmatically using $.create(). This is useful when you need to create clients dynamically or as part of an application setup routine.
Once the client is enabled, Structr maintains the connection in the background. Incoming messages automatically trigger the callbacks of linked subscribers. The connection persists across requests - you configure it once, and it keeps running until you disable or delete the client.
MQTT
MQTT (Message Queuing Telemetry Transport) is a lightweight protocol designed for constrained devices and low-bandwidth networks. It’s the standard for IoT applications.
MQTTClient Properties
| Property | Type | Description |
|---|---|---|
mainBrokerURL |
String | Broker URL (required), e.g., ws://localhost:15675/ws |
fallbackBrokerURLs |
String[] | Alternative broker URLs for failover |
username |
String | Authentication username |
password |
String | Authentication password |
qos |
Integer | Quality of Service level (0, 1, or 2), default: 0 |
isEnabled |
Boolean | Set to true to connect |
isConnected |
Boolean | Connection status (read-only) |
Setting Up an MQTT Client
You can create the client and subscriber objects in the Data area of the Admin UI, or programmatically as shown below:
// Create the MQTT client
let client = $.create('MQTTClient', {
name: 'IoT Gateway',
mainBrokerURL: 'ws://localhost:15675/ws',
username: 'guest',
password: 'guest',
qos: 1
});
// Create a subscriber for temperature readings
let subscriber = $.create('MessageSubscriber', {
topic: 'sensors/temperature',
callback: `{
let data = JSON.parse($.message);
$.log('Temperature reading: ' + data.value + '°C from ' + data.sensorId);
// Store the reading
$.create('TemperatureReading', {
sensorId: data.sensorId,
value: data.value,
timestamp: $.now
});
}`
});
// Link subscriber to client
subscriber.clients = [client];
// Enable the connection
client.isEnabled = true;
When creating via the Admin UI, you fill in the same properties in the object editor. The callback property accepts StructrScript or JavaScript code as a string. After linking the subscriber to the client and enabling isEnabled, the connection activates immediately.
After enabling, the isConnected property indicates whether the connection succeeded. In the Admin UI, the client will show a green indicator when connected, red when disconnected.
Subscribing to Multiple Topics
You can create multiple subscribers for different topics:
// Subscribe to all sensor data
$.create('MessageSubscriber', {
topic: 'sensors/*',
callback: `{ $.call('processSensorData', { topic: $.topic, message: $.message }); }`,
clients: [client]
});
// Subscribe to system alerts
$.create('MessageSubscriber', {
topic: 'alerts/#',
callback: `{ $.call('handleAlert', { topic: $.topic, message: $.message }); }`,
clients: [client]
});
Use * to match a single level, # to match multiple levels in MQTT topic hierarchies.
Publishing Messages
Send messages using the client’s sendMessage method or the mqttPublish function:
// Using the method on the client
client.sendMessage('devices/lamp/command', JSON.stringify({ action: 'on', brightness: 80 }));
// Using the global function
$.mqttPublish(client, 'devices/lamp/command', JSON.stringify({ action: 'off' }));
MQTT-Specific Functions
| Function | Description |
|---|---|
mqttPublish(client, topic, message) |
Publish a message to a topic |
mqttSubscribe(client, topic) |
Subscribe to a topic programmatically |
mqttUnsubscribe(client, topic) |
Unsubscribe from a topic |
Quality of Service Levels
MQTT supports three QoS levels:
| Level | Name | Guarantee |
|---|---|---|
| 0 | At most once | Message may be lost |
| 1 | At least once | Message delivered, may be duplicated |
| 2 | Exactly once | Message delivered exactly once |
Higher QoS levels add overhead. Use QoS 0 for frequent sensor readings where occasional loss is acceptable, QoS 1 or 2 for important commands or events.
Kafka
Apache Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant messaging. It’s commonly used for data pipelines and event sourcing.
KafkaClient Properties
| Property | Type | Description |
|---|---|---|
servers |
String[] | Bootstrap server addresses, e.g., ['localhost:9092'] |
groupId |
String | Consumer group ID for coordinated consumption |
enabled |
Boolean | Set to true to connect |
Setting Up a Kafka Client
Create the client and subscriber objects in the Data area, or programmatically:
// Create the Kafka client
let client = $.create('KafkaClient', {
name: 'Event Processor',
servers: ['kafka1.example.com:9092', 'kafka2.example.com:9092'],
groupId: 'structr-consumers'
});
// Create a subscriber for order events
let subscriber = $.create('MessageSubscriber', {
topic: 'orders',
callback: `{
let order = JSON.parse($.message);
$.log('New order received: ' + order.orderId);
$.create('Order', {
externalId: order.orderId,
customerEmail: order.customer.email,
totalAmount: order.total,
status: 'received'
});
}`,
clients: [client]
});
// Enable the connection
client.enabled = true;
The servers property accepts an array of bootstrap servers. Kafka clients connect to any available server and discover the full cluster topology automatically.
Publishing to Kafka
let client = $.first($.find('KafkaClient', 'name', 'Event Processor'));
client.sendMessage('order-updates', JSON.stringify({
orderId: order.externalId,
status: 'shipped',
trackingNumber: 'ABC123',
timestamp: new Date().toISOString()
}));
Consumer Groups
The groupId property determines how multiple consumers coordinate. Consumers in the same group share the workload - each message is processed by only one consumer in the group. Different groups receive all messages independently.
Use the same groupId across multiple Structr instances to distribute processing. Use different group IDs if each instance needs to see all messages.
Pulsar
Apache Pulsar is a cloud-native messaging platform that combines messaging and streaming. It supports multi-tenancy and geo-replication out of the box.
PulsarClient Properties
| Property | Type | Description |
|---|---|---|
servers |
String[] | Service URLs, e.g., ['pulsar://localhost:6650'] |
enabled |
Boolean | Set to true to connect |
Setting Up a Pulsar Client
Create the client and subscriber objects in the Data area, or programmatically:
// Create the Pulsar client
let client = $.create('PulsarClient', {
name: 'Analytics Pipeline',
servers: ['pulsar://pulsar.example.com:6650']
});
// Create a subscriber for analytics events
let subscriber = $.create('MessageSubscriber', {
topic: 'analytics/pageviews',
callback: `{
let event = JSON.parse($.message);
$.create('PageView', {
path: event.path,
userId: event.userId,
sessionId: event.sessionId,
timestamp: $.parseDate(event.timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
});
}`,
clients: [client]
});
// Enable the connection
client.enabled = true;
Pulsar clients have minimal configuration. The servers property accepts Pulsar service URLs, typically starting with pulsar:// for unencrypted or pulsar+ssl:// for TLS connections.
Publishing to Pulsar
let client = $.first($.find('PulsarClient', 'name', 'Analytics Pipeline'));
client.sendMessage('analytics/events', JSON.stringify({
type: 'conversion',
userId: user.id,
product: product.name,
value: product.price,
timestamp: new Date().toISOString()
}));
Working with Callbacks
Callback Context
Inside a callback, you have access to:
| Variable | Description |
|---|---|
$.topic |
The topic the message arrived on |
$.message |
The message content as a string |
$.this |
The MessageSubscriber object |
Forwarding to Schema Methods
For complex processing, forward messages to a global schema method:
// Simple callback that delegates to a method
$.create('MessageSubscriber', {
topic: '*',
callback: `{ $.call('handleIncomingMessage', { topic: $.topic, message: $.message }); }`
});
Then implement the logic in your schema method where you have full access to error handling, transactions, and other methods:
// Global schema method: handleIncomingMessage
{
let topic = $.arguments.topic;
let message = $.arguments.message;
try {
let data = JSON.parse(message);
if (topic.startsWith('sensors/')) {
processSensorData(topic, data);
} else if (topic.startsWith('orders/')) {
processOrderEvent(topic, data);
} else {
$.log('Unknown topic: ' + topic);
}
} catch (e) {
$.log('Error processing message: ' + e.message);
// Store failed message for retry
$.create('FailedMessage', {
topic: topic,
message: message,
error: e.message,
timestamp: $.now
});
}
}
Error Handling
Callbacks should handle errors gracefully. Unhandled exceptions are logged but don’t stop message processing. For critical messages, implement your own retry logic:
$.create('MessageSubscriber', {
topic: 'critical-events',
callback: `{
try {
let event = JSON.parse($.message);
processEvent(event);
} catch (e) {
// Log and store for manual review
$.log('Failed to process critical event: ' + e.message);
$.create('FailedEvent', {
topic: $.topic,
payload: $.message,
error: e.message
});
}
}`
});
Managing Connections
Checking Connection Status
For MQTT clients, check the isConnected property:
let client = $.first($.find('MQTTClient', 'name', 'IoT Gateway'));
if (!client.isConnected) {
$.log('MQTT client is disconnected, attempting reconnect...');
client.isEnabled = false;
client.isEnabled = true;
}
Disabling and Re-enabling
To temporarily stop processing:
// Disable
client.isEnabled = false; // or client.enabled = false for Kafka/Pulsar
// Re-enable
client.isEnabled = true;
Disabling disconnects from the broker. Re-enabling reconnects and resubscribes to all configured topics.
Cleaning Up
Deleting a client automatically closes the connection and cleans up resources. Subscribers linked only to that client become inactive but are not automatically deleted.
Best Practices
Use JSON for Messages
Structure your messages as JSON for easy parsing and forward compatibility:
client.sendMessage('events', JSON.stringify({
type: 'user.created',
version: 1,
timestamp: new Date().toISOString(),
data: {
userId: user.id,
email: user.eMail
}
}));
Keep Callbacks Simple
Callbacks should be short. Delegate complex logic to schema methods:
// Good: Simple callback that delegates
callback: `{ $.call('processOrder', { data: $.message }); }`
// Avoid: Complex logic directly in callback
callback: `{ /* 50 lines of processing code */ }`
Handle Connection Failures
Brokers can become unavailable. Design your application to handle disconnections gracefully and log connection issues for monitoring.
Use Meaningful Topic Names
Organize topics hierarchically for easier subscription management:
sensors/temperature/building-a/floor-1
sensors/humidity/building-a/floor-1
orders/created
orders/shipped
orders/delivered
Secure Your Connections
Use authentication (username/password for MQTT) and encrypted connections (TLS) in production. Never store credentials in callbacks - use the client properties.
Troubleshooting
Client Won’t Connect
- Verify the broker URL is correct and reachable from the Structr server
- Check authentication credentials
- Review the Structr server log for connection errors
- For MQTT, ensure the WebSocket endpoint is enabled on the broker
Messages Not Received
- Verify the subscriber’s topic matches the published topic
- Check that the subscriber is linked to the correct client
- Ensure the client is enabled and connected
- Test with topic
*to receive all messages and verify the connection works
Callback Errors
- Check the server log for exception details
- Verify JSON parsing if the message format is unexpected
- Test the callback logic in a schema method first
Related Topics
- Server-Sent Events - Pushing updates to browsers
- Scheduled Tasks - Processing queued messages periodically
- Business Logic - Implementing message handlers as schema methods