基于YOLOv8与ByteTrack的车辆检测追踪与流量计数系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标追踪、车辆检测追踪、过线计数、流量统计(3)

简介: 基于YOLOv8与ByteTrack的车辆检测追踪与流量计数系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标追踪、车辆检测追踪、过线计数、流量统计

基于YOLOv8与ByteTrack的车辆检测追踪与流量计数系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标追踪、车辆检测追踪、过线计数、流量统计(2)https://developer.aliyun.com/article/1536911

三、使用ByteTrack进行目标追踪

ByteTrack算法简介

论文地址:https://arxiv.org/abs/2110.06864

源码地址:https://github.com/ifzhang/ByteTrack

ByteTrack算法是一种十分强大且高效的追踪算法,和其他非ReID的算法一样,仅仅使用目标追踪所得到的bbox进行追踪。追踪算法使用了卡尔曼滤波预测边界框,然后使用匈牙利算法进行目标和轨迹间的匹配。

ByteTrack算法的最大创新点就是对低分框的使用,作者认为低分框可能是对物体遮挡时产生的框,直接对低分框抛弃会影响性能,所以作者使用低分框对追踪算法进行了二次匹配,有效优化了追踪过程中因为遮挡造成换id的问题。

  • 没有使用ReID特征计算外观相似度
  • 非深度方法,不需要训练
  • 利用高分框和低分框之间的区别和匹配,有效解决遮挡问题

ByteTrack与其他追踪算法的对比如下图所示,可以看到ByteTrack的性能还是相当不错的。

ByteTrack的实现代码如下:

class ByteTrack:
    """
    Initialize the ByteTrack object.
    Parameters:
        track_thresh (float, optional): Detection confidence threshold
            for track activation.
        track_buffer (int, optional): Number of frames to buffer when a track is lost.
        match_thresh (float, optional): Threshold for matching tracks with detections.
        frame_rate (int, optional): The frame rate of the video.
    """
    def __init__(
        self,
        track_thresh: float = 0.25,
        track_buffer: int = 30,
        match_thresh: float = 0.8,
        frame_rate: int = 30,
    ):
        self.track_thresh = track_thresh
        self.match_thresh = match_thresh
        self.frame_id = 0
        self.det_thresh = self.track_thresh + 0.1
        self.max_time_lost = int(frame_rate / 30.0 * track_buffer)
        self.kalman_filter = KalmanFilter()
        self.tracked_tracks: List[STrack] = []
        self.lost_tracks: List[STrack] = []
        self.removed_tracks: List[STrack] = []
    def update_with_detections(self, detections: Detections) -> Detections:
        """
        Updates the tracker with the provided detections and
            returns the updated detection results.
        Parameters:
            detections: The new detections to update with.
        Returns:
            Detection: The updated detection results that now include tracking IDs.
        """
        tracks = self.update_with_tensors(
            tensors=detections2boxes(detections=detections)
        )
        detections = Detections.empty()
        if len(tracks) > 0:
            detections.xyxy = np.array(
                [track.tlbr for track in tracks], dtype=np.float32
            )
            detections.class_id = np.array(
                [int(t.class_ids) for t in tracks], dtype=int
            )
            detections.tracker_id = np.array(
                [int(t.track_id) for t in tracks], dtype=int
            )
            detections.confidence = np.array(
                [t.score for t in tracks], dtype=np.float32
            )
        else:
            detections.tracker_id = np.array([], dtype=int)
        return detections
    def update_with_tensors(self, tensors: np.ndarray) -> List[STrack]:
        """
        Updates the tracker with the provided tensors and returns the updated tracks.
        Parameters:
            tensors: The new tensors to update with.
        Returns:
            List[STrack]: Updated tracks.
        """
        self.frame_id += 1
        activated_starcks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []
        class_ids = tensors[:, 5]
        scores = tensors[:, 4]
        bboxes = tensors[:, :4]
        remain_inds = scores > self.track_thresh
        inds_low = scores > 0.1
        inds_high = scores < self.track_thresh
        inds_second = np.logical_and(inds_low, inds_high)
        dets_second = bboxes[inds_second]
        dets = bboxes[remain_inds]
        scores_keep = scores[remain_inds]
        scores_second = scores[inds_second]
        class_ids_keep = class_ids[remain_inds]
        class_ids_second = class_ids[inds_second]
        if len(dets) > 0:
            """Detections"""
            detections = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets, scores_keep, class_ids_keep)
            ]
        else:
            detections = []
        """ Add newly detected tracklets to tracked_stracks"""
        unconfirmed = []
        tracked_stracks = []  # type: list[STrack]
        for track in self.tracked_tracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)
        """ Step 2: First association, with high score detection boxes"""
        strack_pool = joint_tracks(tracked_stracks, self.lost_tracks)
        # Predict the current location with KF
        STrack.multi_predict(strack_pool)
        dists = matching.iou_distance(strack_pool, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_track, u_detection = matching.linear_assignment(
            dists, thresh=self.match_thresh
        )
        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(detections[idet], self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        """ Step 3: Second association, with low score detection boxes"""
        # association the untrack to the low score detections
        if len(dets_second) > 0:
            """Detections"""
            detections_second = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets_second, scores_second, class_ids_second)
            ]
        else:
            detections_second = []
        r_tracked_stracks = [
            strack_pool[i]
            for i in u_track
            if strack_pool[i].state == TrackState.Tracked
        ]
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(
            dists, thresh=0.5
        )
        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_starcks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)
        """Deal with unconfirmed tracks, usually tracks with only one beginning frame"""
        detections = [detections[i] for i in u_detection]
        dists = matching.iou_distance(unconfirmed, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(
            dists, thresh=0.7
        )
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_starcks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
        """ Step 4: Init new stracks"""
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.det_thresh:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_starcks.append(track)
        """ Step 5: Update state"""
        for track in self.lost_tracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)
        self.tracked_tracks = [
            t for t in self.tracked_tracks if t.state == TrackState.Tracked
        ]
        self.tracked_tracks = joint_tracks(self.tracked_tracks, activated_starcks)
        self.tracked_tracks = joint_tracks(self.tracked_tracks, refind_stracks)
        self.lost_tracks = sub_tracks(self.lost_tracks, self.tracked_tracks)
        self.lost_tracks.extend(lost_stracks)
        self.lost_tracks = sub_tracks(self.lost_tracks, self.removed_tracks)
        self.removed_tracks.extend(removed_stracks)
        self.tracked_tracks, self.lost_tracks = remove_duplicate_tracks(
            self.tracked_tracks, self.lost_tracks
        )
        output_stracks = [track for track in self.tracked_tracks if track.is_activated]
        return output_stracks

使用方法

1.创建ByteTrack跟踪器

# 创建跟踪器
byte_tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8, frame_rate=30)

2.对YOLOv8的目标检测结果进行追踪

model = YOLO(path)
results = model(frame)[0]
detections = sv.Detections.from_ultralytics(results)
detections = byte_tracker.update_with_detections(detections)

3.显示追踪结果ID、检测框及标签信息

labels = [
            f"id{tracker_id} {model.model.names[class_id]}"
            for _, _, confidence, class_id, tracker_id
            in detections
        ]
annotated_frame = frame.copy()
annotated_frame = box_annotator.annotate(
            scene=annotated_frame,
            detections=detections,
            labels=labels)

最终检测效果如下:

四、过线计数判断方式

定义过线线段

定义用于统计过线的线段,此处我们直接使用视频水平中心线作为过线线段,代码如下:

cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
point_A = [10, int(height/2)]
point_B = [width-10, int(height/2)]
# 定义过线使用的线段点
LINE_START = sv.Point(point_A[0], point_A[1])
LINE_END = sv.Point(point_B[0], point_B[1])
line_zone = MyLineZone(start=LINE_START, end=LINE_END)

判断过线方法

使用目标中心点判断是否过线,核心代码如下:

for i, (xyxy, _, confidence, class_id, tracker_id) in enumerate(detections):
            if tracker_id is None:
                continue
            # 使用中心点判断是否过线
            x1, y1, x2, y2 = xyxy
            center_x = int((x1 + x2) / 2)
            center_y = int((y1 + y2) / 2)
            center_point = Point(x=center_x, y=center_y)
            triggers = [self.vector.is_in(point=center_point)]

上述通过目标框坐标计算出目标中心点坐标center_x ,center_y ,然后通过is_in函数判断过线状态,其中is_in函数定义如下:

def is_in(self, point: Point) -> bool:
        v1 = Vector(self.start, self.end)
        v2 = Vector(self.start, point)
        cross_product = (v1.end.x - v1.start.x) * (v2.end.y - v2.start.y) - (
            v1.end.y - v1.start.y
        ) * (v2.end.x - v2.start.x)
        return cross_product < 0

函数首先根据线段的起点和终点构造两个向量v1和v2,分别表示线段和待判断的点与线段起点的向量。然后计算两个向量的叉积,并判断叉积的正负来确定点的位置关系。若叉积小于0,则点在线段的左侧;若叉积大于0,则点在线段的右侧;若叉积等于0,则点在线段上。根据题设,函数返回的是点在线段不同侧的状态,即当叉积小于0时返回True,否则返回False

判断是否通过线段

上述判断方式只能用于判断目标是否通过线段所在直线,并不是在线段内通过。如果想判断在线段内通过,需要另外加上过线时的判断条件,核心代码如下:

def point_in_line(self, center_point):
        # 判断点是否在线段之间通过
        # 计算向量 AP 与向量 AB 的点积(也称为“标量积”)
        # 点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正)
        point_A, point_B = self.get_line_points(self.vector)
        xA, yA = point_A
        xB, yB = point_B
        xP, yP = center_point
        AB = (xB - xA, yB - yA)
        AP = (xP - xA, yP - yA)
        # 计算向量 AP 与向量 AB 的点积
        dot_product = AB[0] * AP[0] + AB[1] * AP[1]
        # 计算向量 AB 模长的平方
        AB_length_squared = AB[0] ** 2 + AB[1] ** 2
        # 判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正)
        if 0 <= dot_product <= AB_length_squared and dot_product >= 0:
            within_segment = True
        else:
            within_segment = False
        return within_segment

判断点是否在线段之间通过,通过计算向量 AP向量 AB点积(也称为“标量积”)来进行判断。其中P表示目标中心点,AB表示目标需要通过的线段。

判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正),则表示在线段内通过。

过线效果展示

过线效果展示如下:

以上便是关于此款车辆检测追踪与流量计数系统的原理与代码介绍。基于以上内容,博主用pythonPyqt5开发了一个带界面的软件系统,即文中第二部分的演示内容,能够很好的视频及摄像头进行检测追踪,以及自定义过线计数


相关文章
|
23天前
|
机器学习/深度学习 TensorFlow 算法框架/工具
使用Python实现深度学习模型:智能质量检测与控制
使用Python实现深度学习模型:智能质量检测与控制 【10月更文挑战第8天】
149 62
使用Python实现深度学习模型:智能质量检测与控制
|
2天前
|
机器学习/深度学习 PyTorch TensorFlow
使用Python实现智能食品质量检测的深度学习模型
使用Python实现智能食品质量检测的深度学习模型
23 1
|
22天前
|
运维 安全 网络协议
Python 网络编程:端口检测与IP解析
本文介绍了使用Python进行网络编程的两个重要技能:检查端口状态和根据IP地址解析主机名。通过`socket`库实现端口扫描和主机名解析的功能,并提供了详细的示例代码。文章最后还展示了如何整合这两部分代码,实现一个简单的命令行端口扫描器,适用于网络故障排查和安全审计。
|
23天前
|
数据处理 Python
Python读取大文件的“坑“与内存占用检测
Python读取大文件的“坑“与内存占用检测
42 0
|
24天前
|
安全 Java Python
基于python-django的Java网站全站漏洞检测系统
基于python-django的Java网站全站漏洞检测系统
28 0
|
Python
PYTHON实战两数之和
1. 两数之和 难度:简单 收藏 给定一个整数数组 nums 和一个整数目标值 target,请你在该数组中找出 和为目标值 target 的那 两个 整数,并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是,数组中同一个元素在答案里不能重复出现。 你可以按任意顺序返回答案。
188 0
PYTHON实战两数之和
|
11天前
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
6天前
|
设计模式 开发者 Python
Python编程中的设计模式:工厂方法模式###
本文深入浅出地探讨了Python编程中的一种重要设计模式——工厂方法模式。通过具体案例和代码示例,我们将了解工厂方法模式的定义、应用场景、实现步骤以及其优势与潜在缺点。无论你是Python新手还是有经验的开发者,都能从本文中获得关于如何在实际项目中有效应用工厂方法模式的启发。 ###
|
11天前
|
弹性计算 安全 小程序
编程之美:Python让你领略浪漫星空下的流星雨奇观
这段代码使用 Python 的 `turtle` 库实现了一个流星雨动画。程序通过创建 `Meteor` 类来生成具有随机属性的流星,包括大小、颜色、位置和速度。在无限循环中,流星不断移动并重新绘制,营造出流星雨的效果。环境需求为 Python 3.11.4 和 PyCharm 2023.2.5。
|
4天前
|
数据处理 Python
从零到英雄:Python编程的奇幻旅程###
想象你正站在数字世界的门槛上,手中握着一把名为“Python”的魔法钥匙。别小看这把钥匙,它能开启无限可能的大门,引领你穿梭于现实与虚拟之间,创造属于自己的奇迹。本文将带你踏上一场从零基础到编程英雄的奇妙之旅,通过生动有趣的比喻和实际案例,让你领略Python编程的魅力,激发内心深处对技术的渴望与热爱。 ###