Message streaming
The message streaming layer creates an Amazon MSK cluster for real-time telemetry streaming.
MSK cluster configuration
Cluster settings:
-
Kafka version: 3.5.1
-
Brokers: 3 (one per AZ)
-
Broker type: kafka.m5.large (2 vCPU, 8 GB RAM)
-
Storage per broker: 100 GB EBS (gp3)
-
Replication factor: 3
-
Min in-sync replicas: 2
Network:
-
VPC: Uses InfrastructureStack VPC
-
Subnets: Private subnets only
-
Security group: Allows traffic from IoT Rules and Flink
Security:
-
Encryption in transit: TLS
-
Encryption at rest: AWS-managed keys
-
Authentication: IAM (for Flink) and unauthenticated (for IoT Rules)
Kafka topics
Three topics handle different data streams.
cms-telemetry topic
Raw telemetry messages from vehicles (MQTT Direct mode).
Configuration:
-
Partitions: 6
-
Replication factor: 3
-
Retention: 7 days
-
Compression: gzip
-
Message format: JSON
Message schema:
{ "vin": "1HGBH41JXMN109186", "timestamp": "2024-10-13T12:00:00Z", "location": {"lat": 37.7749, "lon": -122.4194}, "speed": 45.5, "ignition": "on", "engineTemp": 195.5, "tirePressure": {"fl": 32, "fr": 32, "rl": 30, "rr": 31}, "batteryVoltage": 12.6, "fuelLevel": 75.5, "odometer": 45678.9 }
cms-trips topic
Trip events (start/end) generated by Flink.
Configuration:
-
Partitions: 3
-
Replication factor: 3
-
Retention: 30 days
Message schema:
{ "tripId": "550e8400-e29b-41d4-a716-446655440000", "vin": "1HGBH41JXMN109186", "event": "trip_start", "timestamp": "2024-10-13T12:00:00Z", "location": {"lat": 37.7749, "lon": -122.4194} }
cms-alerts topic
Maintenance and safety alerts generated by Flink.
Configuration:
-
Partitions: 3
-
Replication factor: 3
-
Retention: 90 days
Message schema:
{ "alertId": "660e8400-e29b-41d4-a716-446655440000", "vin": "1HGBH41JXMN109186", "type": "maintenance", "severity": "high", "message": "Low tire pressure detected", "timestamp": "2024-10-13T12:00:00Z", "metadata": {"tire": "rear_right", "pressure": 28} }
fw-telemetry-raw topic
Protobuf-encoded telemetry from FleetWise Edge agents.
Configuration:
-
Partitions: 6
-
Replication factor: 3
-
Retention: 7 days
-
Message format: Base64-encoded protobuf (from IoT Rule)
The IoT Rule fw_{stage}_iot_msk_rule routes messages from cms/fleetwise/vehicles/+/signals to this topic. The FWTelemetryProcessor Flink application consumes from this topic, decodes the protobuf payload, maps CAN signals to the standard format using the decoder manifest, and outputs to cms-telemetry-preprocessed.
fw-checkin topic
Checkin messages from FleetWise Edge agents requesting campaign configuration.
Configuration:
-
Partitions: 3
-
Replication factor: 3
-
Retention: 7 days
-
Message format: Base64-encoded protobuf (from IoT Rule)
The IoT Rule fw_{stage}_checkin_rule routes messages from cms/fleetwise/vehicles/+/checkins to this topic. The CampaignSyncProcessor Flink application consumes checkins, resolves active campaigns from DynamoDB, and pushes decoder manifests and collection schemes back to the agent through IoT Core MQTT.
cms-telemetry-preprocessed topic
Decoded and mapped FleetWise telemetry in standard standard JSON format.
Configuration:
-
Partitions: 6
-
Replication factor: 3
-
Retention: 7 days
-
Message format: JSON (same schema as cms-telemetry)
This topic receives output from the FWTelemetryProcessor after protobuf decoding and signal mapping. Downstream processors (TripProcessor, SafetyProcessor, MaintenanceProcessor) consume from this topic in addition to cms-telemetry, enabling the same processing pipeline for both MQTT Direct and FleetWise Edge telemetry.
cms-telemetry-oem topic
Raw telemetry from third-party OEM APIs.
Configuration:
-
Partitions: 6
-
Replication factor: 3
-
Retention: 7 days
-
Message format: JSON (OEM-specific format with
oem_sourcefield)
The OEMTelemetryProcessor consumes from this topic, applies the appropriate transform manifest from S3 based on the oem_source field, and outputs standard CMS-format JSON to cms-telemetry-raw. This enables integration with any OEM without modifying the core processing pipeline.