GuidesChangelog
Guides

Using UDP sinks for traffic data retrieval

When you need traffic data, sometimes using the query-response dynamic of the REST API is not ideal. That's where UDP sinks come in.

The introductory article explains the concept of REST and UDP sinks. REST sinks require applications to periodically send requests to the server to get the current data. Sometimes, however, this form of communication isn’t desirable, like with traffic light controllers which require current updates as fast as possible or aren’t able to send frequent data requests. UDP sinks offer an alternative thanks to their usage of the UDP protocol: Subscribe to a sink once and get a zone's status updates until the subscription expires.

After assigning a UDP sink to an operator, any authorized client application can start communicating with the server via UDP datagrams, whose payloads are JSON objects. At this moment, UDP communication doesn't use any form of authentication. The UDP server listens on the port 55570 by default. You can change this in the Block settings in FLOW Insights.

When creating UDP sinks, make sure that their names are unique across all analytics.

There are three types of UDP sinks: Zone state sink, Category count sink, and Object list sink.

Zone state sink

This sink is subscription-based and provides information about the object presence and sensor state of a corresponding zone.

A typical procedure for your application would be to subscribe to a FLOW server with a ZoneStateSubscribe message and then wait for ZoneStatePush messages. Immediately after subscription, the server sends Push messages for all active sinks - one Push message per sink per UDP datagram. Then until the subscription timeout expires, the server automatically sends Push messages for sinks whose state has changed. Another ZoneStateSubscribe message with the same destination IP address and port will reset the subscription timer.

Apart from that, a device may request extended data from a zone state sink by sending the ZoneExtendedStateRequest. Afterward, a response from all zone state sinks specified in the request is generated - one response per sink per UDP datagram.

ZoneStateSubscribe

  • Direction: To server
  • Example payload:
{
  "ZoneStateSubscribe": {
    "DestinationIpAddress": "192.168.11.1",
    "DestinationPort": 4444,
    "SubscriptionTimeout_s": 10,
    "Options": [ "IdList" ]
  }
}

Description:

  • The message contains an IP address and port of the device that should receive ZoneStatePush messages for the duration of the timeout.
  • The Options array is optional. If it contains the IdList string, then all ZoneStatePush responses will contain the IdList, IdListStartTimestamp, and IdListEndTimestamp properties.

ZoneStatePush

  • Direction: From server to the address specified by the ZoneStateSubscribe message
  • Example payload:
{
  "ZoneStatePush": {
        "Id": "z001",
        "Failure": false,
        "FailureState": "NoFailure",
        "Presence": true,
        "IdList": [ 
          "3", "1", "6", "7", "4", "5", //...
         ],
         "IdListStartTimestamp":"1650541963538",
         "IdListEndTimestamp": "1650542571179"
  }
}

Description:

  • Id - the identifier of the corresponding sink
  • Failure - denotes whether there is a failure within a sensor watching the zone
  • Presence - denotes whether there is a traffic object within the zone
  • FailureState - a string describing the failure state, can have the following values:
    • NoFailure
    • SensorFailure - failure of the camera, sensor, or another internal component
    • CommunicationFailure
    • ConfigurationFailure - invalid configuration, e.g. the corresponding zone doesn’t exist
    • EnvironmentalInterference - e.g. camera blinded by the Sun, reflections on the road, fog, darkness, frost
    • SensorCalibration - the system is still initializing and cannot provide detection data yet
  • IdList - present only if requested in the ZoneStateSubscribe message. Contains IDs of all traffic objects passed by the operator to which the sink is attached.
  • IdListStartTimestamp - present only if requested in the ZoneStateSubscribe message. It's the timestamp (in milliseconds since epoch) of the earliest point that FLOW's trajectory cache includes. This is equivalent to the cache_start_timestamp value in the analytics info response. Trajectory caching is explained in another article.
  • IdListEndTimestamp - present only if requested in the ZoneStateSubscribe message. It's the timestamp of the last analytics evaluation.

ZoneExtendedStateRequest

  • Direction: To server
  • Example payload:
{
  "ZoneExtendedStateRequest": {
    "Sinks": [ "z001", "z002" ]
  }
}

Description:

  • Sinks - an array of strings denoting which sinks’ extended data the sender wants to receive

ZoneExtendedState

  • Direction: From server to the request sender
  • Example payload:
{
  "ZoneExtendedState": {
    "Id": "z002",
    "VehicleCount": 24,
}

Description:

  • Id - the identifier of the corresponding sink
  • VehicleCount - number of vehicles in the zone
  • The ZoneExtendedState object may actually have more properties, but their values will always be invalid, so it's safe to ignore them.

Category count sink

This sink provides data from a continuous counter of trajectories sorted into categories. The counter resets only when the underlying integer data type overflows. Data from this sink is obtained by requesting them. After sending one request, a response from all Category Count Sinks is generated - one response per sink per UDP datagram.

CategoryCountRequest

  • Direction: To server
  • Payload:
{
  "CategoryCountRequest": {}
}

CategoryCount

  • Direction: From server to the request sender
  • Example payload:
    Raw trajectories UDP
{
  "CategoryCount": {
    "Id": "m1",
       
    "CategoryCounts": [
      {
        "Category": "car",
        "Count": 10
      },
      {
        "Category": "pedestrian",
        "Count": 21
      }
    ]
  }
}

Description:

  • Id - the identifier of the corresponding sink
  • Category - a string containing the name of the category, to which the Count relates. Can have one of the following values: car, light, heavy, bus, motorcycle, bicycle, pedestrian, unknown

Object list sink

This sink is subscription-based and provides detailed information about the objects passed by the attached operator.

Its usage is similar to the Zone state sink: Subscribe to the server with the ObjectListSubscribe message and then wait for ObjectList messages. Immediately after subscription, the server sends ObjectList messages for all active sinks — one ObjectList message per sink per UDP datagram. Then until the subscription timeout expires, the server automatically sends ObjectList messages with every evaluation tick. Another ObjectListSubscribe message with the same destination IP address and port will reset the subscription timer.

ObjectListSubscribe

  • Direction: To server
  • Example payload:
{
  "ObjectListSubscribe": {
    "DestinationIpAddress": "192.168.11.1",
    "DestinationPort": 4444,
    "SubscriptionTimeout_s": 10,
    "Options": [ "BoundingBox" ]
  }
}
  • Comments:
    • The message contains an IP address and port of the device that should receive ObjectList messages for the duration of the timeout.
    • Options - an optional property. If present, additional properties will be included in the response. Possible options include: EvaluationTimestamp, MessageCounter, LicensePlate, BoundingBox, Speed, SpeedAngle, Acceleration, SensorPosition, WGS84Position, MapPosition, Score. See the object list response for details about each additional property.

ObjectList

  • Direction: From server to the address specified by the ObjectListSubscribe message
  • If one sink would output more than 150 trajectories, the response will be split into several parts — one part per UDP datagram.
  • The following example assumes all possible options were specified in the subscription message.
  • Example payload of datagram #1:
{
   "ObjectList":{
      "CubeId": 3, 
      "AnalyticsId": 0,
      "SinkId": 29,
      "EvaluationTimestamp": "1649336808104",
      "Id": "Speed - Object list",
      "Part": 1,
      "TotalParts": 2,
      "Failure": false,
      "FailureState": "",
      "MessageCounter": 0,
      "Objects":[
         {
            "Color":"grey",
            "Id":"408",
            "LicensePlate":"",
            "Category":"car",
            "DetectionScore": 0.9621756076812744,
            "TrajectoryScore": 0.9570966958999634,
            "LastState":{
               "Timestamp":"1649336736729",
               "WGS84Position": [ 16.5928435, 49.225322 ],
               "MapPosition": [ 615951.5, 5453970.5 ],
               "MapSpeed": 0.8981415033340454,
               "MapSpeedAngle": 6.1257758140563965,
               "SensorPosition": [ 0.8809463977813721, 1 ],
               "BoundingBox": [ 0.7785, 0.8504, 1.003, 1.04 ],
            }
         },
         // ...
      ]
   }
}
  • Example payload of datagram #2:
{
   "ObjectList":{
      "AnalyticsId":0,
      "CubeId":3,
      "SinkId":29,
      "EvaluationTimestamp":"1649336808104",
      "Id":"Speed - Object list",
      "Part":2,
      "TotalParts":2,
      "Failure": false,
      "FailureState": "",
      "MessageCounter": 1,
      "Objects":[
         // ...
      ]
   }
}

Description:

  • CubeId - the identifier of the cube that the sink corresponds to
  • AnalyticsId - the identifier of the analytics that the sink corresponds to
  • SinkId - the identifier of the sink
  • EvaluationTimestamp - the timestamp of when this result's analytics evaluation took place
  • If one sink would output more than 150 trajectories, the response will be split into several parts — one part per UDP datagram. This is FLOW's mechanism called JSON fragmentation. In this example, the response has been split into two parts. The part number is indicated by the Part property. Indexing starts from 1. TotalParts indicates how many parts belong to the sink in total for the current evaluation. In practice, you'll mostly need to receive all parts of a sink's output and then merge their Objects arrays before further processing.
  • Failure - indicates whether there has been a problem. If there is one, FailureState contains one of the following error codes:
    • AnalNotRunning - the analytic corresponding to the sink is not running
    • MapPositionMissing - the MapPosition option has been requested, but the analytic is not georegistered.
    • WGS84PositionMisssing - the WGS84Position option has been requested, but the analytic is not georegistered.
    • EnvironmentalInference - poor visibility of the scene has been detected in the analytic
  • Objects - information about the traffic objects outputted by the sink. See the Raw trajectories sink for explanation of each property. Note that unlike the Raw trajectory sink, this sink always uses the CamelCase style for names of its properties. Apart from that, here's a list of differences of the Objects object from the Raw trajectories' trajectories object:
    • There's DetectionScore instead of last_detection_score
    • There's LastState instead of state_data. This object contains only the last value of each type, e.g. only the last known bounding box etc. Therefore, all keys of this object are in singular instead of plural.

UDP fragmentation

Even with JSON fragmentation, the UDP datagrams can get large. For example, one ObjectList JSON part can have up to 19 KB. When sending such large datagrams over a network, they could get discarded by the receiver or any machine along the way if the datagram's size exceeds their MTU settings. Usually, MTU is set to 1500 B, but sometimes it can be as small as 512 B. It's up to you to check the proper network settings in machines running the FLOW BLOCK, the receiving machine, and machines along the way.

FLOW lets you set the maximum size of UDP datagrams to accommodate your network settings. In FLOW Insights, log into a FLOW BLOCK as admin, select BLOCK -> Settings -> UDP settings -> Enable payload fragmentation. Then you're able to set the Maximum datagram size.

Once set, each datagram exceeding the maximum value will be split into multiple datagrams. Please note that this fragmentation occurs on FLOW's application layer and has nothing to do with UDP's intrinsic but unreliable fragmentation on the transport layer. Therefore it's up to the receiving application to build the whole payload from received parts.

When UDP fragmentation is enabled, all datagrams' payloads are prepended with a 16-byte header containing three unsigned values:

  1. 8 B timestamp of the first part (milliseconds since Epoch) in the series
  2. 4 B part number (starts from 0)
  3. 4 B total number of parts

These values are serialized as big-endian (most significant byte first). Here's an illustration of what the string {trajectories: "example"} would look like when split into two datagrams:

Keep in mind that UDP fragmentation affects outputs of all UDP sinks, not just the Object list sink. In the case of the Object list sink output, UDP fragmentation takes place after possible JSON fragmentation. Let's see an example where an ObjectList response has been split into two JSON parts - the first JSON part was further split into three datagrams, while the second JSON part was split into two datagrams:

What next?

You can also make FLOW send you widget outputs on its own with webhooks.


Need more help or have some questions? Contact us!