Local Dynamic Map
The Local Dynamic Map (LDM) is FlexStack’s intelligent data storage system. It collects, organizes, and serves V2X messages — giving your applications a real-time view of the surrounding traffic environment.
Note
The LDM implements ETSI EN 302 895 V1.1.1. Think of it as a database that automatically manages message validity based on time and location.
What the LDM Does
flowchart LR
subgraph "Data Providers"
CAM[CA Basic Service<br/>CAMs]
DEN[DEN Service<br/>DENMs]
VRU[VRU Service<br/>VAMs]
end
subgraph "LDM"
DB[(LDM Database)]
MT[Maintenance<br/>Auto-cleanup]
end
subgraph "Data Consumers"
APP1[Collision Warning App]
APP2[Traffic Analysis App]
APP3[Navigation App]
end
CAM -->|"Publish"| DB
DEN -->|"Publish"| DB
VRU -->|"Publish"| DB
DB -->|"Query"| APP1
DB -->|"Subscribe"| APP2
DB -->|"Query"| APP3
MT -.->|"Remove expired"| DB
style DB fill:#e3f2fd,stroke:#1565c0
Key Features:
| Feature | Description |
|---|---|
| 📥 Data Collection | Receives messages from CAM, DENM, VAM services |
| 🗄️ Organized Storage | Stores messages with timestamps and locations |
| 🔍 Query Interface | Find specific messages by station ID, type, or location |
| 📡 Subscriptions | Get notified when new messages arrive |
| 🧹 Auto Maintenance | Automatically removes expired or out-of-area messages |
Architecture
The LDM consists of two main components:
flowchart TB
subgraph "LDM Service"
REG[Registration<br/>Providers & Consumers]
PUB[Publish Interface<br/>IF-LDM-3]
QRY[Query Interface<br/>IF-LDM-4]
SUB[Subscribe Interface<br/>IF-LDM-4]
end
subgraph "LDM Maintenance"
TIME[Time Validity<br/>Check]
AREA[Area of Maintenance<br/>Check]
CLEAN[Cleanup<br/>Service]
end
subgraph "Storage"
DB[(LDM Database)]
end
subgraph "Location"
LOC[Location Service]
end
REG --> DB
PUB --> DB
QRY --> DB
SUB --> DB
TIME --> CLEAN
AREA --> CLEAN
CLEAN --> DB
LOC --> AREA
style DB fill:#fff3e0,stroke:#e65100
Components:
| Component | Description |
|---|---|
| LDM Service | Handles registration, publishing, queries, and subscriptions |
| LDM Maintenance | Removes expired messages and those outside the area |
| IF-LDM-3 | Interface for Data Providers (publish data) |
| IF-LDM-4 | Interface for Data Consumers (query/subscribe) |
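The time-validity half of LDM Maintenance can be pictured with a small, self-contained sketch. It does not use FlexStack's classes: `StoredEntry` and `purge_expired` are hypothetical names chosen for illustration, and the real maintenance service may be organized differently.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class StoredEntry:
    """Hypothetical record: the payload plus what is needed to decide expiry."""
    stored_at: float   # seconds on any consistent clock
    validity_s: float  # how long the entry stays valid
    data_object: Any


def purge_expired(store: Dict[int, StoredEntry], now: float) -> int:
    """Delete entries whose validity window has elapsed; return the number removed."""
    expired = [
        key for key, entry in store.items()
        if now - entry.stored_at > entry.validity_s
    ]
    for key in expired:
        del store[key]
    return len(expired)


store = {
    1: StoredEntry(stored_at=100.0, validity_s=5.0, data_object={"messageId": 2}),
    2: StoredEntry(stored_at=104.0, validity_s=5.0, data_object={"messageId": 2}),
}
removed = purge_expired(store, now=106.0)
print(removed, sorted(store))
```

Entry 1 is six seconds old and is dropped, while entry 2 is two seconds old and stays. Collecting the keys first and deleting afterwards avoids mutating the dictionary while iterating over it.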
Supported Data Types
The LDM can store various V2X message types:
| Type | Access Permission | Description |
|---|---|---|
| CAM | AccessPermission.CAM | Cooperative Awareness Messages from vehicles |
| DENM | AccessPermission.DENM | Decentralized Environmental Notifications |
| VAM | AccessPermission.VAM | VRU Awareness Messages |
| MAPEM | | Map topology messages |
| SPATEM | | Signal phase and timing |
Getting Started
Step 1: Create the LDM
Use the LDMFactory to create an LDM instance:
from flexstack.facilities.local_dynamic_map.factory import LDMFactory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    Location,
    GeometricArea,
    Circle,
)

# Define LDM location (your position)
ldm_location = Location.initializer(
    latitude=int(41.386931 * 10**7),  # Scaled integer format
    longitude=int(2.112104 * 10**7),
)

# Define area of interest (5 km radius)
ldm_area = GeometricArea(
    circle=Circle(radius=5000),
    rectangle=None,
    ellipse=None,
)

# Create the LDM
ldm_factory = LDMFactory()
ldm = ldm_factory.create_ldm(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)
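The "scaled integer format" used for the LDM location stores coordinates as integers in units of 1e-7 degree. The helpers below are a minimal sketch of that conversion. `degrees_to_its` and `its_to_degrees` are hypothetical names and not part of FlexStack. The sketch uses `round()` rather than `int()` truncation because binary floating point can leave a product marginally below the intended integer; that is a choice made here, not something this guide prescribes.

```python
def degrees_to_its(value_deg: float) -> int:
    """Convert decimal degrees to integer units of 1e-7 degree."""
    return round(value_deg * 10**7)


def its_to_degrees(value_its: int) -> float:
    """Convert integer units of 1e-7 degree back to decimal degrees."""
    return value_its / 10**7


lat = degrees_to_its(41.386931)
lon = degrees_to_its(2.112104)
print(lat, lon)
```

The resulting integers can be passed wherever the guide passes `int(... * 10**7)`.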
LDM Configuration Options:
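| Parameter | Description |
|---|---|
| ldm_maintenance_type | Maintenance implementation to use; this guide uses "Reactive" |
| ldm_service_type | Service implementation to use; this guide uses "Reactive" |
| ldm_database_type | Storage backend to use; this guide uses "Dictionary" |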
Step 2: Connect Location Updates
Keep the LDM’s area of maintenance updated with your position:
from flexstack.utils.static_location_service import ThreadStaticLocationService

location_service = ThreadStaticLocationService(
    period=1000,
    latitude=41.386931,
    longitude=2.112104,
)

# Update LDM location when position changes
location_service.add_callback(ldm_location.location_service_callback)
Step 3: Connect to Facility Services
The LDM works best when connected to facility services like CA Basic Service:
from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)

# btp_router and vehicle_data must already exist; the Complete Example
# shows how both are created.
# CA Basic Service automatically publishes to LDM
ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,  # Pass the LDM here
)
Data Providers
Data Providers publish messages to the LDM. Facility services (CAM, DENM, VAM) automatically register as providers when you pass the LDM to them.
Register a Custom Provider
For custom applications that need to publish data:
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RegisterDataProviderReq,
    AccessPermission,
    TimeValidity,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# Register as a data provider
ldm.ldm_if_ldm_3.register_data_provider(
    RegisterDataProviderReq(
        application_id=CAM,
        access_permissions=[AccessPermission.CAM],
        time_validity=TimeValidity(5),  # Messages valid for 5 seconds
    )
)
Publish Data
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    AddDataProviderReq,
    TimestampIts,
    Location,
    TimeValidity,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# `cam` is a decoded CAM dictionary with the layout shown under "LDM Data Object".
reference_position = cam["cam"]["camParameters"]["basicContainer"]["referencePosition"]

# Publish a CAM to the LDM
timestamp = TimestampIts.initialize_with_utc_timestamp_seconds()
response = ldm.ldm_if_ldm_3.add_provider_data(
    AddDataProviderReq(
        application_id=CAM,
        timestamp=timestamp,
        location=Location.location_builder_circle(
            latitude=reference_position["latitude"],
            longitude=reference_position["longitude"],
            altitude=reference_position["altitude"]["altitudeValue"],
            radius=0,
        ),
        data_object=cam,
        time_validity=TimeValidity(5),
    )
)
Data Consumers
Data Consumers retrieve messages from the LDM using queries or subscriptions.
Register as a Consumer
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    AccessPermission,
    GeometricArea,
    Circle,
    RequestDataObjectsResp,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# Register to consume CAMs, VAMs, and DENMs
response: RegisterDataConsumerResp = ldm.if_ldm_4.register_data_consumer(
    RegisterDataConsumerReq(
        application_id=CAM,
        access_permisions=(
            AccessPermission.CAM,
            AccessPermission.VAM,
            AccessPermission.DENM,
        ),
        area_of_interest=GeometricArea(
            circle=Circle(radius=5000),  # 5 km radius
            rectangle=None,
            ellipse=None,
        ),
    )
)
if response.result == 2:
    print("Registration failed!")
    exit(1)
Querying the LDM
Query for specific messages using filters:
flowchart LR
APP[Application] -->|"Query Request"| LDM[(LDM)]
LDM -->|"Filtered Results"| APP
subgraph "Query Options"
F[Filter by field]
O[Order results]
T[Filter by type]
end
Basic Query
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RequestDataObjectsReq,
    RequestDataObjectsResp,
    Filter,
    FilterStatement,
    ComparisonOperators,
    OrderTupleValue,
    OrderingDirection,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM, VAM

# Create a query for CAMs and VAMs
request = RequestDataObjectsReq(
    application_id=CAM,
    data_object_type=[CAM, VAM],
    priority=0,
    order=(OrderTupleValue("timeStamp", OrderingDirection.ASCENDING),),
    filter=None,  # No filter = return all
)

# Execute the query
result: RequestDataObjectsResp = ldm.request_data_objects(request)
for obj in result.data_objects:
    print(f"Station: {obj['dataObject']['header']['stationId']}")
Filtered Query
Find messages from a specific station:
# Filter: header.stationId == 12345
# (named station_filter to avoid shadowing Python's built-in filter)
station_filter = Filter(
    FilterStatement(
        "header.stationId",
        ComparisonOperators.EQUAL,
        12345,
    )
)
request = RequestDataObjectsReq(
    application_id=CAM,
    data_object_type=[CAM],
    priority=0,
    order=(OrderTupleValue("timeStamp", OrderingDirection.DESCENDING),),
    filter=station_filter,
)
result = ldm.request_data_objects(request)
Available Comparison Operators:
| Operator | Description |
|---|---|
| ComparisonOperators.EQUAL | Field equals value |
| ComparisonOperators.NOT_EQUAL | Field does not equal value |
| | Field is greater than value |
| | Field is less than value |
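To make the idea of a filter statement concrete, here is a minimal sketch of how a dotted attribute such as "header.stationId" could be resolved against a nested dictionary and compared with a value. It is plain Python and does not reflect how FlexStack evaluates filters internally. `OPERATORS`, `resolve`, and `matches` are hypothetical names, and the string keys "GREATER_THAN" and "LESS_THAN" are labels invented for this sketch.

```python
import operator
from typing import Any, Callable, Dict

# Hypothetical operator table mirroring the four comparisons described above.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "EQUAL": operator.eq,
    "NOT_EQUAL": operator.ne,
    "GREATER_THAN": operator.gt,
    "LESS_THAN": operator.lt,
}


def resolve(path: str, obj: Dict[str, Any]) -> Any:
    """Follow a dotted path such as 'header.stationId' through nested dicts."""
    current: Any = obj
    for key in path.split("."):
        current = current[key]
    return current


def matches(obj: Dict[str, Any], path: str, op: str, value: Any) -> bool:
    """Return True when the field at `path` satisfies the comparison."""
    compare = OPERATORS[op]
    try:
        return compare(resolve(path, obj), value)
    except (KeyError, TypeError):
        return False  # a missing field is treated as "no match"


messages = [
    {"header": {"stationId": 12345}, "cam": {"generationDeltaTime": 5000}},
    {"header": {"stationId": 777}, "cam": {"generationDeltaTime": 4000}},
]
own = [m for m in messages if matches(m, "header.stationId", "EQUAL", 12345)]
others = [m for m in messages if matches(m, "header.stationId", "NOT_EQUAL", 12345)]
print(len(own), len(others))
```

Treating a missing field as a non-match keeps one malformed message from aborting a whole query. A stricter design could raise an error instead.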
Subscribing to Updates
Get notified when new messages arrive:
sequenceDiagram
participant App as Application
participant LDM as LDM
participant CAM as CA Basic Service
App->>LDM: Subscribe (filter, callback)
LDM-->>App: Subscription confirmed
loop Every matching message
CAM->>LDM: New CAM arrives
LDM->>App: Callback with data
end
Subscribe Example
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RequestDataObjectsResp,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    SubscribeDataobjectsResult,
    Filter,
    FilterStatement,
    ComparisonOperators,
    OrderTupleValue,
    OrderingDirection,
    TimestampIts,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM


# Define callback function
def on_new_message(data: RequestDataObjectsResp) -> None:
    for obj in data.data_objects:
        station_id = obj["dataObject"]["header"]["stationId"]
        print(f"📨 New message from station {station_id}")


# Subscribe to CAMs (excluding our own)
MY_STATION_ID = 12345
response: SubscribeDataObjectsResp = ldm.if_ldm_4.subscribe_data_consumer(
    SubscribeDataobjectsReq(
        application_id=CAM,
        data_object_type=(CAM,),
        priority=1,
        filter=Filter(
            filter_statement_1=FilterStatement(
                "header.stationId",
                ComparisonOperators.NOT_EQUAL,
                MY_STATION_ID,
            )
        ),
        notify_time=TimestampIts(0),  # Notify immediately
        multiplicity=1,
        order=(
            OrderTupleValue(
                attribute="cam.generationDeltaTime",
                ordering_direction=OrderingDirection.ASCENDING,
            ),
        ),
    ),
    on_new_message,  # Callback function
)
if response.result != SubscribeDataobjectsResult.SUCCESSFUL:
    print("Subscription failed! Ensure the LDM Data Consumer is registered.")
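The subscribe-and-callback flow can also be shown without any FlexStack dependency. The sketch below is a deliberately tiny stand-in, not the LDM implementation. `TinyStore` and its methods are hypothetical, and a plain predicate function takes the place of the `Filter` object.

```python
from typing import Any, Callable, Dict, List, Tuple

Message = Dict[str, Any]
Predicate = Callable[[Message], bool]
Callback = Callable[[Message], None]


class TinyStore:
    """Hypothetical store that notifies matching subscribers on every publish."""

    def __init__(self) -> None:
        self._messages: List[Message] = []
        self._subscribers: List[Tuple[Predicate, Callback]] = []

    def subscribe(self, predicate: Predicate, callback: Callback) -> None:
        self._subscribers.append((predicate, callback))

    def publish(self, message: Message) -> None:
        self._messages.append(message)
        for predicate, callback in self._subscribers:
            if predicate(message):
                callback(message)


MY_STATION_ID = 12345
received: List[int] = []

store = TinyStore()
store.subscribe(
    lambda m: m["header"]["stationId"] != MY_STATION_ID,  # skip our own messages
    lambda m: received.append(m["header"]["stationId"]),
)
store.publish({"header": {"stationId": 12345}})  # own message, filtered out
store.publish({"header": {"stationId": 777}})    # delivered to the callback
print(received)
```

Only the second message reaches the callback, which matches the NOT_EQUAL filter used in the subscription example.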
LDM Data Object
Messages are stored as LDM Data Objects:
# Example CAM stored in LDM
data_object = {
    "header": {
        "protocolVersion": 2,
        "messageId": 2,
        "stationId": 12345
    },
    "cam": {
        "generationDeltaTime": 5000,
        "camParameters": {
            "basicContainer": {
                "stationType": 5,
                "referencePosition": {
                    "latitude": 413869310,
                    "longitude": 21121040,
                    # ... more fields
                }
            },
            "highFrequencyContainer": {
                # Speed, heading, etc.
            }
        }
    }
}
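As a reading aid, the following sketch pulls the most commonly needed fields out of a data object with the layout shown above and converts the scaled coordinates back to degrees. `summarize_cam` is a hypothetical helper, not a FlexStack function, and it assumes every field shown in the example is present.

```python
from typing import Any, Dict


def summarize_cam(data_object: Dict[str, Any]) -> Dict[str, Any]:
    """Extract station identity and position (in degrees) from a stored CAM."""
    header = data_object["header"]
    basic = data_object["cam"]["camParameters"]["basicContainer"]
    position = basic["referencePosition"]
    return {
        "station_id": header["stationId"],
        "station_type": basic["stationType"],
        "latitude_deg": position["latitude"] / 10**7,
        "longitude_deg": position["longitude"] / 10**7,
    }


example = {
    "header": {"protocolVersion": 2, "messageId": 2, "stationId": 12345},
    "cam": {
        "generationDeltaTime": 5000,
        "camParameters": {
            "basicContainer": {
                "stationType": 5,
                "referencePosition": {"latitude": 413869310, "longitude": 21121040},
            },
        },
    },
}
summary = summarize_cam(example)
print(summary)
```

In the query and subscription examples on this page the same structure is reached through `obj["dataObject"]`, so that inner dictionary is what would be passed to a helper like this.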
Area of Maintenance
The LDM only keeps messages within a defined geographical area:
flowchart TB
subgraph "Area of Maintenance"
CENTER[Your Position<br/>📍]
R1((5 km radius))
end
MSG1[Message A<br/>✅ Inside] --> CENTER
MSG2[Message B<br/>✅ Inside] --> CENTER
MSG3[Message C<br/>❌ Outside] -.->|"Rejected"| CENTER
style CENTER fill:#e8f5e9,stroke:#2e7d32
style MSG3 fill:#ffebee,stroke:#c62828
Messages outside the area of maintenance are automatically removed.
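The inside/outside decision for a circular area comes down to a distance test. The sketch below uses the haversine great-circle formula with a spherical Earth. This is an assumption made for illustration; the page does not say which distance calculation FlexStack applies, and `haversine_m` and `inside_circle` are hypothetical names.

```python
import math
from typing import Tuple

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius, spherical approximation


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def inside_circle(center: Tuple[float, float], point: Tuple[float, float],
                  radius_m: float) -> bool:
    """True when `point` lies within `radius_m` metres of `center`."""
    return haversine_m(*center, *point) <= radius_m


center = (41.386931, 2.112104)
near = (41.390000, 2.115000)  # a few hundred metres away
far = (41.500000, 2.112104)   # roughly 12.6 km north

print(inside_circle(center, near, 5000), inside_circle(center, far, 5000))
```

With a 5 km radius the first point is kept and the second falls outside, mirroring messages A/B and C in the diagram.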
Complete Example
Here’s a complete example using LDM with CA Basic Service:
#!/usr/bin/env python3
"""
Local Dynamic Map Example

Stores and queries CAMs using the LDM.
Run with: sudo python ldm_example.py
"""

import logging
import random
import time

from flexstack.linklayer.raw_link_layer import RawLinkLayer
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID
from flexstack.btp.router import Router as BTPRouter
from flexstack.utils.static_location_service import ThreadStaticLocationService
from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import VehicleData
from flexstack.facilities.local_dynamic_map.factory import LDMFactory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    AccessPermission,
    Circle,
    ComparisonOperators,
    Filter,
    FilterStatement,
    GeometricArea,
    Location,
    OrderTupleValue,
    OrderingDirection,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    SubscribeDataobjectsResult,
    TimestampIts,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

logging.basicConfig(level=logging.INFO)

# Configuration
POSITION = [41.386931, 2.112104]
MAC_ADDRESS = bytes([(random.randint(0, 255) & 0xFE) | 0x02] +
                    [random.randint(0, 255) for _ in range(5)])
STATION_ID = random.randint(1, 2147483647)


def main():
    # Location Service
    location_service = ThreadStaticLocationService(
        period=1000,
        latitude=POSITION[0],
        longitude=POSITION[1],
    )

    # GeoNetworking
    mib = MIB(
        itsGnLocalGnAddr=GNAddress(
            m=M.GN_MULTICAST,
            st=ST.PASSENGER_CAR,
            mid=MID(MAC_ADDRESS),
        ),
    )
    gn_router = GNRouter(mib=mib, sign_service=None)
    location_service.add_callback(gn_router.refresh_ego_position_vector)

    # BTP
    btp_router = BTPRouter(gn_router)
    gn_router.register_indication_callback(btp_router.btp_data_indication)

    # Local Dynamic Map
    ldm_location = Location.initializer(
        latitude=int(POSITION[0] * 10**7),
        longitude=int(POSITION[1] * 10**7),
    )
    ldm_area = GeometricArea(
        circle=Circle(radius=5000),
        rectangle=None,
        ellipse=None,
    )
    ldm_factory = LDMFactory()
    ldm = ldm_factory.create_ldm(
        ldm_location,
        ldm_maintenance_type="Reactive",
        ldm_service_type="Reactive",
        ldm_database_type="Dictionary",
    )
    location_service.add_callback(ldm_location.location_service_callback)

    # Register as LDM Consumer
    response: RegisterDataConsumerResp = ldm.if_ldm_4.register_data_consumer(
        RegisterDataConsumerReq(
            application_id=CAM,
            access_permisions=(AccessPermission.CAM,),
            area_of_interest=ldm_area,
        )
    )
    if response.result == 2:
        print("❌ LDM registration failed!")
        exit(1)

    # Subscribe to incoming CAMs
    def on_cam_received(data: RequestDataObjectsResp) -> None:
        if data.data_objects:
            station = data.data_objects[0]["dataObject"]["header"]["stationId"]
            print(f"📨 Received CAM from station {station}")

    sub_response: SubscribeDataObjectsResp = ldm.if_ldm_4.subscribe_data_consumer(
        SubscribeDataobjectsReq(
            application_id=CAM,
            data_object_type=(CAM,),
            priority=1,
            filter=Filter(
                filter_statement_1=FilterStatement(
                    "header.stationId",
                    ComparisonOperators.NOT_EQUAL,
                    STATION_ID,
                )
            ),
            notify_time=TimestampIts(0),
            multiplicity=1,
            order=(
                OrderTupleValue(
                    attribute="cam.generationDeltaTime",
                    ordering_direction=OrderingDirection.ASCENDING,
                ),
            ),
        ),
        on_cam_received,
    )
    if sub_response.result != SubscribeDataobjectsResult.SUCCESSFUL:
        print("❌ LDM subscription failed!")
        exit(1)

    # CA Basic Service with LDM
    vehicle_data = VehicleData(
        station_id=STATION_ID,
        station_type=5,
        drive_direction="forward",
        vehicle_length={
            "vehicleLengthValue": 1023,
            "vehicleLengthConfidenceIndication": "unavailable",
        },
        vehicle_width=62,
    )
    ca_basic_service = CooperativeAwarenessBasicService(
        btp_router=btp_router,
        vehicle_data=vehicle_data,
        ldm=ldm,
    )
    location_service.add_callback(
        ca_basic_service.cam_transmission_management.location_service_callback
    )

    # Link Layer
    btp_router.freeze_callbacks()
    link_layer = RawLinkLayer(
        "lo",
        MAC_ADDRESS,
        receive_callback=gn_router.gn_data_indicate,
    )
    gn_router.link_layer = link_layer

    print("✅ LDM with CA Basic Service running!")
    print("📡 Sending CAMs and listening for others...")
    print("Press Ctrl+C to exit\n")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n👋 Shutting down...")

    location_service.stop_event.set()
    location_service.location_service_thread.join()
    link_layer.sock.close()


if __name__ == "__main__":
    main()
Use Cases
Query the LDM to find nearby vehicles and calculate collision risks based on their positions and trajectories.
Subscribe to all CAMs in an area to analyze traffic density, average speeds, and flow patterns.
Store DENMs in the LDM and query for active hazards along your planned route.
Combine CAMs and VAMs in the LDM to detect vulnerable road users near vehicles.
See Also
Getting Started — Complete V2X tutorial
Cooperative Awareness (CA) Basic Service — Cooperative Awareness Messages
Decentralized Environmental Notification (DEN) Service — Decentralized Environmental Notifications
VRU Awareness Service — VRU Awareness Messages
Basic Transport Protocol (BTP) — Transport layer