Cooperative Awareness (CA) Basic Service
The CA Basic Service is the heart of vehicle awareness in V2X — it broadcasts Cooperative Awareness Messages (CAMs) that tell nearby vehicles “I’m here, this is my position, speed, and heading.”
Note
The CA Basic Service implements ETSI TS 103 900 V2.1.1 (2023-11). CAMs are the most fundamental V2X message type, enabling vehicles to be aware of each other’s presence.
What CAMs Contain
Every CAM broadcasts essential information about your vehicle:
Field |
Description |
|---|---|
Position |
GPS coordinates (latitude, longitude, altitude) |
Speed |
Current velocity in m/s |
Heading |
Direction of travel (degrees from north) |
Station ID |
Unique identifier for this vehicle |
Station Type |
Vehicle category (car, truck, motorcycle, etc.) |
Vehicle Size |
Length and width dimensions |
Architecture
The CA Basic Service sits in the Facilities Layer and connects to BTP for transport:
flowchart TB
subgraph APP["Application Layer"]
EX["Application"]
end
subgraph "Facilities Layer"
LDM[(Local Dynamic Map)]
subgraph CA["CA Basic Service"]
direction LR
RM[Reception<br/>Management] ~~~ TM[Transmission<br/>Management]
end
end
subgraph "Location"
LOC[Location Service]
end
subgraph "Transport"
BTP[BTP Router<br/>Port 2001]
end
APP --- LDM
LDM ---|"Store Received"| CA
LOC -->|"Position Updates"| CA
CA <-->|"Facilities-BTP SAP"| BTP
style CA fill:#fff3e0,stroke:#f57c00
style LDM fill:#e8f5e9
How it works:
Sending: Location Service updates trigger CAM generation → sent via BTP port 2001
Receiving: Incoming CAMs on port 2001 → processed and stored in LDM
Getting Started
Step 1: Configure Vehicle Data
First, describe your vehicle using VehicleData:
from flexstack.facilities.ca_basic_service.cam_transmission_management import VehicleData
STATION_ID = 12345 # Unique ID for your station
vehicle_data = VehicleData(
station_id=STATION_ID,
station_type=5, # 5 = Passenger car
drive_direction="forward", # "forward", "backward", or "unavailable"
vehicle_length={
"vehicleLengthValue": 42, # Length in 10cm units (42 = 4.2m)
"vehicleLengthConfidenceIndication": "unavailable",
},
vehicle_width=18, # Width in 10cm units (18 = 1.8m)
)
VehicleData Parameters:
Parameter |
Type |
Description |
|---|---|---|
|
|
Unique station identifier (1 to 2^31-1) |
|
|
Vehicle type code (see table below) |
|
|
|
|
|
Length value (0.1m units) and confidence |
|
|
Width in 0.1m units (1-62, where 62 = unavailable) |
Station Types:
Value |
Type |
Description |
|---|---|---|
0 |
Unknown |
Unknown station type |
1 |
Pedestrian |
Person on foot |
2 |
Cyclist |
Bicycle rider |
3 |
Moped |
Moped/scooter |
4 |
Motorcycle |
Motorcycle |
5 |
Passenger Car |
Standard automobile |
6 |
Bus |
Bus/coach |
7 |
Light Truck |
Light commercial vehicle |
8 |
Heavy Truck |
Heavy goods vehicle |
15 |
RSU |
Road Side Unit |
Step 2: Set Up Prerequisites
The CA Basic Service requires:
A BTP Router (for sending/receiving)
A Local Dynamic Map (for storing received CAMs)
A Location Service (for position updates)
from flexstack.btp.router import Router as BTPRouter
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID
from flexstack.facilities.local_dynamic_map.factory import LDMFactory
from flexstack.facilities.local_dynamic_map.ldm_classes import Location, GeometricArea, Circle
from flexstack.utils.static_location_service import ThreadStaticLocationService
# Location Service
location_service = ThreadStaticLocationService(
period=1000,
latitude=41.386931,
longitude=2.112104,
)
# GeoNetworking + BTP
MAC_ADDRESS = b'\x00\x11\x22\x33\x44\x55'
mib = MIB(itsGnLocalGnAddr=GNAddress(m=M.GN_MULTICAST, st=ST.PASSENGER_CAR, mid=MID(MAC_ADDRESS)))
gn_router = GNRouter(mib=mib, sign_service=None)
btp_router = BTPRouter(gn_router)
gn_router.register_indication_callback(btp_router.btp_data_indication)
location_service.add_callback(gn_router.refresh_ego_position_vector)
# Local Dynamic Map
ldm_location = Location.initializer(latitude=413869310, longitude=21121040)
ldm_factory = LDMFactory()
ldm = ldm_factory.create_ldm(
ldm_location,
ldm_maintenance_type="Reactive",
ldm_service_type="Reactive",
ldm_database_type="Dictionary",
)
location_service.add_callback(ldm_location.location_service_callback)
Step 3: Create CA Basic Service
Now create the service and connect it to the location service:
from flexstack.facilities.ca_basic_service.ca_basic_service import (
CooperativeAwarenessBasicService,
)
# Create the CA Basic Service
ca_basic_service = CooperativeAwarenessBasicService(
btp_router=btp_router,
vehicle_data=vehicle_data,
ldm=ldm,
)
# Connect location updates to trigger CAM transmission
location_service.add_callback(
ca_basic_service.cam_transmission_management.location_service_callback
)
That’s it! The service will now:
✅ Automatically send CAMs whenever the location service provides an update
✅ Automatically receive CAMs and store them in the LDM
Receiving CAMs
To be notified when CAMs arrive, subscribe to the LDM:
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM
from flexstack.facilities.local_dynamic_map.ldm_classes import (
AccessPermission,
RegisterDataConsumerReq,
RegisterDataConsumerResp,
SubscribeDataobjectsReq,
SubscribeDataObjectsResp,
RequestDataObjectsResp,
Filter,
FilterStatement,
ComparisonOperators,
TimestampIts,
OrderTupleValue,
OrderingDirection,
SubscribeDataobjectsResult,
)
ldm_area = GeometricArea(circle=Circle(radius=5000), rectangle=None, ellipse=None)
# Register as a CAM consumer
ldm.if_ldm_4.register_data_consumer(
RegisterDataConsumerReq(
application_id=CAM,
access_permisions=(AccessPermission.CAM,),
area_of_interest=ldm_area,
)
)
# Callback when CAMs are received
def on_cam_received(data: RequestDataObjectsResp) -> None:
cam = data.data_objects[0]["dataObject"]
station_id = cam["header"]["stationId"]
print(f"🚗 Received CAM from station {station_id}")
# Subscribe to CAM updates (filtering out our own)
ldm.if_ldm_4.subscribe_data_consumer(
SubscribeDataobjectsReq(
application_id=CAM,
data_object_type=(CAM,),
priority=1,
filter=Filter(
filter_statement_1=FilterStatement(
"header.stationId",
ComparisonOperators.NOT_EQUAL,
STATION_ID, # Don't notify about our own CAMs
)
),
notify_time=TimestampIts(0),
multiplicity=1,
order=(OrderTupleValue(
attribute="cam.generationDeltaTime",
ordering_direction=OrderingDirection.ASCENDING,
),),
),
on_cam_received,
)
CAM Message Structure
A CAM contains three main containers:
flowchart LR
subgraph CAM["Cooperative Awareness Message (CAM)"]
direction LR
header["ITS PDU Header"]
basic["Basic Container"]
subgraph HF["High Frequency Container"]
direction TB
veh["Vehicle HF container"]
other["Other Containers"]
veh ~~~|"Or"| other
end
subgraph LF["Low Frequency Container (Optional)"]
direction TB
vehlf["Vehicle LF container"]
end
subgraph SP["Special Vehicle Container (Optional)"]
direction TB
publictransport["Public Transport Container "]
special["Special Transport Container"]
publictransport ~~~|"Or"| special
end
header ~~~ basic
basic ~~~ HF
HF ~~~ LF
LF ~~~ SP
end
Container |
Contents |
|---|---|
ITS PDU Header |
Protocol version, message ID, station ID |
Basic Container |
Station type, reference position |
High Frequency Container |
Heading, speed, drive direction, vehicle length, width, acceleration |
Low Frequency Container (Optional) |
Vehicle role, exterior lights, path history |
Special Vehicle Container (Optional) |
Public transport or special transport details |
Complete Example
Here’s a complete script that sends and receives CAMs:
1#!/usr/bin/env python3
2"""
3CA Basic Service Example
4
5Broadcasts CAMs and listens for nearby vehicles.
6Run with: sudo python cam_example.py
7"""
8
9import logging
10import random
11import time
12
13from flexstack.linklayer.raw_link_layer import RawLinkLayer
14from flexstack.geonet.router import Router as GNRouter
15from flexstack.geonet.mib import MIB
16from flexstack.geonet.gn_address import GNAddress, M, ST, MID
17from flexstack.btp.router import Router as BTPRouter
18from flexstack.utils.static_location_service import ThreadStaticLocationService
19from flexstack.facilities.local_dynamic_map.factory import LDMFactory
20from flexstack.facilities.local_dynamic_map.ldm_constants import CAM
21from flexstack.facilities.local_dynamic_map.ldm_classes import (
22 AccessPermission, Circle, ComparisonOperators, Filter, FilterStatement,
23 GeometricArea, Location, OrderTupleValue, OrderingDirection,
24 RegisterDataConsumerReq, RegisterDataConsumerResp, RequestDataObjectsResp,
25 SubscribeDataobjectsReq, SubscribeDataObjectsResp, SubscribeDataobjectsResult,
26 TimestampIts,
27)
28from flexstack.facilities.ca_basic_service.ca_basic_service import (
29 CooperativeAwarenessBasicService,
30)
31from flexstack.facilities.ca_basic_service.cam_transmission_management import VehicleData
32
33logging.basicConfig(level=logging.INFO)
34
35# Configuration
36POSITION = [41.386931, 2.112104] # Barcelona
37MAC_ADDRESS = bytes([random.randint(0, 255) for _ in range(6)])
38STATION_ID = random.randint(1, 2147483647)
39
40
41def on_cam_received(data: RequestDataObjectsResp) -> None:
42 """Called when a CAM is received from another vehicle."""
43 station_id = data.data_objects[0]["dataObject"]["header"]["stationId"]
44 print(f"🚗 Received CAM from station: {station_id}")
45
46
47def main():
48 # Location Service
49 location_service = ThreadStaticLocationService(
50 period=1000, latitude=POSITION[0], longitude=POSITION[1]
51 )
52
53 # GeoNetworking
54 mib = MIB(itsGnLocalGnAddr=GNAddress(m=M.GN_MULTICAST, st=ST.CYCLIST, mid=MID(MAC_ADDRESS)))
55 gn_router = GNRouter(mib=mib, sign_service=None)
56 location_service.add_callback(gn_router.refresh_ego_position_vector)
57
58 # BTP
59 btp_router = BTPRouter(gn_router)
60 gn_router.register_indication_callback(btp_router.btp_data_indication)
61
62 # Local Dynamic Map
63 ldm_location = Location.initializer(
64 latitude=int(POSITION[0] * 10**7),
65 longitude=int(POSITION[1] * 10**7),
66 )
67 ldm_area = GeometricArea(circle=Circle(radius=5000), rectangle=None, ellipse=None)
68 ldm = LDMFactory().create_ldm(
69 ldm_location, ldm_maintenance_type="Reactive",
70 ldm_service_type="Reactive", ldm_database_type="Dictionary",
71 )
72 location_service.add_callback(ldm_location.location_service_callback)
73
74 # Register with LDM
75 ldm.if_ldm_4.register_data_consumer(RegisterDataConsumerReq(
76 application_id=CAM, access_permisions=(AccessPermission.CAM,), area_of_interest=ldm_area,
77 ))
78
79 # Subscribe to CAMs
80 ldm.if_ldm_4.subscribe_data_consumer(
81 SubscribeDataobjectsReq(
82 application_id=CAM, data_object_type=(CAM,), priority=1,
83 filter=Filter(filter_statement_1=FilterStatement(
84 "header.stationId", ComparisonOperators.NOT_EQUAL, STATION_ID,
85 )),
86 notify_time=TimestampIts(0), multiplicity=1,
87 order=(OrderTupleValue(
88 attribute="cam.generationDeltaTime",
89 ordering_direction=OrderingDirection.ASCENDING,
90 ),),
91 ),
92 on_cam_received,
93 )
94
95 # CA Basic Service
96 vehicle_data = VehicleData(
97 station_id=STATION_ID, station_type=5, drive_direction="forward",
98 vehicle_length={"vehicleLengthValue": 1023, "vehicleLengthConfidenceIndication": "unavailable"},
99 vehicle_width=62,
100 )
101 ca_service = CooperativeAwarenessBasicService(
102 btp_router=btp_router, vehicle_data=vehicle_data, ldm=ldm,
103 )
104 location_service.add_callback(ca_service.cam_transmission_management.location_service_callback)
105
106 # Link Layer
107 btp_router.freeze_callbacks()
108 link_layer = RawLinkLayer("lo", MAC_ADDRESS, receive_callback=gn_router.gn_data_indicate)
109 gn_router.link_layer = link_layer
110
111 print("✅ CA Basic Service running!")
112 print("📡 Broadcasting CAMs...")
113 print("Press Ctrl+C to exit\n")
114
115 try:
116 while True:
117 time.sleep(1)
118 except KeyboardInterrupt:
119 print("\n👋 Shutting down...")
120
121 location_service.stop_event.set()
122 location_service.location_service_thread.join()
123 link_layer.sock.close()
124
125
126if __name__ == "__main__":
127 main()
CAM Generation Rules
According to ETSI standards, CAMs are generated when:
Time-based: At least every 1 second (T_GenCamMax)
Position change: When position changes by more than 4 meters
Speed change: When speed changes by more than 0.5 m/s
Heading change: When heading changes by more than 4 degrees
The minimum interval between CAMs is 100ms (T_GenCamMin).
See Also
Getting Started — Complete tutorial with CA Basic Service
Local Dynamic Map — Where received CAMs are stored
Decentralized Environmental Notification (DEN) Service — Decentralized Environmental Notifications (hazard warnings)
Basic Transport Protocol (BTP) — Transport layer (BTP port 2001)