

# Message streaming
<a name="msk-stack"></a>

The message streaming layer creates an Amazon MSK cluster for real-time telemetry streaming.

## MSK cluster configuration
<a name="msk-cluster-configuration"></a>

 **Cluster settings:** 
+ Kafka version: 3.5.1
+ Brokers: 3 (one per AZ)
+ Broker type: kafka.m5.large (2 vCPU, 8 GB RAM)
+ Storage per broker: 100 GB EBS (gp3)
+ Replication factor: 3
+ Min in-sync replicas: 2
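
The replication factor and minimum in-sync replicas together determine how many broker losses a partition can absorb. The following minimal sketch works through that arithmetic. It assumes producers write with `acks=all` and that unclean leader election is disabled; this section states neither.

```python
def partition_fault_tolerance(replication_factor: int, min_insync_replicas: int) -> dict:
    """Replica losses a partition can absorb, assuming producers use acks=all."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    return {
        # acks=all writes are rejected once the in-sync set drops below min.insync.replicas.
        "writable_after_losses": replication_factor - min_insync_replicas,
        # Each acknowledged write is stored on at least min.insync.replicas replicas,
        # so it is guaranteed to survive one fewer loss than that.
        "acked_data_safe_after_losses": min_insync_replicas - 1,
    }


# Cluster settings above: replication factor 3, min in-sync replicas 2.
print(partition_fault_tolerance(replication_factor=3, min_insync_replicas=2))
# {'writable_after_losses': 1, 'acked_data_safe_after_losses': 1}
```

Each broker runs in its own AZ, so in this layout one lost broker is equivalent to one AZ outage.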

 **Network:** 
+ VPC: Uses InfrastructureStack VPC
+ Subnets: Private subnets only
+ Security group: Allows traffic from IoT Rules and Flink

 **Security:** 
+ Encryption in transit: TLS
+ Encryption at rest: AWS-managed keys
+ Authentication: IAM (for Flink) and unauthenticated (for IoT Rules)
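
The two authentication modes correspond to different MSK listeners and different client properties. The following sketch builds both property sets. The IAM property values are those used with the open-source `aws-msk-iam-auth` library for Java clients. Ports 9094 (TLS) and 9098 (IAM) are MSK's standard ports for clients inside the VPC. The broker hostnames are hypothetical; real ones come from the cluster's bootstrap broker string.

```python
from typing import Dict, List

# Amazon MSK listener ports for clients inside the VPC.
MSK_TLS_PORT = 9094  # TLS encryption, no client authentication
MSK_IAM_PORT = 9098  # TLS encryption plus IAM access control


def kafka_client_properties(broker_hosts: List[str], use_iam: bool) -> Dict[str, str]:
    """Build Kafka client properties for one of the two authentication modes."""
    port = MSK_IAM_PORT if use_iam else MSK_TLS_PORT
    props = {"bootstrap.servers": ",".join(f"{host}:{port}" for host in broker_hosts)}
    if use_iam:
        # Property values used with the aws-msk-iam-auth library for Java clients (Flink).
        props.update({
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "AWS_MSK_IAM",
            "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
            "sasl.client.callback.handler.class":
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
        })
    else:
        # Unauthenticated access still encrypts traffic in transit.
        props["security.protocol"] = "SSL"
    return props
```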

## Kafka topics
<a name="kafka-topics"></a>

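The following topics handle the different data streams: MQTT Direct telemetry, FleetWise Edge telemetry and checkins, OEM telemetry, and the trip and alert events that Flink derives from them.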

### cms-telemetry topic
<a name="telemetry-topic"></a>

Raw telemetry messages from vehicles (MQTT Direct mode).

 **Configuration:** 
+ Partitions: 6
+ Replication factor: 3
+ Retention: 7 days
+ Compression: gzip
+ Message format: JSON
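
The retention and compression settings map onto standard Kafka topic configuration keys. The following sketch translates them, assuming "Compression" refers to the topic-level `compression.type` rather than a producer setting. Partition count and replication factor are not topic configs; they are supplied separately when the topic is created.

```python
from typing import Dict, Optional

MS_PER_DAY = 24 * 60 * 60 * 1000


def topic_configs(retention_days: int, compression: Optional[str] = None) -> Dict[str, str]:
    """Translate documented retention/compression settings into Kafka topic configs."""
    configs = {"retention.ms": str(retention_days * MS_PER_DAY)}
    if compression is not None:
        configs["compression.type"] = compression
    return configs


print(topic_configs(7, "gzip"))  # cms-telemetry
# {'retention.ms': '604800000', 'compression.type': 'gzip'}
```

The same helper covers the other topics. For example, `topic_configs(30)` yields a `retention.ms` of `2592000000` for cms-trips, and `topic_configs(90)` yields `7776000000` for cms-alerts.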

 **Message schema:** 

```
{
  "vin": "1HGBH41JXMN109186",
  "timestamp": "2024-10-13T12:00:00Z",
  "location": {"lat": 37.7749, "lon": -122.4194},
  "speed": 45.5,
  "ignition": "on",
  "engineTemp": 195.5,
  "tirePressure": {"fl": 32, "fr": 32, "rl": 30, "rr": 31},
  "batteryVoltage": 12.6,
  "fuelLevel": 75.5,
  "odometer": 45678.9
}
```
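
A consumer can turn this schema into a typed record before processing it. The following minimal sketch parses and lightly validates one message. The `Telemetry` class and `parse_telemetry` function are hypothetical names, and the only validation shown is the 17-character VIN length. Units are not given in the schema, so none are assumed.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict


@dataclass
class Telemetry:
    vin: str
    timestamp: datetime
    lat: float
    lon: float
    speed: float
    ignition: str
    engine_temp: float
    tire_pressure: Dict[str, float]
    battery_voltage: float
    fuel_level: float
    odometer: float


def parse_telemetry(raw: str) -> Telemetry:
    """Parse one cms-telemetry message; a missing field raises KeyError."""
    msg = json.loads(raw)
    if len(msg["vin"]) != 17:  # VINs are 17 characters long
        raise ValueError(f"invalid VIN: {msg['vin']!r}")
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    ts = datetime.fromisoformat(msg["timestamp"].replace("Z", "+00:00"))
    return Telemetry(
        vin=msg["vin"],
        timestamp=ts,
        lat=float(msg["location"]["lat"]),
        lon=float(msg["location"]["lon"]),
        speed=float(msg["speed"]),
        ignition=msg["ignition"],
        engine_temp=float(msg["engineTemp"]),
        tire_pressure={k: float(v) for k, v in msg["tirePressure"].items()},
        battery_voltage=float(msg["batteryVoltage"]),
        fuel_level=float(msg["fuelLevel"]),
        odometer=float(msg["odometer"]),
    )
```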

### cms-trips topic
<a name="trips-topic"></a>

Trip events (start/end) generated by Flink.

 **Configuration:** 
+ Partitions: 3
+ Replication factor: 3
+ Retention: 30 days

 **Message schema:** 

```
{
  "tripId": "550e8400-e29b-41d4-a716-446655440000",
  "vin": "1HGBH41JXMN109186",
  "event": "trip_start",
  "timestamp": "2024-10-13T12:00:00Z",
  "location": {"lat": 37.7749, "lon": -122.4194}
}
```
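
A start event and its matching end event must share the same `tripId`. The following sketch builds trip events in this schema. `make_trip_event` is a hypothetical helper, and `trip_end` is an assumed name for the end event (only `trip_start` appears above). Pass timezone-aware datetimes so the UTC conversion is unambiguous.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

# "trip_start" appears in the schema above; "trip_end" is an assumed name.
TRIP_EVENTS = {"trip_start", "trip_end"}


def make_trip_event(vin: str, event: str, lat: float, lon: float,
                    trip_id: Optional[str] = None,
                    when: Optional[datetime] = None) -> str:
    if event not in TRIP_EVENTS:
        raise ValueError(f"unknown trip event: {event!r}")
    # Normalize to UTC so the timestamp can carry a "Z" suffix.
    when = (when or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return json.dumps({
        # A new trip gets a fresh UUID; the end event reuses the start event's ID.
        "tripId": trip_id or str(uuid.uuid4()),
        "vin": vin,
        "event": event,
        "timestamp": when.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "location": {"lat": lat, "lon": lon},
    })


start = json.loads(make_trip_event("1HGBH41JXMN109186", "trip_start", 37.7749, -122.4194,
                                   when=datetime(2024, 10, 13, 12, 0, tzinfo=timezone.utc)))
end = json.loads(make_trip_event("1HGBH41JXMN109186", "trip_end", 37.8044, -122.2712,
                                 trip_id=start["tripId"]))
```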

### cms-alerts topic
<a name="alerts-topic"></a>

Maintenance and safety alerts generated by Flink.

 **Configuration:** 
+ Partitions: 3
+ Replication factor: 3
+ Retention: 90 days

 **Message schema:** 

```
{
  "alertId": "660e8400-e29b-41d4-a716-446655440000",
  "vin": "1HGBH41JXMN109186",
  "type": "maintenance",
  "severity": "high",
  "message": "Low tire pressure detected",
  "timestamp": "2024-10-13T12:00:00Z",
  "metadata": {"tire": "rear_right", "pressure": 28}
}
```
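
An alert like the example above can be derived directly from a cms-telemetry record. The following sketch shows one such rule. The 29 psi threshold, the fixed `high` severity, and the mapping from the telemetry keys `fl`/`fr`/`rl`/`rr` to names such as `rear_right` are all hypothetical. The actual Flink alert rules are not documented in this section.

```python
import uuid
from typing import Dict, List

LOW_TIRE_PSI = 29  # hypothetical threshold
# Assumed mapping from telemetry tire keys to the names used in alert metadata.
TIRE_NAMES = {"fl": "front_left", "fr": "front_right", "rl": "rear_left", "rr": "rear_right"}


def tire_pressure_alerts(telemetry: Dict) -> List[Dict]:
    """Return one cms-alerts message per tire below the threshold."""
    alerts = []
    for key, psi in telemetry["tirePressure"].items():
        if psi < LOW_TIRE_PSI:
            alerts.append({
                "alertId": str(uuid.uuid4()),
                "vin": telemetry["vin"],
                "type": "maintenance",
                "severity": "high",
                "message": "Low tire pressure detected",
                "timestamp": telemetry["timestamp"],
                "metadata": {"tire": TIRE_NAMES[key], "pressure": psi},
            })
    return alerts
```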

### fw-telemetry-raw topic
<a name="fw-telemetry-raw-topic"></a>

Protobuf-encoded telemetry from FleetWise Edge agents.

 **Configuration:** 
+ Partitions: 6
+ Replication factor: 3
+ Retention: 7 days
+ Message format: Base64-encoded protobuf (from IoT Rule)

The IoT Rule `fw_{stage}_iot_msk_rule` routes messages from `cms/fleetwise/vehicles/+/signals` to this topic. The FWTelemetryProcessor Flink application consumes from this topic, decodes the protobuf payload, maps CAN signals to the standard format using the decoder manifest, and outputs to `cms-telemetry-preprocessed`.
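
Before it can decode any protobuf, the processor must remove the Base64 layer that the IoT Rule adds. The following sketch shows only that first step. `unwrap_iot_rule_payload` is a hypothetical name. The protobuf parsing that follows requires message classes generated from the FleetWise Edge `.proto` definitions, which are not shown here.

```python
import base64
import binascii


def unwrap_iot_rule_payload(record_value: bytes) -> bytes:
    """Strip the Base64 layer added by the IoT Rule and return the raw protobuf bytes."""
    try:
        return base64.b64decode(record_value, validate=True)
    except binascii.Error as exc:
        raise ValueError("record value is not valid Base64") from exc


# Next step (not shown): parse the bytes with a generated protobuf class, e.g.
#   msg = SomeGeneratedMessage()   # hypothetical class name
#   msg.ParseFromString(raw_bytes)
```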

### fw-checkin topic
<a name="fw-checkin-topic"></a>

Checkin messages from FleetWise Edge agents requesting campaign configuration.

 **Configuration:** 
+ Partitions: 3
+ Replication factor: 3
+ Retention: 7 days
+ Message format: Base64-encoded protobuf (from IoT Rule)

The IoT Rule `fw_{stage}_checkin_rule` routes messages from `cms/fleetwise/vehicles/+/checkins` to this topic. The CampaignSyncProcessor Flink application consumes checkins, resolves active campaigns from DynamoDB, and pushes decoder manifests and collection schemes back to the agent through IoT Core MQTT.
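
Both FleetWise IoT Rules use the MQTT single-level wildcard `+` to match any vehicle ID. The following sketch models the two routes as a lookup table and applies the same matching semantics. The function names are hypothetical. The multi-level wildcard `#` is not handled because neither rule uses it.

```python
from typing import Optional

# MQTT topic filter -> Kafka topic, per the two FleetWise IoT Rules.
ROUTES = {
    "cms/fleetwise/vehicles/+/signals": "fw-telemetry-raw",  # fw_{stage}_iot_msk_rule
    "cms/fleetwise/vehicles/+/checkins": "fw-checkin",        # fw_{stage}_checkin_rule
}


def matches(topic_filter: str, topic: str) -> bool:
    """True if topic matches filter, where '+' matches exactly one topic level."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    if len(filter_levels) != len(topic_levels):
        return False
    return all(f == "+" or f == t for f, t in zip(filter_levels, topic_levels))


def route(mqtt_topic: str) -> Optional[str]:
    for topic_filter, kafka_topic in ROUTES.items():
        if matches(topic_filter, mqtt_topic):
            return kafka_topic
    return None  # no rule matches; the message is not forwarded to MSK
```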

### cms-telemetry-preprocessed topic
<a name="cms-telemetry-preprocessed-topic"></a>

Decoded and mapped FleetWise telemetry in the standard CMS JSON format.

 **Configuration:** 
+ Partitions: 6
+ Replication factor: 3
+ Retention: 7 days
+ Message format: JSON (same schema as cms-telemetry)

This topic receives output from the FWTelemetryProcessor after protobuf decoding and signal mapping. Downstream processors (TripProcessor, SafetyProcessor, MaintenanceProcessor) consume from this topic in addition to `cms-telemetry`, enabling the same processing pipeline for both MQTT Direct and FleetWise Edge telemetry.
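
Because both topics carry the same schema, a downstream processor can handle records without knowing which topic they came from. The following in-memory sketch illustrates this with an ignition-based trip rule. Both the rule and the `trip_end` event name are hypothetical stand-ins for the TripProcessor's undocumented logic. A real Flink job would keep `last_ignition` in keyed state rather than a dictionary.

```python
import json
from typing import Iterable, List, Tuple

# Both topics share one JSON schema, so a processor can subscribe to both.
TELEMETRY_TOPICS = {"cms-telemetry", "cms-telemetry-preprocessed"}


def ignition_transitions(records: Iterable[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Emit (vin, event) pairs from (topic, json_value) records, whichever topic they came from."""
    last_ignition = {}
    events = []
    for topic, value in records:
        if topic not in TELEMETRY_TOPICS:
            continue
        msg = json.loads(value)
        vin, ignition = msg["vin"], msg["ignition"]
        previous = last_ignition.get(vin)
        if previous == "off" and ignition == "on":
            events.append((vin, "trip_start"))
        elif previous == "on" and ignition == "off":
            events.append((vin, "trip_end"))
        last_ignition[vin] = ignition
    return events
```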

### cms-telemetry-oem topic
<a name="cms-telemetry-oem-topic"></a>

Raw telemetry from third-party OEM APIs.

 **Configuration:** 
+ Partitions: 6
+ Replication factor: 3
+ Retention: 7 days
+ Message format: JSON (OEM-specific format with `oem_source` field)

The OEMTelemetryProcessor consumes from this topic, applies the appropriate transform manifest from S3 based on the `oem_source` field, and outputs standard CMS-format JSON to `cms-telemetry-raw`. As a result, you can integrate an additional OEM by adding a transform manifest rather than modifying the core processing pipeline.
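
A transform manifest can be modeled as a mapping from CMS field paths to paths in the OEM payload. The following sketch uses that model. The manifest format, the `example_oem` source, and its field names are hypothetical, since this section does not document the real manifest format. Loading manifests from S3 is also omitted.

```python
import json
from typing import Any, Dict

# Hypothetical manifests keyed by oem_source: CMS field path -> OEM payload path.
MANIFESTS: Dict[str, Dict[str, str]] = {
    "example_oem": {
        "vin": "vehicle.id",
        "timestamp": "reportedAt",
        "location.lat": "position.latitude",
        "location.lon": "position.longitude",
        "speed": "motion.speed",
    }
}


def get_path(obj: Dict[str, Any], dotted: str) -> Any:
    for part in dotted.split("."):
        obj = obj[part]
    return obj


def set_path(obj: Dict[str, Any], dotted: str, value: Any) -> None:
    parts = dotted.split(".")
    for part in parts[:-1]:
        obj = obj.setdefault(part, {})  # create nested objects such as "location"
    obj[parts[-1]] = value


def transform(raw: str) -> Dict[str, Any]:
    """Convert one OEM message to CMS format; an unknown oem_source raises KeyError."""
    payload = json.loads(raw)
    manifest = MANIFESTS[payload["oem_source"]]
    out: Dict[str, Any] = {}
    for cms_field, oem_path in manifest.items():
        set_path(out, cms_field, get_path(payload, oem_path))
    return out
```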