Getting Started

Welcome to FlexStack®! This tutorial will walk you through building your first V2X application — a vehicle that broadcasts its position to nearby vehicles using Cooperative Awareness Messages (CAMs).

Note

By the end of this tutorial, you’ll have a working V2X station that can send and receive CAM messages, the foundation of vehicle-to-vehicle communication.

What You’ll Build

In this tutorial, you’ll create a simple V2X application that:

  • 📡 Broadcasts your vehicle’s position, speed, and heading to nearby vehicles

  • 📥 Receives CAM messages from other vehicles

  • 🗺️ Stores received data in a Local Dynamic Map (LDM)

This is the “Hello World” of V2X — the Cooperative Awareness use case that enables vehicles to know about each other’s presence on the road.

Prerequisites

Before starting, make sure you have:

  • Python 3.8 or higher

  • Linux operating system (required for raw socket access)

  • sudo privileges (needed for low-level network access)

Installation

Install FlexStack® using pip:

pip install v2xflexstack

That’s it! All dependencies are installed automatically.

Understanding the Protocol Stack

Before diving into code, let’s understand what we’re building. The ETSI C-ITS protocol stack looks like this:

        graph TB
    subgraph "Application Layer"
        APP[Your Application]
    end
    subgraph "Facilities Layer"
        LDM[Local Dynamic Map]
        CAM[CA Basic Service]
    end
    subgraph "Transport & Network"
        BTP[BTP Router]
        GN[GeoNetworking Router]
    end
    subgraph "Access Layer"
        LL[Link Layer]
    end

    APP --> LDM
    LDM --> CAM
    CAM <--> BTP
    CAM --> LDM
    BTP <--> GN
    GN <--> LL

    style APP fill:#e1f5fe
    style CAM fill:#fff3e0
    style LDM fill:#fff3e0
    style BTP fill:#f3e5f5
    style GN fill:#f3e5f5
    style LL fill:#e8f5e9
    

Each layer has a specific role:

  • Link Layer: Handles raw network communication (ITS-G5 or C-V2X)

  • GeoNetworking: Routes messages based on geographic position

  • BTP: Multiplexes messages to the correct service

  • Facilities Layer: Provides high-level services like CAM broadcasting

  • Local Dynamic Map: Stores and manages received V2X data

We’ll build this stack from the bottom up.

Step-by-Step Tutorial

Let’s build our CAM broadcaster step by step. Create a new file called cam_example.py and follow along.

Step 1: Basic Setup

First, let’s set up logging and define some constants for our vehicle:

import logging
import random
import time

# Enable logging to see what's happening
logging.basicConfig(level=logging.INFO)

def generate_random_mac_address() -> bytes:
    """Generate a valid random MAC address."""
    octets = [random.randint(0x00, 0xFF) for _ in range(6)]
    # Set locally administered and unicast bits
    octets[0] = (octets[0] & 0b11111110) | 0b00000010
    return bytes(octets)

# Vehicle configuration
POSITION_COORDINATES = [41.386931, 2.112104]  # Barcelona, Spain
MAC_ADDRESS = generate_random_mac_address()
STATION_ID = random.randint(1, 2147483647)

Step 2: Location Service

V2X is all about location. The protocol stack needs to know where your vehicle is. FlexStack® provides different location services — we’ll use a static one for this tutorial:

from flexstack.utils.static_location_service import ThreadStaticLocationService

# Create a location service that reports our fixed position every second
location_service = ThreadStaticLocationService(
    period=1000,  # Update every 1000ms
    latitude=POSITION_COORDINATES[0],
    longitude=POSITION_COORDINATES[1]
)

Tip

In a real application, you’d use GPSDLocationService to get live GPS data from a GPSD daemon.

Step 3: GeoNetworking Router

GeoNetworking is the heart of V2X — it routes messages based on geographic areas. Let’s set it up:

from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID

# Configure the GeoNetworking Management Information Base (MIB)
mib = MIB(
    itsGnLocalGnAddr=GNAddress(
        m=M.GN_MULTICAST,      # Multicast addressing mode
        st=ST.CYCLIST,         # Station type (see ETSI TS 102 894-2)
        mid=MID(MAC_ADDRESS),  # Mobile ID based on MAC address
    ),
)

# Create the GeoNetworking router
gn_router = GNRouter(mib=mib, sign_service=None)

# Connect location updates to the router
location_service.add_callback(gn_router.refresh_ego_position_vector)

Step 4: BTP Router

The Basic Transport Protocol (BTP) multiplexes incoming messages to the right service based on port numbers:

from flexstack.btp.router import Router as BTPRouter

# Create BTP router and connect it to GeoNetworking
btp_router = BTPRouter(gn_router)
gn_router.register_indication_callback(btp_router.btp_data_indication)

Step 5: Local Dynamic Map (LDM)

The Local Dynamic Map is like a database for V2X — it stores all received messages and lets you query them. This is where you’ll find data about nearby vehicles:

from flexstack.facilities.local_dynamic_map.factory import LDMFactory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    AccessPermission,
    Circle,
    Filter,
    FilterStatement,
    ComparisonOperators,
    GeometricArea,
    Location,
    OrderTupleValue,
    OrderingDirection,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    SubscribeDataobjectsResult,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
    TimestampIts,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# Define our location for the LDM
ldm_location = Location.initializer(
    latitude=int(POSITION_COORDINATES[0] * 10**7),
    longitude=int(POSITION_COORDINATES[1] * 10**7),
)

# Define area of interest: 5km radius around our position
ldm_area = GeometricArea(
    circle=Circle(radius=5000),
    rectangle=None,
    ellipse=None,
)

# Create the LDM
ldm_factory = LDMFactory()
ldm = ldm_factory.create_ldm(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)

# Keep LDM updated with our position
location_service.add_callback(ldm_location.location_service_callback)

Now let’s register as a data consumer and subscribe to CAM messages:

# Register as a consumer of CAM data
register_response: RegisterDataConsumerResp = ldm.if_ldm_4.register_data_consumer(
    RegisterDataConsumerReq(
        application_id=CAM,
        access_permisions=(AccessPermission.CAM,),
        area_of_interest=ldm_area,
    )
)

if register_response.result == 2:
    print("Failed to register with LDM!")
    exit(1)

# Define what happens when we receive a CAM
def on_cam_received(data: RequestDataObjectsResp) -> None:
    station_id = data.data_objects[0]["dataObject"]["header"]["stationId"]
    print(f"🚗 Received CAM from station: {station_id}")

# Subscribe to CAM messages (excluding our own)
subscribe_response: SubscribeDataObjectsResp = ldm.if_ldm_4.subscribe_data_consumer(
    SubscribeDataobjectsReq(
        application_id=CAM,
        data_object_type=(CAM,),
        priority=1,
        filter=Filter(
            filter_statement_1=FilterStatement(
                "header.stationId",
                ComparisonOperators.NOT_EQUAL,
                STATION_ID  # Don't notify us about our own CAMs
            )
        ),
        notify_time=TimestampIts(0),
        multiplicity=1,
        order=(
            OrderTupleValue(
                attribute="cam.generationDeltaTime",
                ordering_direction=OrderingDirection.ASCENDING
            ),
        ),
    ),
    on_cam_received,
)

if subscribe_response.result != SubscribeDataobjectsResult.SUCCESSFUL:
    print("Failed to subscribe to CAM messages!")
    exit(1)

Step 6: CA Basic Service

Now for the main attraction — the Cooperative Awareness Basic Service. This is what actually sends and receives CAM messages:

from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import (
    VehicleData,
)

# Configure our vehicle's properties
vehicle_data = VehicleData(
    station_id=STATION_ID,
    station_type=5,  # Passenger car (see ETSI TS 102 894-2)
    drive_direction="forward",
    vehicle_length={
        "vehicleLengthValue": 1023,  # Unknown length
        "vehicleLengthConfidenceIndication": "unavailable",
    },
    vehicle_width=62,  # ~1.5m width
)

# Create the CA Basic Service
ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,
)

# Connect location updates to CAM transmission
location_service.add_callback(
    ca_basic_service.cam_transmission_management.location_service_callback
)

Step 8: Run the Application

Add the main loop to keep everything running:

print("✅ CA Basic Service is running!")
print("📡 Broadcasting CAMs and listening for nearby vehicles...")
print("Press Ctrl+C to exit.\n")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\n👋 Shutting down...")

# Clean up
location_service.stop_event.set()
location_service.location_service_thread.join()
link_layer.sock.close()

Complete Example

Here’s the complete script. Save it as cam_example.py and run with sudo:

sudo python cam_example.py
  1#!/usr/bin/env python3
  2"""
  3FlexStack® CAM Example
  4
  5A simple V2X application that broadcasts Cooperative Awareness Messages (CAMs)
  6and listens for CAMs from nearby vehicles.
  7
  8Run with: sudo python cam_example.py
  9"""
 10
 11import logging
 12import random
 13import time
 14
 15# FlexStack imports
 16from flexstack.linklayer.raw_link_layer import RawLinkLayer
 17from flexstack.geonet.router import Router as GNRouter
 18from flexstack.geonet.mib import MIB
 19from flexstack.geonet.gn_address import GNAddress, M, ST, MID
 20from flexstack.btp.router import Router as BTPRouter
 21from flexstack.utils.static_location_service import ThreadStaticLocationService
 22from flexstack.facilities.local_dynamic_map.factory import LDMFactory
 23from flexstack.facilities.local_dynamic_map.ldm_constants import CAM
 24from flexstack.facilities.local_dynamic_map.ldm_classes import (
 25    AccessPermission,
 26    Circle,
 27    ComparisonOperators,
 28    Filter,
 29    FilterStatement,
 30    GeometricArea,
 31    Location,
 32    OrderTupleValue,
 33    OrderingDirection,
 34    RegisterDataConsumerReq,
 35    RegisterDataConsumerResp,
 36    RequestDataObjectsResp,
 37    SubscribeDataobjectsReq,
 38    SubscribeDataObjectsResp,
 39    SubscribeDataobjectsResult,
 40    TimestampIts,
 41)
 42from flexstack.facilities.ca_basic_service.ca_basic_service import (
 43    CooperativeAwarenessBasicService,
 44)
 45from flexstack.facilities.ca_basic_service.cam_transmission_management import (
 46    VehicleData,
 47)
 48
 49# Enable logging
 50logging.basicConfig(level=logging.INFO)
 51
 52
 53def generate_random_mac_address() -> bytes:
 54    """Generate a valid random MAC address."""
 55    octets = [random.randint(0x00, 0xFF) for _ in range(6)]
 56    octets[0] = (octets[0] & 0b11111110) | 0b00000010
 57    return bytes(octets)
 58
 59
 60# Configuration
 61POSITION_COORDINATES = [41.386931, 2.112104]  # Barcelona, Spain
 62MAC_ADDRESS = generate_random_mac_address()
 63STATION_ID = random.randint(1, 2147483647)
 64
 65
 66def main():
 67    # ========== Step 2: Location Service ==========
 68    location_service = ThreadStaticLocationService(
 69        period=1000,
 70        latitude=POSITION_COORDINATES[0],
 71        longitude=POSITION_COORDINATES[1],
 72    )
 73
 74    # ========== Step 3: GeoNetworking ==========
 75    mib = MIB(
 76        itsGnLocalGnAddr=GNAddress(
 77            m=M.GN_MULTICAST,
 78            st=ST.CYCLIST,
 79            mid=MID(MAC_ADDRESS),
 80        ),
 81    )
 82    gn_router = GNRouter(mib=mib, sign_service=None)
 83    location_service.add_callback(gn_router.refresh_ego_position_vector)
 84
 85    # ========== Step 4: BTP ==========
 86    btp_router = BTPRouter(gn_router)
 87    gn_router.register_indication_callback(btp_router.btp_data_indication)
 88
 89    # ========== Step 5: Local Dynamic Map ==========
 90    ldm_location = Location.initializer(
 91        latitude=int(POSITION_COORDINATES[0] * 10**7),
 92        longitude=int(POSITION_COORDINATES[1] * 10**7),
 93    )
 94    ldm_area = GeometricArea(
 95        circle=Circle(radius=5000),
 96        rectangle=None,
 97        ellipse=None,
 98    )
 99    ldm_factory = LDMFactory()
100    ldm = ldm_factory.create_ldm(
101        ldm_location,
102        ldm_maintenance_type="Reactive",
103        ldm_service_type="Reactive",
104        ldm_database_type="Dictionary",
105    )
106    location_service.add_callback(ldm_location.location_service_callback)
107
108    # Register with LDM
109    register_response: RegisterDataConsumerResp = ldm.if_ldm_4.register_data_consumer(
110        RegisterDataConsumerReq(
111            application_id=CAM,
112            access_permisions=(AccessPermission.CAM,),
113            area_of_interest=ldm_area,
114        )
115    )
116    if register_response.result == 2:
117        print("Failed to register with LDM!")
118        exit(1)
119
120    # CAM received callback
121    def on_cam_received(data: RequestDataObjectsResp) -> None:
122        station_id = data.data_objects[0]["dataObject"]["header"]["stationId"]
123        print(f"🚗 Received CAM from station: {station_id}")
124
125    # Subscribe to CAMs
126    subscribe_response: SubscribeDataObjectsResp = ldm.if_ldm_4.subscribe_data_consumer(
127        SubscribeDataobjectsReq(
128            application_id=CAM,
129            data_object_type=(CAM,),
130            priority=1,
131            filter=Filter(
132                filter_statement_1=FilterStatement(
133                    "header.stationId",
134                    ComparisonOperators.NOT_EQUAL,
135                    STATION_ID,
136                )
137            ),
138            notify_time=TimestampIts(0),
139            multiplicity=1,
140            order=(
141                OrderTupleValue(
142                    attribute="cam.generationDeltaTime",
143                    ordering_direction=OrderingDirection.ASCENDING,
144                ),
145            ),
146        ),
147        on_cam_received,
148    )
149    if subscribe_response.result != SubscribeDataobjectsResult.SUCCESSFUL:
150        print("Failed to subscribe to CAM messages!")
151        exit(1)
152
153    # ========== Step 6: CA Basic Service ==========
154    vehicle_data = VehicleData(
155        station_id=STATION_ID,
156        station_type=5,
157        drive_direction="forward",
158        vehicle_length={
159            "vehicleLengthValue": 1023,
160            "vehicleLengthConfidenceIndication": "unavailable",
161        },
162        vehicle_width=62,
163    )
164    ca_basic_service = CooperativeAwarenessBasicService(
165        btp_router=btp_router,
166        vehicle_data=vehicle_data,
167        ldm=ldm,
168    )
169    location_service.add_callback(
170        ca_basic_service.cam_transmission_management.location_service_callback
171    )
172
173    # ========== Step 7: Link Layer ==========
174    btp_router.freeze_callbacks()
175    link_layer = RawLinkLayer(
176        "lo",
177        MAC_ADDRESS,
178        receive_callback=gn_router.gn_data_indicate,
179    )
180    gn_router.link_layer = link_layer
181
182    # ========== Step 8: Run ==========
183    print("✅ CA Basic Service is running!")
184    print("📡 Broadcasting CAMs and listening for nearby vehicles...")
185    print("Press Ctrl+C to exit.\n")
186
187    try:
188        while True:
189            time.sleep(1)
190    except KeyboardInterrupt:
191        print("\n👋 Shutting down...")
192
193    location_service.stop_event.set()
194    location_service.location_service_thread.join()
195    link_layer.sock.close()
196
197
198if __name__ == "__main__":
199    main()

Need Help?

  • Check the Overview for a high-level introduction

  • Browse the module documentation in the sidebar

  • Visit our GitHub repository for issues and discussions