Local Dynamic Map

The Local Dynamic Map (LDM) is FlexStack’s intelligent data storage system. It collects, organizes, and serves V2X messages — giving your applications a real-time view of the surrounding traffic environment.

Note

The LDM implements ETSI EN 302 895 V1.1.1. Think of it as a database that automatically manages message validity based on time and location.

What the LDM Does

        flowchart LR
    subgraph "Data Providers"
        CAM[CA Basic Service<br/>CAMs]
        DEN[DEN Service<br/>DENMs]
        VRU[VRU Service<br/>VAMs]
    end

    subgraph "LDM"
        DB[(LDM Database)]
        MT[Maintenance<br/>Auto-cleanup]
    end

    subgraph "Data Consumers"
        APP1[Collision Warning App]
        APP2[Traffic Analysis App]
        APP3[Navigation App]
    end

    CAM -->|"Publish"| DB
    DEN -->|"Publish"| DB
    VRU -->|"Publish"| DB
    DB -->|"Query"| APP1
    DB -->|"Subscribe"| APP2
    DB -->|"Query"| APP3
    MT -.->|"Remove expired"| DB

    style DB fill:#e3f2fd,stroke:#1565c0
    

Key Features:

| Feature | Description |
| --- | --- |
| 📥 Data Collection | Receives messages from CAM, DENM, VAM services |
| 🗄️ Organized Storage | Stores messages with timestamps and locations |
| 🔍 Query Interface | Find specific messages by station ID, type, or location |
| 📡 Subscriptions | Get notified when new messages arrive |
| 🧹 Auto Maintenance | Automatically removes expired or out-of-area messages |


Architecture

The LDM consists of two main components:

        flowchart TB
    subgraph "LDM Service"
        REG[Registration<br/>Providers & Consumers]
        PUB[Publish Interface<br/>IF-LDM-3]
        QRY[Query Interface<br/>IF-LDM-4]
        SUB[Subscribe Interface<br/>IF-LDM-4]
    end

    subgraph "LDM Maintenance"
        TIME[Time Validity<br/>Check]
        AREA[Area of Maintenance<br/>Check]
        CLEAN[Cleanup<br/>Service]
    end

    subgraph "Storage"
        DB[(LDM Database)]
    end

    subgraph "Location"
        LOC[Location Service]
    end

    REG --> DB
    PUB --> DB
    QRY --> DB
    SUB --> DB
    TIME --> CLEAN
    AREA --> CLEAN
    CLEAN --> DB
    LOC --> AREA

    style DB fill:#fff3e0,stroke:#e65100
    

Components:

| Component | Description |
| --- | --- |
| LDM Service | Handles registration, publishing, queries, and subscriptions |
| LDM Maintenance | Removes expired messages and those outside the area |
| IF-LDM-3 | Interface for Data Providers (publish data) |
| IF-LDM-4 | Interface for Data Consumers (query/subscribe) |
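
The split between the service and maintenance roles can be pictured with a toy model. The sketch below is not FlexStack code: `ToyLdm`, `ToyRecord`, and their methods are hypothetical names, and the only behaviour modelled is time validity with cleanup on access.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyRecord:
    data_object: dict[str, Any]
    stored_at: float      # seconds on any consistent clock
    time_validity: float  # seconds the record stays valid


@dataclass
class ToyLdm:
    """Hypothetical stand-in for the LDM Service plus LDM Maintenance."""

    records: list[ToyRecord] = field(default_factory=list)

    def publish(self, data_object, now, time_validity):
        # Provider side: the role IF-LDM-3 plays in the real LDM.
        self.records.append(ToyRecord(data_object, now, time_validity))

    def maintain(self, now):
        # Maintenance: drop every record whose validity window has passed.
        self.records = [
            r for r in self.records if now - r.stored_at <= r.time_validity
        ]

    def query(self, now):
        # Consumer side: the role IF-LDM-4 plays. Cleanup runs on access.
        self.maintain(now)
        return [r.data_object for r in self.records]


toy = ToyLdm()
toy.publish({"header": {"stationId": 1}}, now=0.0, time_validity=5.0)
toy.publish({"header": {"stationId": 2}}, now=4.0, time_validity=5.0)

# At t=6 the first record (valid until t=5) is gone, the second remains.
fresh = toy.query(now=6.0)
print([obj["header"]["stationId"] for obj in fresh])
```

The real LDM also checks the area of maintenance and keeps track of registered providers and consumers, which this sketch leaves out.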


Supported Data Types

The LDM can store various V2X message types:

| Type | Access Permission | Description |
| --- | --- | --- |
| CAM | AccessPermission.CAM | Cooperative Awareness Messages from vehicles |
| DENM | AccessPermission.DENM | Decentralized Environmental Notifications |
| VAM | AccessPermission.VAM | VRU Awareness Messages |
| MAPEM | AccessPermission.MAPEM | Map topology messages |
| SPATEM | AccessPermission.SPATEM | Signal phase and timing |


Getting Started

Step 1: Create the LDM

Use the LDMFactory to create an LDM instance:

from flexstack.facilities.local_dynamic_map.factory import LDMFactory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    Location,
    GeometricArea,
    Circle,
)

# Define LDM location (your position)
ldm_location = Location.initializer(
    latitude=int(41.386931 * 10**7),   # Scaled integer format
    longitude=int(2.112104 * 10**7),
)

# Define area of interest (5 km radius); it is passed as area_of_interest
# when registering a Data Consumer
ldm_area = GeometricArea(
    circle=Circle(radius=5000),
    rectangle=None,
    ellipse=None,
)

# Create the LDM
ldm_factory = LDMFactory()
ldm = ldm_factory.create_ldm(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)
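
The "scaled integer format" used for `latitude` and `longitude` above is decimal degrees multiplied by 10**7. A minimal sketch of the conversion in both directions follows; `degrees_to_its` and `its_to_degrees` are hypothetical helper names, not part of FlexStack.

```python
def degrees_to_its(value_deg: float) -> int:
    """Convert decimal degrees to integer units of 1e-7 degree."""
    # round() instead of int(): int() truncates, so a product that lands
    # just below a whole number through floating-point error would come
    # out one unit low.
    return round(value_deg * 10**7)


def its_to_degrees(value_its: int) -> float:
    """Convert integer units of 1e-7 degree back to decimal degrees."""
    return value_its / 10**7


lat_its = degrees_to_its(41.386931)
lon_its = degrees_to_its(2.112104)
print(lat_its, lon_its)
```

Rounding is a defensive choice here; whether truncation actually misbehaves depends on the specific input value.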

LDM Configuration Options:

| Parameter | Value | Description |
| --- | --- | --- |
| ldm_maintenance_type | "Reactive" | Cleans up on access |
| ldm_service_type | "Reactive" | Processes requests on demand |
| ldm_database_type | "Dictionary" | In-memory Python dict storage |

Step 2: Connect Location Updates

Keep the LDM’s area of maintenance updated with your position:

from flexstack.utils.static_location_service import ThreadStaticLocationService

location_service = ThreadStaticLocationService(
    period=1000,
    latitude=41.386931,
    longitude=2.112104,
)

# Update LDM location when position changes
location_service.add_callback(ldm_location.location_service_callback)

Step 3: Connect to Facility Services

The LDM works best when connected to facility services like CA Basic Service:

from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)

# btp_router and vehicle_data are created as shown in the Complete Example.
# CA Basic Service automatically publishes to LDM
ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,  # Pass the LDM here
)

Data Providers

Data Providers publish messages to the LDM. Facility services (CAM, DENM, VAM) automatically register as providers when you pass the LDM to them.

Register a Custom Provider

For custom applications that need to publish data:

from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RegisterDataProviderReq,
    AccessPermission,
    TimeValidity,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# Register as a data provider
ldm.if_ldm_3.register_data_provider(
    RegisterDataProviderReq(
        application_id=CAM,
        access_permissions=[AccessPermission.CAM],
        time_validity=TimeValidity(5),  # Messages valid for 5 seconds
    )
)

Publish Data

from flexstack.facilities.local_dynamic_map.ldm_classes import (
    AddDataProviderReq,
    TimestampIts,
    Location,
    TimeValidity,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# `cam` is a decoded CAM held as a nested dict (see "LDM Data Object")
ref_pos = cam["cam"]["camParameters"]["basicContainer"]["referencePosition"]

# Publish a CAM to the LDM
timestamp = TimestampIts.initialize_with_utc_timestamp_seconds()

response = ldm.if_ldm_3.add_provider_data(
    AddDataProviderReq(
        application_id=CAM,
        timestamp=timestamp,
        location=Location.location_builder_circle(
            latitude=ref_pos["latitude"],
            longitude=ref_pos["longitude"],
            altitude=ref_pos["altitude"]["altitudeValue"],
            radius=0,
        ),
        data_object=cam,
        time_validity=TimeValidity(5),
    )
)

Data Consumers

Data Consumers retrieve messages from the LDM using queries or subscriptions.

Register as a Consumer

from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    AccessPermission,
    GeometricArea,
    Circle,
    RequestDataObjectsResp,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# Register to consume CAMs, VAMs, and DENMs
response: RegisterDataConsumerResp = ldm.if_ldm_4.register_data_consumer(
    RegisterDataConsumerReq(
        application_id=CAM,
        access_permisions=(
            AccessPermission.CAM,
            AccessPermission.VAM,
            AccessPermission.DENM,
        ),
        area_of_interest=GeometricArea(
            circle=Circle(radius=5000),  # 5 km radius
            rectangle=None,
            ellipse=None,
        ),
    )
)

if response.result == 2:  # result code 2 is treated as a failed registration
    print("Registration failed!")
    raise SystemExit(1)

Querying the LDM

Query for specific messages using filters:

        flowchart LR
    APP[Application] -->|"Query Request"| LDM[(LDM)]
    LDM -->|"Filtered Results"| APP

    subgraph "Query Options"
        F[Filter by field]
        O[Order results]
        T[Filter by type]
    end
    

Basic Query

from flexstack.facilities.local_dynamic_map.ldm_classes import (
    RequestDataObjectsReq,
    RequestDataObjectsResp,
    Filter,
    FilterStatement,
    ComparisonOperators,
    OrderTupleValue,
    OrderingDirection,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM, VAM

# Create a query for CAMs and VAMs
request = RequestDataObjectsReq(
    application_id=CAM,
    data_object_type=[CAM, VAM],
    priority=0,
    order=(OrderTupleValue("timeStamp", OrderingDirection.ASCENDING),),
    filter=None,  # No filter = return all
)

# Execute the query
result: RequestDataObjectsResp = ldm.if_ldm_4.request_data_objects(request)

for obj in result.data_objects:
    print(f"Station: {obj['dataObject']['header']['stationId']}")

Filtered Query

Find messages from a specific station:

# Filter: header.stationId == 12345
# (named station_filter to avoid shadowing the built-in filter())
station_filter = Filter(
    filter_statement_1=FilterStatement(
        "header.stationId",
        ComparisonOperators.EQUAL,
        12345,
    )
)

request = RequestDataObjectsReq(
    application_id=CAM,
    data_object_type=[CAM],
    priority=0,
    order=(OrderTupleValue("timeStamp", OrderingDirection.DESCENDING),),
    filter=station_filter,
)

result = ldm.if_ldm_4.request_data_objects(request)

Available Comparison Operators:

| Operator | Description |
| --- | --- |
| EQUAL | Field equals value |
| NOT_EQUAL | Field does not equal value |
| GREATER_THAN | Field is greater than value |
| LESS_THAN | Field is less than value |
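
To make the filter semantics concrete, here is a rough sketch of how a dotted attribute path and a comparison operator could be evaluated against a stored message, and how results could then be ordered. It is an illustration only: `OPERATORS`, `resolve`, and `matches` are hypothetical names, and FlexStack's own evaluation may differ, for example in how it treats missing attributes.

```python
import operator
from functools import reduce
from typing import Any

# Hypothetical mapping from the operator names in the table to Python functions.
OPERATORS = {
    "EQUAL": operator.eq,
    "NOT_EQUAL": operator.ne,
    "GREATER_THAN": operator.gt,
    "LESS_THAN": operator.lt,
}


def resolve(data_object: dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as "header.stationId" through nested dicts."""
    return reduce(lambda node, key: node[key], path.split("."), data_object)


def matches(data_object: dict[str, Any], attribute: str, op_name: str, value: Any) -> bool:
    try:
        return OPERATORS[op_name](resolve(data_object, attribute), value)
    except (KeyError, TypeError):
        return False  # in this sketch a missing attribute never matches


messages = [
    {"header": {"stationId": 12345}, "cam": {"generationDeltaTime": 5000}},
    {"header": {"stationId": 777}, "cam": {"generationDeltaTime": 5100}},
]

own = [m for m in messages if matches(m, "header.stationId", "EQUAL", 12345)]
others = [m for m in messages if matches(m, "header.stationId", "NOT_EQUAL", 12345)]

# Descending order on an attribute, analogous to OrderingDirection.DESCENDING.
newest_first = sorted(
    messages,
    key=lambda m: resolve(m, "cam.generationDeltaTime"),
    reverse=True,
)
```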


Subscribing to Updates

Get notified when new messages arrive:

        sequenceDiagram
    participant App as Application
    participant LDM as LDM
    participant CAM as CA Basic Service

    App->>LDM: Subscribe (filter, callback)
    LDM-->>App: Subscription confirmed

    loop Every matching message
        CAM->>LDM: New CAM arrives
        LDM->>App: Callback with data
    end
    

Subscribe Example

from flexstack.facilities.local_dynamic_map.ldm_classes import (
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    SubscribeDataobjectsResult,
    RequestDataObjectsResp,
    Filter,
    FilterStatement,
    ComparisonOperators,
    OrderTupleValue,
    OrderingDirection,
    TimestampIts,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# Define callback function
def on_new_message(data: RequestDataObjectsResp) -> None:
    for obj in data.data_objects:
        station_id = obj["dataObject"]["header"]["stationId"]
        print(f"📨 New message from station {station_id}")

# Subscribe to CAMs (excluding our own)
MY_STATION_ID = 12345

response: SubscribeDataObjectsResp = ldm.if_ldm_4.subscribe_data_consumer(
    SubscribeDataobjectsReq(
        application_id=CAM,
        data_object_type=(CAM,),
        priority=1,
        filter=Filter(
            filter_statement_1=FilterStatement(
                "header.stationId",
                ComparisonOperators.NOT_EQUAL,
                MY_STATION_ID,
            )
        ),
        notify_time=TimestampIts(0),  # Notify immediately
        multiplicity=1,
        order=(
            OrderTupleValue(
                attribute="cam.generationDeltaTime",
                ordering_direction=OrderingDirection.ASCENDING,
            ),
        ),
    ),
    on_new_message,  # Callback function
)

if response.result != SubscribeDataobjectsResult.SUCCESSFUL:
    print("Subscription failed! Ensure the LDM Data Consumer is registered.")
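
The subscription flow in the sequence diagram boils down to "for each new message, call every subscriber whose filter accepts it". The sketch below shows that idea in isolation; `ToySubscriptions` is a hypothetical class, and it ignores the `notify_time`, `multiplicity`, and `order` parameters of the real request.

```python
from typing import Any, Callable

Message = dict[str, Any]


class ToySubscriptions:
    """Hypothetical dispatcher: one predicate and one callback per subscriber."""

    def __init__(self) -> None:
        self._subscribers: list[
            tuple[Callable[[Message], bool], Callable[[Message], None]]
        ] = []

    def subscribe(
        self,
        predicate: Callable[[Message], bool],
        callback: Callable[[Message], None],
    ) -> None:
        self._subscribers.append((predicate, callback))

    def publish(self, message: Message) -> None:
        # Invoke each callback whose predicate accepts the new message.
        for predicate, callback in self._subscribers:
            if predicate(message):
                callback(message)


MY_STATION_ID = 12345
received: list[int] = []

bus = ToySubscriptions()
bus.subscribe(
    lambda msg: msg["header"]["stationId"] != MY_STATION_ID,  # skip our own CAMs
    lambda msg: received.append(msg["header"]["stationId"]),
)

bus.publish({"header": {"stationId": MY_STATION_ID}})  # filtered out
bus.publish({"header": {"stationId": 777}})            # delivered to the callback
print(received)
```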

LDM Data Object

Messages are stored as LDM Data Objects:

# Example CAM stored in LDM
data_object = {
    "header": {
        "protocolVersion": 2,
        "messageId": 2,
        "stationId": 12345
    },
    "cam": {
        "generationDeltaTime": 5000,
        "camParameters": {
            "basicContainer": {
                "stationType": 5,
                "referencePosition": {
                    "latitude": 413869310,
                    "longitude": 21121040,
                    # ... more fields
                }
            },
            "highFrequencyContainer": {
                # Speed, heading, etc.
            }
        }
    }
}
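
Reading fields back out of such an object is plain dictionary access. As a small illustration, the hypothetical helper `summarize_cam` below extracts the station ID and converts the scaled coordinates back to decimal degrees. Note that in the query and subscription examples each result wraps the message under a "dataObject" key, so a result entry would be passed as `obj["dataObject"]`.

```python
def summarize_cam(data_object: dict) -> dict:
    """Pull a few commonly used fields out of a stored CAM (illustrative helper)."""
    position = data_object["cam"]["camParameters"]["basicContainer"]["referencePosition"]
    return {
        "station_id": data_object["header"]["stationId"],
        # Coordinates are stored as degrees scaled by 10**7.
        "latitude_deg": position["latitude"] / 10**7,
        "longitude_deg": position["longitude"] / 10**7,
    }


example = {
    "header": {"protocolVersion": 2, "messageId": 2, "stationId": 12345},
    "cam": {
        "generationDeltaTime": 5000,
        "camParameters": {
            "basicContainer": {
                "stationType": 5,
                "referencePosition": {"latitude": 413869310, "longitude": 21121040},
            },
        },
    },
}

summary = summarize_cam(example)
print(summary)
```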

Area of Maintenance

The LDM only keeps messages within a defined geographical area:

        flowchart TB
    subgraph "Area of Maintenance"
        CENTER[Your Position<br/>📍]
        R1((5 km radius))
    end

    MSG1[Message A<br/>✅ Inside] --> CENTER
    MSG2[Message B<br/>✅ Inside] --> CENTER
    MSG3[Message C<br/>❌ Outside] -.->|"Rejected"| CENTER

    style CENTER fill:#e8f5e9,stroke:#2e7d32
    style MSG3 fill:#ffebee,stroke:#c62828
    

Messages outside the area of maintenance are automatically removed.
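
For a circular area, the inside/outside decision amounts to comparing a distance against the radius. The following is a sketch under stated assumptions rather than FlexStack's implementation: it uses the haversine formula with a mean Earth radius, and `distance_m` and `inside_area` are hypothetical names. The rectangle and ellipse shapes that `GeometricArea` also accepts are not covered.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius, a common approximation


def distance_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres (haversine formula), inputs in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def inside_area(center: tuple, position: tuple, radius_m: float) -> bool:
    """True if position lies within radius_m metres of center."""
    return distance_m(*center, *position) <= radius_m


center = (41.386931, 2.112104)
nearby = (41.400000, 2.112104)    # roughly 1.5 km north of the center
far_away = (41.500000, 2.112104)  # roughly 12.6 km north of the center

print(inside_area(center, nearby, 5000))
print(inside_area(center, far_away, 5000))
```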


Complete Example

Here’s a complete example using LDM with CA Basic Service:

#!/usr/bin/env python3
"""
Local Dynamic Map Example

Stores and queries CAMs using the LDM.
Run with: sudo python ldm_example.py
"""

import logging
import random
import time

from flexstack.linklayer.raw_link_layer import RawLinkLayer
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID
from flexstack.btp.router import Router as BTPRouter
from flexstack.utils.static_location_service import ThreadStaticLocationService
from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import VehicleData
from flexstack.facilities.local_dynamic_map.factory import LDMFactory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    AccessPermission,
    Circle,
    ComparisonOperators,
    Filter,
    FilterStatement,
    GeometricArea,
    Location,
    OrderTupleValue,
    OrderingDirection,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    SubscribeDataobjectsResult,
    TimestampIts,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

logging.basicConfig(level=logging.INFO)

# Configuration
POSITION = [41.386931, 2.112104]
MAC_ADDRESS = bytes([(random.randint(0, 255) & 0xFE) | 0x02] +
                    [random.randint(0, 255) for _ in range(5)])
STATION_ID = random.randint(1, 2147483647)


def main():
    # Location Service
    location_service = ThreadStaticLocationService(
        period=1000,
        latitude=POSITION[0],
        longitude=POSITION[1],
    )

    # GeoNetworking
    mib = MIB(
        itsGnLocalGnAddr=GNAddress(
            m=M.GN_MULTICAST,
            st=ST.PASSENGER_CAR,
            mid=MID(MAC_ADDRESS),
        ),
    )
    gn_router = GNRouter(mib=mib, sign_service=None)
    location_service.add_callback(gn_router.refresh_ego_position_vector)

    # BTP
    btp_router = BTPRouter(gn_router)
    gn_router.register_indication_callback(btp_router.btp_data_indication)

    # Local Dynamic Map
    ldm_location = Location.initializer(
        latitude=int(POSITION[0] * 10**7),
        longitude=int(POSITION[1] * 10**7),
    )
    ldm_area = GeometricArea(
        circle=Circle(radius=5000),
        rectangle=None,
        ellipse=None,
    )
    ldm_factory = LDMFactory()
    ldm = ldm_factory.create_ldm(
        ldm_location,
        ldm_maintenance_type="Reactive",
        ldm_service_type="Reactive",
        ldm_database_type="Dictionary",
    )
    location_service.add_callback(ldm_location.location_service_callback)

    # Register as LDM Consumer
    response: RegisterDataConsumerResp = ldm.if_ldm_4.register_data_consumer(
        RegisterDataConsumerReq(
            application_id=CAM,
            access_permisions=(AccessPermission.CAM,),
            area_of_interest=ldm_area,
        )
    )
    if response.result == 2:
        print("❌ LDM registration failed!")
        raise SystemExit(1)

    # Subscribe to incoming CAMs
    def on_cam_received(data: RequestDataObjectsResp) -> None:
        if data.data_objects:
            station = data.data_objects[0]["dataObject"]["header"]["stationId"]
            print(f"📨 Received CAM from station {station}")

    sub_response: SubscribeDataObjectsResp = ldm.if_ldm_4.subscribe_data_consumer(
        SubscribeDataobjectsReq(
            application_id=CAM,
            data_object_type=(CAM,),
            priority=1,
            filter=Filter(
                filter_statement_1=FilterStatement(
                    "header.stationId",
                    ComparisonOperators.NOT_EQUAL,
                    STATION_ID,
                )
            ),
            notify_time=TimestampIts(0),
            multiplicity=1,
            order=(
                OrderTupleValue(
                    attribute="cam.generationDeltaTime",
                    ordering_direction=OrderingDirection.ASCENDING,
                ),
            ),
        ),
        on_cam_received,
    )
    if sub_response.result != SubscribeDataobjectsResult.SUCCESSFUL:
        print("❌ LDM subscription failed!")
        raise SystemExit(1)

    # CA Basic Service with LDM
    vehicle_data = VehicleData(
        station_id=STATION_ID,
        station_type=5,
        drive_direction="forward",
        vehicle_length={
            "vehicleLengthValue": 1023,
            "vehicleLengthConfidenceIndication": "unavailable",
        },
        vehicle_width=62,
    )
    ca_basic_service = CooperativeAwarenessBasicService(
        btp_router=btp_router,
        vehicle_data=vehicle_data,
        ldm=ldm,
    )
    location_service.add_callback(
        ca_basic_service.cam_transmission_management.location_service_callback
    )

    # Link Layer
    btp_router.freeze_callbacks()
    link_layer = RawLinkLayer(
        "lo",
        MAC_ADDRESS,
        receive_callback=gn_router.gn_data_indicate,
    )
    gn_router.link_layer = link_layer

    print("✅ LDM with CA Basic Service running!")
    print("📡 Sending CAMs and listening for others...")
    print("Press Ctrl+C to exit\n")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n👋 Shutting down...")

    location_service.stop_event.set()
    location_service.location_service_thread.join()
    link_layer.sock.close()


if __name__ == "__main__":
    main()

Use Cases

🚗 Collision Avoidance

Query the LDM to find nearby vehicles and calculate collision risks based on their positions and trajectories.

🚦 Traffic Analysis

Subscribe to all CAMs in an area to analyze traffic density, average speeds, and flow patterns.

⚠️ Hazard Awareness

Store DENMs in the LDM and query for active hazards along your planned route.

🚴 VRU Protection

Combine CAMs and VAMs in the LDM to detect vulnerable road users near vehicles.

