Message streaming - Guidance for Connected Mobility on AWS

Message streaming

The message streaming layer creates an Amazon MSK cluster for real-time telemetry streaming.

MSK cluster configuration

Cluster settings:

  • Kafka version: 3.5.1

  • Brokers: 3 (one per AZ)

  • Broker type: kafka.m5.large (2 vCPU, 8 GB RAM)

  • Storage per broker: 100 GB EBS (gp3)

  • Replication factor: 3

  • Min in-sync replicas: 2
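As a rough check of what the replication settings imply, the sketch below computes how many brokers can be lost before writes are rejected. It assumes producers request acknowledgement from all in-sync replicas (acks=all); producer settings are not listed in this guidance, and the function name is illustrative.

```python
def tolerable_broker_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Return how many replicas can be unavailable while acks=all writes still succeed."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min in-sync replicas must be between 1 and the replication factor")
    # Writes succeed as long as at least min_insync_replicas replicas remain in sync.
    return replication_factor - min_insync_replicas


# Settings from the cluster configuration above: replication factor 3, min ISR 2.
print(tolerable_broker_failures(replication_factor=3, min_insync_replicas=2))
```

With one broker per AZ, this suggests the cluster can keep accepting writes through the loss of a single broker or AZ, but not two.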

Network:

  • VPC: Uses InfrastructureStack VPC

  • Subnets: Private subnets only

  • Security group: Allows traffic from IoT Rules and Flink

Security:

  • Encryption in transit: TLS

  • Encryption at rest: AWS-managed keys

  • Authentication: IAM (for Flink) and unauthenticated (for IoT Rules)

Kafka topics

Seven topics carry the different data streams.

cms-telemetry topic

Raw telemetry messages from vehicles (MQTT Direct mode).

Configuration:

  • Partitions: 6

  • Replication factor: 3

  • Retention: 7 days

  • Compression: gzip

  • Message format: JSON
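The retention and compression settings above could be expressed as standard Kafka topic-level configs (retention.ms and compression.type). The following is a minimal sketch of that translation; the helper name is hypothetical, and whether this guidance sets compression on the topic or on the producer is not stated here.

```python
from datetime import timedelta
from typing import Dict, Optional


def topic_config(retention_days: int, compression: Optional[str] = None) -> Dict[str, str]:
    """Build Kafka topic-level config entries from the documented settings."""
    retention_ms = int(timedelta(days=retention_days).total_seconds() * 1000)
    config = {"retention.ms": str(retention_ms)}
    if compression is not None:
        config["compression.type"] = compression
    return config


# cms-telemetry: 7 days retention, gzip compression.
print(topic_config(retention_days=7, compression="gzip"))
```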

Message schema:

{
  "vin": "1HGBH41JXMN109186",
  "timestamp": "2024-10-13T12:00:00Z",
  "location": {"lat": 37.7749, "lon": -122.4194},
  "speed": 45.5,
  "ignition": "on",
  "engineTemp": 195.5,
  "tirePressure": {"fl": 32, "fr": 32, "rl": 30, "rr": 31},
  "batteryVoltage": 12.6,
  "fuelLevel": 75.5,
  "odometer": 45678.9
}
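A consumer of this topic might parse and sanity-check each message along the following lines. This is a sketch only: the set of required fields is an assumption (the guidance does not say which fields are mandatory), and parse_telemetry is a hypothetical helper.

```python
import json
from datetime import datetime

# Assumption: these fields are treated as mandatory for illustration only.
REQUIRED_FIELDS = ("vin", "timestamp", "location", "speed")


def parse_telemetry(raw: str) -> dict:
    """Parse a cms-telemetry JSON message and convert its timestamp to a datetime."""
    message = json.loads(raw)
    missing = [field for field in REQUIRED_FIELDS if field not in message]
    if missing:
        raise ValueError(f"telemetry message is missing fields: {missing}")
    # Replace the trailing "Z" so fromisoformat also works on Python versions before 3.11.
    message["timestamp"] = datetime.fromisoformat(message["timestamp"].replace("Z", "+00:00"))
    return message


sample = (
    '{"vin": "1HGBH41JXMN109186", "timestamp": "2024-10-13T12:00:00Z", '
    '"location": {"lat": 37.7749, "lon": -122.4194}, "speed": 45.5}'
)
parsed = parse_telemetry(sample)
print(parsed["vin"], parsed["timestamp"].isoformat())
```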

cms-trips topic

Trip events (start/end) generated by Flink.

Configuration:

  • Partitions: 3

  • Replication factor: 3

  • Retention: 30 days

Message schema:

{
  "tripId": "550e8400-e29b-41d4-a716-446655440000",
  "vin": "1HGBH41JXMN109186",
  "event": "trip_start",
  "timestamp": "2024-10-13T12:00:00Z",
  "location": {"lat": 37.7749, "lon": -122.4194}
}
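Because start and end events share a tripId, a consumer could pair them to derive trip durations. The sketch below assumes the end event uses the value "trip_end" (only "trip_start" appears in the schema above) and that events for a trip arrive in order; trip_durations is a hypothetical helper.

```python
from datetime import datetime
from typing import Dict, Iterable


def _parse_ts(value: str) -> datetime:
    # Replace the trailing "Z" so fromisoformat also works on Python versions before 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def trip_durations(events: Iterable[dict]) -> Dict[str, float]:
    """Return the duration in seconds of each trip that has both a start and an end event."""
    starts: Dict[str, datetime] = {}
    durations: Dict[str, float] = {}
    for event in events:
        trip_id = event["tripId"]
        if event["event"] == "trip_start":
            starts[trip_id] = _parse_ts(event["timestamp"])
        elif event["event"] == "trip_end" and trip_id in starts:  # "trip_end" is assumed
            elapsed = _parse_ts(event["timestamp"]) - starts.pop(trip_id)
            durations[trip_id] = elapsed.total_seconds()
    return durations


events = [
    {"tripId": "t-1", "event": "trip_start", "timestamp": "2024-10-13T12:00:00Z"},
    {"tripId": "t-1", "event": "trip_end", "timestamp": "2024-10-13T12:30:00Z"},
]
print(trip_durations(events))
```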

cms-alerts topic

Maintenance and safety alerts generated by Flink.

Configuration:

  • Partitions: 3

  • Replication factor: 3

  • Retention: 90 days

Message schema:

{
  "alertId": "660e8400-e29b-41d4-a716-446655440000",
  "vin": "1HGBH41JXMN109186",
  "type": "maintenance",
  "severity": "high",
  "message": "Low tire pressure detected",
  "timestamp": "2024-10-13T12:00:00Z",
  "metadata": {"tire": "rear_right", "pressure": 28}
}
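To illustrate how a telemetry message could turn into an alert of this shape, here is a minimal sketch. The threshold, the mapping from tire keys to names, and the fixed severity are all assumptions made for the example; the actual Flink alert rules are not described in this section.

```python
import uuid
from typing import Dict, List

# Hypothetical values: the real threshold and naming rules are not given in this guidance.
LOW_PRESSURE_THRESHOLD = 30
TIRE_NAMES: Dict[str, str] = {
    "fl": "front_left",
    "fr": "front_right",
    "rl": "rear_left",
    "rr": "rear_right",
}


def tire_pressure_alerts(telemetry: dict) -> List[dict]:
    """Build one alert per tire whose pressure is below the assumed threshold."""
    alerts = []
    for key, pressure in telemetry.get("tirePressure", {}).items():
        if pressure < LOW_PRESSURE_THRESHOLD:
            alerts.append({
                "alertId": str(uuid.uuid4()),
                "vin": telemetry["vin"],
                "type": "maintenance",
                "severity": "high",
                "message": "Low tire pressure detected",
                "timestamp": telemetry["timestamp"],
                "metadata": {"tire": TIRE_NAMES.get(key, key), "pressure": pressure},
            })
    return alerts


telemetry = {
    "vin": "1HGBH41JXMN109186",
    "timestamp": "2024-10-13T12:00:00Z",
    "tirePressure": {"fl": 32, "fr": 32, "rl": 30, "rr": 28},
}
alerts = tire_pressure_alerts(telemetry)
print(alerts[0]["metadata"])
```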

fw-telemetry-raw topic

Protobuf-encoded telemetry from FleetWise Edge agents.

Configuration:

  • Partitions: 6

  • Replication factor: 3

  • Retention: 7 days

  • Message format: Base64-encoded protobuf (from IoT Rule)

The IoT Rule fw_{stage}_iot_msk_rule routes messages from cms/fleetwise/vehicles/+/signals to this topic. The FWTelemetryProcessor Flink application consumes from this topic, decodes the protobuf payload, maps CAN signals to the standard format using the decoder manifest, and outputs to cms-telemetry-preprocessed.
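Since the IoT Rule delivers the payload Base64-encoded, a consumer has to undo that encoding before protobuf parsing. The sketch below covers only the Base64 step; decoding the resulting bytes requires the FleetWise protobuf definitions, which are not shown here, and decode_record_value is a hypothetical helper name.

```python
import base64


def decode_record_value(value: bytes) -> bytes:
    """Return the raw protobuf bytes carried in a Base64-encoded Kafka record value."""
    # validate=True rejects characters outside the Base64 alphabet instead of skipping them.
    return base64.b64decode(value, validate=True)


# Round trip with arbitrary bytes standing in for a protobuf payload.
payload = b"\x08\x96\x01"
record_value = base64.b64encode(payload)
print(decode_record_value(record_value) == payload)
```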

fw-checkin topic

Checkin messages from FleetWise Edge agents requesting campaign configuration.

Configuration:

  • Partitions: 3

  • Replication factor: 3

  • Retention: 7 days

  • Message format: Base64-encoded protobuf (from IoT Rule)

The IoT Rule fw_{stage}_checkin_rule routes messages from cms/fleetwise/vehicles/+/checkins to this topic. The CampaignSyncProcessor Flink application consumes checkins, resolves active campaigns from DynamoDB, and pushes decoder manifests and collection schemes back to the agent through IoT Core MQTT.

cms-telemetry-preprocessed topic

Decoded and mapped FleetWise telemetry in standard JSON format.

Configuration:

  • Partitions: 6

  • Replication factor: 3

  • Retention: 7 days

  • Message format: JSON (same schema as cms-telemetry)

This topic receives output from the FWTelemetryProcessor after protobuf decoding and signal mapping. Downstream processors (TripProcessor, SafetyProcessor, MaintenanceProcessor) consume from this topic in addition to cms-telemetry, enabling the same processing pipeline for both MQTT Direct and FleetWise Edge telemetry.

cms-telemetry-oem topic

Raw telemetry from third-party OEM APIs.

Configuration:

  • Partitions: 6

  • Replication factor: 3

  • Retention: 7 days

  • Message format: JSON (OEM-specific format with oem_source field)

The OEMTelemetryProcessor consumes from this topic, applies the appropriate transform manifest from S3 based on the oem_source field, and outputs standard CMS-format JSON to cms-telemetry-raw. This enables integration with any OEM without modifying the core processing pipeline.
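The manifest-driven transform can be pictured as a per-OEM field mapping selected by oem_source. The sketch below is illustrative only: the manifest structure (a flat rename map), the OEM name, and the OEM field names are invented for the example, and the manifests are held in memory rather than loaded from S3.

```python
from typing import Dict

# Hypothetical manifests keyed by oem_source; the real manifest format is not shown here.
MANIFESTS: Dict[str, Dict[str, str]] = {
    "example_oem": {"vehicle_id": "vin", "ts": "timestamp", "spd": "speed"},
}


def to_standard(message: dict, manifests: Dict[str, Dict[str, str]]) -> dict:
    """Rename OEM-specific fields to standard field names using the matching manifest."""
    source = message["oem_source"]
    mapping = manifests.get(source)
    if mapping is None:
        raise KeyError(f"no transform manifest for oem_source {source!r}")
    # Copy only the fields named in the manifest; unmapped fields are dropped.
    return {standard: message[oem] for oem, standard in mapping.items() if oem in message}


oem_message = {
    "oem_source": "example_oem",
    "vehicle_id": "1HGBH41JXMN109186",
    "ts": "2024-10-13T12:00:00Z",
    "spd": 45.5,
}
print(to_standard(oem_message, MANIFESTS))
```

Keeping the mapping in data rather than code is what would let a new OEM be added by publishing a manifest instead of changing the processor.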