Finding the Fastest Lane at Border Crossings Using Machine Vision | by Danilo Najkov | Feb, 2023


Generated image from the proposed solution of the “Blace” border crossing camera in North Macedonia (public domain)

Crossing the border can be an exciting part of any road trip, but the frustration of long queues at border crossings can quickly take the excitement out of it. What if we could determine the fastest lane and position ourselves accordingly? Thanks to advancements in machine vision, this is now a possibility. By leveraging OpenCV and YOLOv3, a deep learning algorithm, it’s now possible to detect and track moving vehicles in real time. In this post, we’ll explore how this technology can be used to help people save time and avoid the stress of long queues at border crossings.

Contents

  • Overview of the problem
  • The data
  • Object detection
    – Car detection
    – Lane detection
  • Object tracking
  • Orchestrating everything
  • Limitations
  • Future work

Overview of the problem

The main idea of this task is to process the raw image data coming from border camera live streams to detect and track moving vehicles. The problem can be divided into several steps:

  • Get raw image data from the border cameras’ livestream
  • Preprocess the image
  • Apply algorithms to find the location of the vehicles
  • Determine the lane for these vehicles
  • Apply algorithms to determine the speed of the vehicles (and in turn the speed of every lane)

All of these steps are combined and orchestrated with docker-compose for running the solution on multiple border video streams.

The data

The video data is sourced from border cameras that are open to the public. The solutions proposed here can be used on any live stream (.m3u8) of border/toll cameras and can be easily adapted for other forms of video. Nearly all the countries in the world have some sort of service that allows users to see the live conditions of border crossings. For this specific case, I used the live feeds of the cameras of my country. You can find the streams on this link.

Before running any algorithms, we need to load the stream and preprocess the image. This can be done easily with OpenCV (the cv2 module in Python). I created a function that resizes the image to 416×416 pixels (the input size required by the implementation of the YOLOv3 model I used) and converts it into a blob for the network:

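import cv2
import numpy as np

def preprocess(image):
    # Resize to 416x416, then build the network input (pixels scaled to [0, 1], BGR -> RGB)
    resized_image = cv2.resize(image, (416, 416))
    blob = cv2.dnn.blobFromImage(resized_image, 1 / 255, (416, 416), swapRB=True, crop=False)
    return resized_image, blob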

In a while loop, images are loaded from the stream and are preprocessed using the above function. All the following changes will also be done in this while loop. Additionally, I check if something has changed from the previous frame, as it would be unnecessary to process the image again if nothing has changed.

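# BORDER_URL is a placeholder for the address of the .m3u8 live stream
cap = cv2.VideoCapture(BORDER_URL)
old_frame = None
while True:
    # read the stream and get the current image
    ret, frame = cap.read()
    if not ret:
        break

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    # resize the image to the correct format
    image, blob = preprocess(frame)

    # check if the image is the same as the previous one by calculating the
    # mean squared error between the two (cast to float first so that the
    # uint8 arithmetic cannot wrap around)
    if old_frame is not None:
        mse = np.mean((image.astype(np.float32) - old_frame.astype(np.float32)) ** 2)
        if mse < 0.001:
            old_frame = image
            continue

    # ... detection, lane assignment and tracking go here ...
    old_frame = image

cap.release()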
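
One detail worth checking when comparing frames this way: OpenCV returns frames as 8-bit unsigned arrays, and arithmetic on them wraps around modulo 256. The sketch below uses two synthetic frames (not real camera data) to show how a naive mean squared error can report zero for frames that clearly differ, and how casting to float avoids it. The function names are illustrative and not part of the original script.

```python
import numpy as np

def naive_mse(a, b):
    # uint8 arithmetic: both the subtraction and the square wrap modulo 256
    return float(np.mean((a - b) ** 2))

def frame_mse(a, b):
    # cast to float first so differences and squares are exact
    diff = a.astype(np.float32) - b.astype(np.float32)
    return float(np.mean(diff ** 2))

# two synthetic 4x4 grayscale "frames" whose pixels differ by 16 everywhere
frame_a = np.full((4, 4), 100, dtype=np.uint8)
frame_b = np.full((4, 4), 116, dtype=np.uint8)

print(naive_mse(frame_a, frame_b))  # 0.0, because (-16)**2 = 256 wraps to 0
print(frame_mse(frame_a, frame_b))  # 256.0
```

Differences this regular are unlikely in real footage, but the float version costs almost nothing and removes the edge case.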

Object detection

Car detection

YOLOv3 architecture for detecting objects (credit to the original YOLOv3 paper)

Now we are all set to detect vehicles in the image. I used YOLOv3 to get the bounding boxes of all the classes that the model was trained on, but kept only the ones that are vehicles. The weights and configuration files are required for loading this model, and you can download them from the official website. The function below returns the bounding boxes, the confidence of the model for each detected object, the class of the object (e.g., car, truck, person), and the center of each bounding box.

# Load the network once; re-reading the weights on every frame is slow
net = cv2.dnn.readNet("./yolov3.weights", "./yolov3.cfg")

def detect_cars(image, blob):
    net.setInput(blob)
    layer_names = net.getLayerNames()
    # flatten() copes with OpenCV versions that return an Nx1 array here
    out_ids = np.array(net.getUnconnectedOutLayers()).flatten()
    output_layers = [layer_names[i - 1] for i in out_ids]
    predictions = net.forward(output_layers)
    boxes = []
    confidences = []
    class_ids = []
    centers = []
    for prediction in predictions:
        for detection in prediction:
            scores = detection[5:]
            class_id = int(np.argmax(scores))
            confidence = float(scores[class_id])

            # Filter out weak detections
            if confidence > 0.5:
                # Get detection coordinates: YOLO reports the box center and
                # size, normalized to [0, 1], so scale them to pixels
                x, y, w, h = (detection[0:4] * np.array(
                    [image.shape[1], image.shape[0], image.shape[1], image.shape[0]])).astype("int")
                # Move from the center to the top-left corner
                x = int(x - w / 2)
                y = int(y - h / 2)

                center_x = int(x + w // 2)
                center_y = int(y + h // 2)

                centers.append((center_x, center_y))
                # Boxes are stored as corners: (x1, y1, x2, y2)
                boxes.append([int(x), int(y), int(x + w), int(y + h)])
                confidences.append(float(confidence))
                class_ids.append(class_id)

    return boxes, confidences, class_ids, centers
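
The coordinate conversion inside the detection loop is easy to get wrong, so here is a small worked example in plain Python. It assumes, as the code above does, that the network reports each box as a normalized center and size. The helper name yolo_to_corners is made up for illustration.

```python
def yolo_to_corners(cx, cy, w, h, img_w, img_h):
    """Convert a normalized (center x, center y, width, height) box
    to pixel corner coordinates (x1, y1, x2, y2)."""
    # scale the normalized values to pixels
    cx_px, cy_px = cx * img_w, cy * img_h
    w_px, h_px = w * img_w, h * img_h
    # move from the center to the top-left corner, then add the size
    x1 = int(cx_px - w_px / 2)
    y1 = int(cy_px - h_px / 2)
    return x1, y1, x1 + int(w_px), y1 + int(h_px)

# a box centered in a 416x416 image, a quarter as wide and half as tall
print(yolo_to_corners(0.5, 0.5, 0.25, 0.5, 416, 416))  # (156, 104, 260, 312)
```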

The output of the algorithm before applying NMS on the “Blace” border crossing camera (public domain)

We now have all the information about the bounding boxes. However, when we run the application, we see that there are a lot of overlapping boxes that need to be removed. For this, I used Non-Maximum Suppression (NMS), a post-processing algorithm used in object detection to remove redundant or overlapping bounding boxes for the same object.

The basic idea behind NMS is to first sort the bounding boxes by their detection confidence score (i.e., the probability that the bounding box contains an object). Starting with the bounding box with the highest confidence score, NMS suppresses all other boxes whose Intersection over Union (IoU) with it is greater than a certain threshold (e.g., 0.5), and then repeats the process with the highest-scoring box that remains.

The following function performs this algorithm. It takes as parameters the bounding boxes, their confidences, and the IoU threshold above which a box is treated as a duplicate. It returns the indices of the boxes we need to keep.

def NMS(boxes, confidences, threshold):
    if len(boxes) == 0:
        return []

    boxes = np.array(boxes)
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    # confidences arrives as a plain list, which has no argsort()
    scores = np.array(confidences)
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    # indices of the boxes, highest confidence first
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        # keep the best remaining box
        i = order[0]
        keep.append(i)
        # intersection of that box with every other remaining box
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        # intersection over union
        ovr = inter / (areas[i] + areas[order[1:]] - inter)

        # drop the boxes that overlap too much and continue with the rest
        inds = np.where(ovr <= threshold)[0]
        order = order[inds + 1]

    return keep

After using this algorithm, only one bounding box is retained per vehicle. You may need to adjust your threshold value to find what works for you.
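
To get a feel for the threshold, it helps to compute the IoU of a couple of boxes by hand. The sketch below is a plain-Python version of the overlap measure, using the same (x1, y1, x2, y2) corner format and the same +1 pixel convention as the function above. The example boxes are invented.

```python
def iou(box_a, box_b):
    """Intersection over Union of two (x1, y1, x2, y2) boxes,
    counting pixels inclusively (hence the +1)."""
    ix1, iy1 = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
    ix2, iy2 = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
    inter = max(0, ix2 - ix1 + 1) * max(0, iy2 - iy1 + 1)
    area_a = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
    area_b = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)
    return inter / (area_a + area_b - inter)

car = (0, 0, 9, 9)          # 10x10 pixels
duplicate = (0, 0, 9, 4)    # lower-confidence box on the same car, 10x5 pixels
neighbour = (20, 0, 29, 9)  # a different car next to it

print(iou(car, duplicate))  # 0.5
print(iou(car, neighbour))  # 0.0
```

Since the function above keeps every box whose IoU is less than or equal to the threshold, a threshold of 0.5 would keep this duplicate, while a lower value such as 0.4 would remove it.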

Lane detection

While there are lane detection algorithms that could be effective for this task, they may not be the most efficient option. Since the cameras are fixed and lane positions don’t change, it would be a waste of processing time to use such algorithms. To address this, I saved the lane positions in a JSON file as a list of points. However, before determining the speed of the lanes, one additional challenge remains: identifying which lane each car is in.

For this, I created the following functions, which locate each car relative to the lane lines based on its x-coordinate. The track_lanes function pairs each car center with the index of the lane the car is in, or with -1 if the car is outside the lanes.

def get_x_at_y(line, y):
    # line is a list of (x, y) points; return the x of the line at height y,
    # or None if y is outside the vertical extent of the line
    for segment in range(len(line) - 1):
        x0, y0 = line[segment]
        x1, y1 = line[segment + 1]
        if min(y0, y1) <= y <= max(y0, y1):
            if y0 == y1:
                # horizontal segment: every x on it matches, take its start
                return x0
            # linear interpolation; also works for vertical segments (x0 == x1)
            return x0 + (y - y0) * (x1 - x0) / (y1 - y0)
    return None

def track_lanes(car_centers, image, lanes):
    if len(car_centers) < 2:
        return []
    ccs_with_pos = []
    for cc in car_centers:
        # x position of every lane line at the height of the car center
        xs = [get_x_at_y(lane, cc[1]) for lane in lanes]
        if None in xs:
            ccs_with_pos.append((cc, -1))
            continue
        # signed horizontal distance from the car to each lane line
        dists = [x - cc[0] for x in xs]
        if dists[0] > 0 or dists[-1] < 0:
            ccs_with_pos.append((cc, -1))
            continue
        # check if x value of the car is between each two lines
        for i in range(len(dists) - 1):
            if dists[i] <= 0 < dists[i + 1]:
                ccs_with_pos.append((cc, i))
                break
        else:
            # no enclosing pair of lines found (e.g. exactly on the last line)
            ccs_with_pos.append((cc, -1))
    return ccs_with_pos
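
The article does not show the JSON file itself, so the layout below is an assumption: each lane boundary is a list of [x, y] points, and the boundaries are ordered from left to right. The sketch parses such a file and finds the lane of a point by interpolating each boundary at the point's height. The helper names boundary_x and lane_of are hypothetical.

```python
import json

# assumed layout: three boundaries (two lanes), each a polyline of [x, y] points
LANES_JSON = """
[
  [[100, 0], [50, 400]],
  [[200, 0], [200, 400]],
  [[300, 0], [350, 400]]
]
"""

def boundary_x(line, y):
    """x-coordinate of a polyline at height y, or None if y is out of range."""
    for (x0, y0), (x1, y1) in zip(line, line[1:]):
        if min(y0, y1) <= y <= max(y0, y1) and y0 != y1:
            return x0 + (y - y0) * (x1 - x0) / (y1 - y0)
    return None

def lane_of(point, lanes):
    """Index of the lane containing point, or -1 if it is outside all lanes."""
    x, y = point
    xs = [boundary_x(line, y) for line in lanes]
    if None in xs:
        return -1
    for i in range(len(xs) - 1):
        if xs[i] <= x < xs[i + 1]:
            return i
    return -1

lanes = json.loads(LANES_JSON)
print(lane_of((150, 200), lanes))  # 0
print(lane_of((250, 200), lanes))  # 1
print(lane_of((10, 200), lanes))   # -1
```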

We have everything ready to see the rate of change in each lane, by implementing object tracking.

Object tracking

Object tracking is the process of locating a specific object or multiple objects in a video stream and following their movement over time. The goal of object tracking is to maintain the identity of the objects across successive frames of the video, despite changes in the object’s position, orientation, size, and appearance. Many object-tracking algorithms exist, but for this specific use case, I decided to use optical flow, as it works fairly well with streams with a lower frame rate.

Optical flow is a technique that involves analyzing the changes in the intensity of adjacent pixels in successive video frames to determine the direction and speed of movement of objects in the scene. Optical flow algorithms rely on the assumption that the brightness of a pixel in one frame is the same as its corresponding pixel in the next frame, allowing for the calculation of the displacement of objects between frames.
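
The brightness-constancy assumption can be illustrated in one dimension. If a signal moves by a small amount d between two frames, the change over time at each sample is approximately -d times the spatial gradient, so d can be estimated by least squares. This is the idea behind the Lucas-Kanade method that OpenCV's calcOpticalFlowPyrLK builds on, reduced to a toy case. The signal and the shift of 0.2 samples are invented for the example.

```python
import math

def estimate_shift(prev, curr):
    """Least-squares estimate of a small 1-D displacement between two signals,
    assuming brightness constancy: curr(x) ~= prev(x - d)."""
    num = den = 0.0
    for x in range(1, len(prev) - 1):
        ix = (prev[x + 1] - prev[x - 1]) / 2  # spatial gradient (central difference)
        it = curr[x] - prev[x]                # temporal difference
        num += ix * it
        den += ix * ix
    return -num / den

true_shift = 0.2
prev = [math.sin(0.3 * x) for x in range(50)]
curr = [math.sin(0.3 * (x - true_shift)) for x in range(50)]

print(round(estimate_shift(prev, curr), 2))  # close to the true shift of 0.2
```

The estimate is only accurate for small displacements, which is why the real algorithm works on an image pyramid (the maxLevel parameter) to handle larger movements.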

The tracking function itself is more involved, so I will not explain it in detail. As parameters, it takes the current image and the previous image, the centers of the cars from the YOLOv3 model, the lane lines (to determine which lane each movement happens in), and the cars in each lane (to check whether the movement comes from vehicles or from other factors). It returns a list with one speed per lane.

import math

def optical_flow(image, prev_image, features_to_track, lanes, cars_in_lanes):
    if len(cars_in_lanes) == 0:
        return [-1 for _ in lanes]
    lk_params = dict(winSize=(30, 30),
                     maxLevel=5,
                     criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    prev_gray = cv2.convertScaleAbs(prev_image)
    prev_gray = cv2.cvtColor(prev_gray, cv2.COLOR_BGR2GRAY)
    p0 = np.array(features_to_track).reshape((-1, 1, 2))
    p0 = np.float32(p0)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Compute optical flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, p0, None, **lk_params)
    if p1 is None:
        return [-1 for _ in lanes]

    # Filter good points
    good_new = p1[st == 1]
    good_prev = p0[st == 1]

    def lane_of(x, y):
        # Index of the lane containing (x, y), or None if it is outside the lanes
        xs = [get_x_at_y(lane, y) for lane in lanes]
        if None in xs:
            return None
        dists = [lane_x - x for lane_x in xs]
        if dists[0] > 0 or dists[-1] < 0:
            return None
        for j in range(len(dists) - 1):
            if dists[j] < 0 < dists[j + 1]:
                return j
        return None

    # Find the speed of each lane
    lane_speeds = [0 for _ in lanes]
    for new, prev in zip(good_new, good_prev):
        x_new, y_new = new.ravel()
        x_prev, y_prev = prev.ravel()
        lane_new = lane_of(x_new, y_new)
        lane_old = lane_of(x_prev, y_prev)
        # Skip points that start or end outside the lanes
        if lane_new is None or lane_old is None:
            continue
        lane_speeds[lane_new] += math.dist([x_new, y_new], [x_prev, y_prev])

    # Average the accumulated movement over the cars detected in each lane
    for i in range(len(lane_speeds)):
        cnt = sum(1 for car in cars_in_lanes if car[1] == i)
        if cnt == 0:
            lane_speeds[i] = -1
            continue
        lane_speeds[i] = lane_speeds[i] / cnt

    return lane_speeds

The output of the optical flow algorithm is saved and compared to the output of the previous frame. If the optical flow value of a lane has increased significantly, that lane is awarded 1 movement point. This is implemented in the following function.

def calc_movement(new_movement, old_movement, sum_movement):
    for i in range(len(new_movement)):
        if new_movement[i] > old_movement[i] + 1:
            sum_movement[i] += 1
    return sum_movement
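
To make the scoring concrete, the sketch below runs the same rule over a short, invented history of per-lane speeds and reports the lane with the most movement points. Skipping frames where a lane reports the -1 sentinel is my own assumption rather than something the original function does, and the function names are hypothetical.

```python
def count_movement_points(speed_history, min_increase=1.0):
    """Award a lane one point each time its speed rises by more than
    min_increase between consecutive frames; -1 means 'no cars detected'."""
    n_lanes = len(speed_history[0])
    points = [0] * n_lanes
    for old, new in zip(speed_history, speed_history[1:]):
        for lane in range(n_lanes):
            if old[lane] < 0 or new[lane] < 0:
                continue  # assumption: ignore frames without detections
            if new[lane] > old[lane] + min_increase:
                points[lane] += 1
    return points

def fastest_lane(points):
    """Index of the lane with the most movement points (first one on ties)."""
    return max(range(len(points)), key=lambda lane: points[lane])

# invented speeds for 3 lanes over 5 frames
history = [
    [0.0, 0.0, -1],
    [0.5, 3.0, 0.0],
    [0.4, 0.2, 0.1],
    [0.6, 4.1, 2.0],
    [0.5, 6.0, -1],
]
points = count_movement_points(history)
print(points)                # [0, 3, 1]
print(fastest_lane(points))  # 1
```

Lane indices are zero-based here, so index 1 corresponds to the lane drawn as "lane 2" in the visualization.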

Now that everything is implemented, we can visualize the results. This function shows the image with the bounding boxes of each car and the lanes of traffic; the speeds of the lanes are printed in the console.


def visualize(boxes, centers, class_ids, keep, image, colors, lanes, ccs_with_pos):
    # classes (the list of class names the model was trained on) is expected
    # to be defined at module level
    class_list = ['car', 'motorbike', 'bus', 'truck', 'person']
    for lane in lanes:
        # cv2.polylines needs 32-bit integer points
        cv2.polylines(image, [np.array(lane, dtype=np.int32).reshape((-1, 1, 2))], False, (0, 0, 255), 2)
    for i, (box, c, ci) in enumerate(zip(boxes, centers, class_ids)):
        if i not in keep:
            continue
        if classes[ci] in class_list:
            color = colors[class_list.index(classes[ci])]
            thickness = 2
            # boxes are stored as corner coordinates (x1, y1, x2, y2)
            x1, y1, x2, y2 = box
            cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)

    if ccs_with_pos is not None:
        prev_vehs = [None for _ in lanes]
        # walk through the cars from the bottom of the image to the top
        for i in range(416, 0, -1):
            for car in ccs_with_pos:
                if car[0][1] == i and car[1] != -1:
                    if prev_vehs[car[1]] is None:
                        # first (lowest) car in the lane: label the lane
                        prev_vehs[car[1]] = car
                        cv2.putText(image, 'lane ' + str(car[1] + 1), (car[0][0] - 10, car[0][1] + 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2, cv2.LINE_AA)
                    else:
                        # connect the car to the previous car in the same lane
                        cv2.line(image, car[0], prev_vehs[car[1]][0], (255, 0, 0), 2)
                        prev_vehs[car[1]] = car
    cv2.imshow("image", image)

After running everything in the main while loop, we get the following video. The program outputs that the second lane (lane 2) had the most movement and therefore is the fastest.

Sped up GIF of around 3 minutes of border footage from the “Blace” border crossing (public domain)

Orchestrating everything

Although the algorithm is complete for detecting and tracking vehicles at a single border crossing, I wanted to extend the solution to work across multiple crossings while still saving the speed results. To achieve this, I decided to run each border camera script as a separate container, with all containers communicating within a docker-compose network. Since running a separate instance of the YOLOv3 model for each camera would be inefficient, I created a separate container with a Flask app that serves the other containers. However, to ensure that multiple requests don’t use the same neural network simultaneously, it’s important to incorporate semaphores or other locking mechanisms.
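
A minimal sketch of the locking idea, using only the standard library. The FakeDetector class stands in for the shared YOLOv3 network and the thread pool stands in for concurrent requests to the detection service; neither is taken from the original repository.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class FakeDetector:
    """Stand-in for a model that must not be used by two requests at once."""
    def __init__(self):
        self.active = 0       # requests currently inside detect()
        self.max_active = 0   # highest concurrency ever observed
        self._counter_lock = threading.Lock()

    def detect(self, frame_id):
        with self._counter_lock:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)      # pretend to run the network
        with self._counter_lock:
            self.active -= 1
        return f"boxes for frame {frame_id}"

detector = FakeDetector()
model_lock = threading.Lock()

def handle_request(frame_id):
    # only one request at a time may enter the model
    with model_lock:
        return detector.detect(frame_id)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle_request, range(8)))

print(detector.max_active)  # 1
print(len(results))         # 8
```

In a web app the same pattern would wrap the call to the network inside the request handler, so requests from different camera containers queue up instead of overlapping.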

In addition, I developed a .NET web API that saves the speeds and the cars in each lane to a database. It is out of this post’s scope to include the full code here, but you can find it on my GitHub repository. With this implementation, the solution can be deployed to multiple border crossings, and the resulting data can be collected and analyzed, providing insights into traffic patterns and congestion at different locations. The docker-compose file looks like this:

version: '3'
services:
  api:
    container_name: api
    build:
      context: ./api
      dockerfile: Dockerfile
    image: api:latest
    ports:
      - 80:80
    restart: always

  yolo:
    build: ./yolo_detector
    command: python script.py
    container_name: yolo
    ports:
      - 5001:5001
    restart: always

  script0:
    build: ./image_processing
    command: python count.py 0
    container_name: script0
    restart: always

  script1:
    build: ./image_processing
    command: python count.py 1
    container_name: script1
    restart: always

...

Limitations

Although the algorithm works well on the footage shown above, it is important to acknowledge its limitations. First, YOLOv3, the object detection model I used, is less accurate in low-light conditions, such as at night. As a result, fewer vehicles are detected, potentially leading to incorrect assumptions about traffic volume. Additionally, the model struggles to detect objects at long distances, which puts a cap on the number of vehicles that can be detected in long queues.

Furthermore, border crossings can vary significantly in design. While the majority may follow a similar layout, some have more complex features, such as curved lanes and additional barriers, and these variations can render the algorithm unusable. Further development and optimization could potentially address these limitations, but for now these factors need to be taken into account when deploying the solution.

Example border checkpoint from the “Tabanovce” crossing (public domain) where the lane speed algorithm cannot be used.

Future work

While this solution provides an effective means of determining vehicle speed and traffic congestion at border checkpoints, there is still room for improvement. One area for future work is to explore other object detection models to see if they offer improved accuracy in low-light conditions or when detecting objects at longer distances.

Additionally, it may be useful to investigate the feasibility of implementing this algorithm at border crossings with more complex layouts. This could involve developing new algorithms to address the challenges posed by curved lanes, additional barriers, or other features unique to those crossings.

Furthermore, the current solution could be extended to incorporate machine learning techniques that could adapt to changing traffic patterns over time. By training the model on data collected over an extended period, it could potentially learn to adjust its parameters based on traffic conditions, improving its accuracy and reliability over time.

Finally, it may be possible to integrate data from other sources, such as social media or traffic cameras, to provide a more comprehensive understanding of traffic patterns at border crossings. By incorporating data from a range of sources, it may be possible to find correlations between traffic patterns and external factors, such as weather or public events, providing valuable insights for traffic management and future planning. This data can help people better plan their trips, by setting departure times in which there is the least expected traffic.

In conclusion, by using machine vision algorithms, we have developed an effective solution for determining vehicle speed and traffic volume at border crossings. You can experiment with the code on GitHub and see what results you get with the border crossings near you. Thank you for reading!


Generated image from the proposed solution of the “Blace” border crossing camera in North Macedonia (public domain)

Crossing the border can be an exciting part of any road trip, but the frustration of long queues at border crossings can quickly take the excitement out of it. What if we could determine the fastest lane and position ourselves accordingly? Thanks to advancements in machine vision, this is now a possibility. By leveraging OpenCV and YOLOv3, a deep learning algorithm, it’s now possible to detect and track moving vehicles in real time. In this post, we’ll explore how this technology can be used to help people save time and avoid the stress of long queues at border crossings.

Contents

  • Overview of the problem
  • The data
  • Object detection
    – Car detection
    – Lane detection
  • Object tracking
  • Orchestrating everything
  • Limitations
  • Future work

The main idea of this task is to process the raw image data coming from border camera live streams to detect and track moving vehicles. The problem can be divided into several steps:

  • Get raw image data from the border cameras’ livestream
  • Preprocess the image
  • Apply algorithms to find the location of the vehicles
  • Determine the lane for these vehicles
  • Apply algorithms to determine the speed of the vehicles (and in turn the speed of every lane)

All of these steps are combined and orchestrated with docker-compose for running the solution on multiple border video streams.

The video data is sourced from border cameras that are open to the public. The solutions proposed here can be used on any live stream (.m3u8) of border/toll cameras and can be easily adapted for other forms of video. Nearly all the countries in the world have some sort of service that allows users to see the live conditions of border crossings. For this specific case, I used the live feeds of the cameras of my country. You can find the streams on this link.

Before running any algorithms, we need to load the stream and preprocess the image. This can be easily done with OpenCV2. I created a function that resizes the image to 416×416 pixels (the format required by the implementation of the YOLOv3 model I used)

def preprocess(image):
resized_image = cv2.resize(image, (416, 416))
blob = cv2.dnn.blobFromImage(resized_image, 1 / 255, (416, 416), swapRB=True, crop=False)
return resized_image, blob

In a while loop, images are loaded from the stream and are preprocessed using the above function. All the following changes will also be done in this while loop. Additionally, I check if something has changed from the previous frame, as it would be unnecessary to process the image again if nothing has changed.

old_frame = None
while True:
# read the stream and get the current image
cap = cv2.VideoCapture(BORDER_URL.m3u8)
ret, frame = cap.read()
if not ret:
break

if cv2.waitKey(1) & 0xFF == ord('q'):
break
# resize the image to correct formant
image, blob = preprocess(frame)

# check if the image is the same as the previous one by calculating the mean squared error between the two
if old_frame is not None:
mse = np.mean((image - old_frame) ** 2)
if mse < 0.001:
old_frame = image
continue

Car detection

YOLOv3 architecture for detecting objects (credit to the original YOLOv3 paper)

Now, we are all set for detecting vehicles in the image. I used YOLOv3 for getting the bounding boxes of all the classes that the model was trained on, but kept only the ones that are vehicles. The weights and configuration files are required for loading this model, and you can download them from the official website. This function returns the bounding boxes, the confidence of the model for the detected object, the class of the object (ex. car, truck, person, etc.), and the centers of each of the bounding boxes.

def detect_cars(image, blob):
net = cv2.dnn.readNet("./yolov3.weights", "./yolov3.cfg")
net.setInput(blob)
layer_names = net.getLayerNames()
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
predictions = net.forward(output_layers)
boxes = []
confidences = []
class_ids = []
centers = []
for prediction in predictions:
for detection in prediction:
scores = detection[5:]
class_id = int(np.argmax(scores))
confidence = float(scores[class_id])

# Filter out weak detections
if confidence > 0.5:
# Get detection coordinates
x, y, w, h = (detection[0:4] * np.array(
[image.shape[1], image.shape[0], image.shape[1], image.shape[0]])).astype("int")
x = int(x - w / 2)
y = int(y - h / 2)

center_x = int(x + w // 2)
center_y = int(y + h // 2)

centers.append((center_x, center_y))
boxes.append([int(x), int(y), int(x + w), int(y + h)])
confidences.append(float(confidence))
class_ids.append(class_id)

return boxes, confidences, class_ids, centers

The output of the algorithm before applying NMS on the “Blace” border crossing camera (public domain)

We have all the information about the bounding boxes, however, when we run the application we see that there are a lot of overlapping boxes that need to be removed. For this challenge, I used Non-Maximum Suppression (NMS). Non-Maximum Suppression (NMS) is a post-processing algorithm used in object detection tasks, specifically to remove redundant or overlapping bounding boxes for the same object.

NMS is applied to filter out redundant bounding boxes and keep only the most appropriate ones. The basic idea behind NMS is to first sort the bounding boxes based on their detection confidence score (i.e., the probability that the bounding box contains an object). Starting with the bounding box with the highest confidence score, NMS suppresses all overlapping bounding boxes that have an Intersection over Union (IoU) value greater than a certain threshold (e.g., 0.5).

The following function performs this algorithm. It takes as parameters the bounding boxes, the confidence of the bounding boxes, and the threshold we want to filter out the duplicates. It returns the IDs of the boxes we need to keep.

def NMS(boxes, confidences, threshold):
if len(boxes) == 0:
return []

boxes = np.array(boxes)
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2]
y2 = boxes[:, 3]

scores = confidences
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
order = scores.argsort()[::-1]

keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])

w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / (areas[i] + areas[order[1:]] - inter)

inds = np.where(ovr <= threshold)[0]
order = order[inds + 1]

return keep

After using this algorithm, only one bounding box is retained per vehicle. You may need to adjust your threshold value to find what works for you.

Lane detection

While there are lane detection algorithms that could be effective for this task, they may not be the most efficient option. Since the cameras are fixed and lane positions don’t change, it would be a waste of processing time to use such algorithms. To address this, I saved the lane positions in a JSON file as a list of points. However, before determining the speed of the lanes, one additional challenge remains: identifying which lane each car is in.

For this, I created the following functions which find where each car is depending on its x-coordinate. It returns the index of the lane in which the car is. If the car is outside the lanes it returns -1.

def get_x_at_y(line, y):
loc = -1
for segment in range(len(line)-1):
if line[segment][1] <= y <= line[segment + 1][1] or line[segment][1] >= y >= line[segment + 1][1]:
loc = segment
if loc == -1:
return None
slope = (line[segment+1][1] - line[segment][1]) / (line[segment+1][0]-line[segment][0])
return (y-line[segment+1][1])/slope + line[segment+1][0]

def track_lanes(car_centers, image, lanes):
if len(car_centers) < 2:
return []
ccs_with_pos = []
for cc in car_centers:
xs = []
for lane in lanes:
xs.append(get_x_at_y(lane, cc[1]))
dists = []
if None in xs:
ccs_with_pos.append((cc, -1))
continue
for x in xs:
dists.append(x-cc[0])
if dists[0]>0 or dists[-1]<0:
ccs_with_pos.append((cc, -1))
continue
# check if x value of the car is between each two lines
for i in range(len(dists)-1):
if dists[i] < 0 < dists[i + 1]:
ccs_with_pos.append((cc, i))
break
return ccs_with_pos

We have everything ready to see the rate of change in each lane, by implementing object tracking.

Object tracking is the process of locating a specific object or multiple objects in a video stream and following their movement over time. The goal of object tracking is to maintain the identity of the objects across successive frames of the video, despite changes in the object’s position, orientation, size, and appearance. Many object-tracking algorithms exist, but for this specific use case, I decided to use optical flow, as it works fairly well with streams with a lower frame rate.

Optical flow is a technique that involves analyzing the changes in the intensity of adjacent pixels in successive video frames to determine the direction and speed of movement of objects in the scene. Optical flow algorithms rely on the assumption that the brightness of a pixel in one frame is the same as its corresponding pixel in the next frame, allowing for the calculation of the displacement of objects between frames.

This algorithm is much more complicated, so I will not be explaining it in detail. As parameters, it takes the image and previous image, the centers of the cars from the YOLOv3 model, the lanes to determine if the movement is happening in each lane, and cars in lanes to see if the movement is a result of vehicle movement or other factors. It returns a list of speeds for each lane.

def optical_flow(image, prev_image, features_to_track, lanes, cars_in_lanes):
if len(cars_in_lanes) == 0:
return list([-1 for el in range(len(lanes))])
feature_params = dict(maxCorners=100,
qualityLevel=0.3,
minDistance=20,
blockSize=7)
lk_params = dict(winSize=(30, 30),
maxLevel=5,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

prev_gray = cv2.convertScaleAbs(prev_image)
prev_gray = cv2.cvtColor(prev_gray, cv2.COLOR_BGR2GRAY)
p0 = np.array(features_to_track).reshape((-1, 1, 2))
p0 = np.float32(p0)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

# Compute optical flow
p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, p0, None, **lk_params)

# Filter good points
good_new = p1[st == 1]
good_prev = p0[st == 1]

# Find the speed of each lane
lane_speeds = list([0 for el in range(len(lanes))])
for i, (new, prev) in enumerate(zip(good_new, good_prev)):
x_new, y_new = new.ravel()
x_prev, y_prev = prev.ravel()
xs_new = []
xs_old = []
for lane in lanes:
xs_new.append(get_x_at_y(lane, y_new))
xs_old.append(get_x_at_y(lane, y_prev))
dists_new = []
dists_old = []
if None in xs_new or None in xs_old:
continue
for x in xs_new:
dists_new.append(x - x_new)
for x in xs_old:
dists_old.append(x - x_prev)
if dists_new[0] > 0 or dists_new[-1] < 0 or dists_old[0] > 0 or dists_old[-1] < 0:
continue
lane_new = None
lane_old = None
for j in range(len(dists_new) - 1):
if dists_new[j] < 0 < dists_new[j + 1]:
lane_new = j
break
for j in range(len(dists_old) - 1):
if dists_old[j] < 0 < dists_old[j + 1]:
lane_old = j
break
lane_speeds[lane_new] += math.dist([x_new, y_new],[x_prev, y_prev])

for i, ls in enumerate(lane_speeds):
cnt = 0
for car in cars_in_lanes:
if car[1] == i:
cnt += 1
if cnt == 0:
lane_speeds[i] = -1
continue
lane_speeds[i] = lane_speeds[i]/cnt

return lane_speeds

The output of the optical flow algorithm is saved and compared to the output of the previous frame. If there is a significant change in the optical flow values for each lane, it is recorded as 1 movement point. This is implemented in the following function.

def calc_movement(new_movement, old_movement, sum_movement):
for i in range(len(new_movement)):
if new_movement[i] > old_movement[i] + 1:
sum_movement[i] += 1
return sum_movement

Now that everything is implemented, we can visualize the results. This function shows the image with the bounding boxes of each car, and lanes of traffic, and prints the speed of lanes in the console.


def visualize(boxes, centers, class_ids, keep, image, colors, lanes, ccs_with_pos):
for lane in lanes:
cv2.polylines(image, [np.array(lane).reshape((-1, 1, 2))], False, (0,0,255), 2)
for i, (box, c, ci) in enumerate(zip(boxes, centers, class_ids)):
if i not in keep:
continue
class_list = ['car', 'motorbike', 'bus', 'truck', 'person']
if classes[ci] in class_list:
color = colors[class_list.index(classes[ci])]
thickness = 2
x, y, w, h = box
cv2.rectangle(image, (x, y), (w, h), color, thickness)

if ccs_with_pos is not None:
prev_vehs = list([None for el in range(len(lanes))])
for i in range(416, 0, -1):
for car in ccs_with_pos:
if car[0][1] == i and car[1] != -1:
if prev_vehs[car[1]] is None:
prev_vehs[car[1]] = car
cv2.putText(image, 'lane ' + str(car[1]+1), (car[0][0]-10, car[0][1]+10), cv2.FONT_HERSHEY_SIMPLEX,
0.5, (255, 0, 0), 2, cv2.LINE_AA)
else:
cv2.line(image, car[0], prev_vehs[car[1]][0], (255,0,0), 2)
prev_vehs[car[1]] = car
cv2.imshow("image", image)

After running everything in the main while loop, we get the following video. The program outputs that the second lane (lane 2) had the most movement and therefore is the fastest.

Sped up GIF of around 3 minutes of border footage from the “Blace” border crossing (public domain)

Although the algorithm is complete for detecting and tracking vehicles in a single border crossing, I wanted to extend the solution to work across multiple crossings while still saving the results of speed. To achieve this, I decided to run each border camera script as a separate container that communicates within a docker-compose. Since running a separate instance of the YOLOv3 model for each camera would be inefficient, I created a separate container with a flask app that serves the other containers. However, to ensure that multiple requests don’t use the same neural network simultaneously, it’s important to incorporate semaphores or other locking mechanisms.

In addition, I developed a .NET web API that saves the speeds and the number of cars in each lane to a database. Including the full code is beyond the scope of this post, but you can find it in my GitHub repository. With this implementation, the solution can be deployed to multiple border crossings, and the resulting data can be collected and analyzed efficiently, providing valuable insights into traffic patterns and congestion at different locations.

version: '3'
services:
  api:
    container_name: api
    build:
      context: ./api
      dockerfile: Dockerfile
    image: api:latest
    ports:
      - 80:80
    restart: always

  yolo:
    build: ./yolo_detector
    command: python script.py
    container_name: yolo
    ports:
      - 5001:5001
    restart: always

  script0:
    build: ./image_processing
    command: python count.py 0
    container_name: script0
    restart: always

  script1:
    build: ./image_processing
    command: python count.py 1
    container_name: script1
    restart: always

...
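Each `scriptN` service runs the same image with a different number passed to `count.py`. One plausible way for a script to turn that number into a camera stream is sketched below. This is only an assumption about how the argument might be used: the `STREAMS` list, its placeholder URLs, and the `select_stream` helper are all hypothetical and not taken from the repository.

```python
# Hypothetical placeholder URLs; the real stream addresses are not shown here.
STREAMS = [
    "https://example.com/camera0.m3u8",
    "https://example.com/camera1.m3u8",
]


def select_stream(argv, streams=STREAMS):
    """Map the command-line index (as in `python count.py 1`) to a stream URL."""
    if len(argv) < 2:
        raise SystemExit("usage: count.py <camera index>")
    index = int(argv[1])
    if not 0 <= index < len(streams):
        raise SystemExit(f"camera index {index} is out of range")
    return streams[index]


# Simulates the command line of the `script1` service.
print(select_stream(["count.py", "1"]))
```

With this layout, adding a new border crossing only requires one more entry in the list and one more service in the compose file.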

While the algorithm works well in the example above, it is important to acknowledge its limitations. First, YOLOv3, the object detection model I used, is less accurate in low-light conditions, such as at night. As a result, significantly fewer vehicles are detected, potentially leading to incorrect assumptions about traffic volume. Second, the model struggles to detect objects at long distances, meaning that there is a cap on the number of vehicles that can be detected in long queues.

Furthermore, it’s important to consider that border crossings can vary significantly in design. While the majority of borders may follow a similar layout, there are some with more complex features, such as curved lanes and additional barriers. These variations can render the algorithm unusable in certain scenarios. It’s worth noting that further development and optimization could potentially address these limitations, but for now, it’s essential to take these factors into account when implementing our solution.

Example border checkpoint from the “Tabanovce” crossing (public domain) where the lane speed algorithm cannot be used.

While this solution provides an effective means of determining vehicle speed and traffic congestion at border checkpoints, there is still room for improvement. One area for future work is to explore other object detection models to see if they offer improved accuracy in low-light conditions or when detecting objects at longer distances.

Additionally, it may be useful to investigate the feasibility of implementing this algorithm at border crossings with more complex layouts. This could involve developing new algorithms to address the challenges posed by curved lanes, additional barriers, or other features unique to those crossings.

Furthermore, the current solution could be extended to incorporate machine learning techniques that could adapt to changing traffic patterns over time. By training the model on data collected over an extended period, it could potentially learn to adjust its parameters based on traffic conditions, improving its accuracy and reliability over time.

Finally, it may be possible to integrate data from other sources, such as social media or traffic cameras, to provide a more comprehensive understanding of traffic patterns at border crossings. By incorporating data from a range of sources, it may be possible to find correlations between traffic patterns and external factors, such as weather or public events, providing valuable insights for traffic management and future planning. This data can also help people plan their trips better by choosing departure times with the least expected traffic.

In conclusion, by using machine vision algorithms, we have developed an effective solution for determining vehicle speed and traffic volume at border crossings. You can experiment with the code on GitHub and see what results you get with the border crossings near you. Thank you for reading!
