基于YOLOv8与ByteTrack的车辆检测追踪与流量计数系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标追踪、车辆检测追踪、过线计数、流量统计(2)https://developer.aliyun.com/article/1536911
三、使用ByteTrack进行目标追踪
ByteTrack算法简介
论文地址:https://arxiv.org/abs/2110.06864
源码地址:https://github.com/ifzhang/ByteTrack
ByteTrack算法是一种十分强大且高效的追踪算法
,和其他非ReID的算法一样,仅仅使用目标追踪所得到的bbox进行追踪
。追踪算法使用了卡尔曼滤波预测边界框,然后使用匈牙利算法进行目标和轨迹间的匹配。
ByteTrack算法的最大创新点就是对低分框的使用
,作者认为低分框可能是对物体遮挡时产生的框,直接对低分框抛弃会影响性能,所以作者使用低分框对追踪算法进行了二次匹配,有效优化了追踪过程中因为遮挡造成换id的问题。
- 没有使用ReID特征计算外观相似度
- 非深度方法,不需要训练
- 利用高分框和低分框之间的区别和匹配,有效解决遮挡问题
ByteTrack与其他追踪算法的对比如下图所示,可以看到ByteTrack的性能还是相当不错的。
ByteTrack的实现代码如下:
class ByteTrack: """ Initialize the ByteTrack object. Parameters: track_thresh (float, optional): Detection confidence threshold for track activation. track_buffer (int, optional): Number of frames to buffer when a track is lost. match_thresh (float, optional): Threshold for matching tracks with detections. frame_rate (int, optional): The frame rate of the video. """ def __init__( self, track_thresh: float = 0.25, track_buffer: int = 30, match_thresh: float = 0.8, frame_rate: int = 30, ): self.track_thresh = track_thresh self.match_thresh = match_thresh self.frame_id = 0 self.det_thresh = self.track_thresh + 0.1 self.max_time_lost = int(frame_rate / 30.0 * track_buffer) self.kalman_filter = KalmanFilter() self.tracked_tracks: List[STrack] = [] self.lost_tracks: List[STrack] = [] self.removed_tracks: List[STrack] = [] def update_with_detections(self, detections: Detections) -> Detections: """ Updates the tracker with the provided detections and returns the updated detection results. Parameters: detections: The new detections to update with. Returns: Detection: The updated detection results that now include tracking IDs. """ tracks = self.update_with_tensors( tensors=detections2boxes(detections=detections) ) detections = Detections.empty() if len(tracks) > 0: detections.xyxy = np.array( [track.tlbr for track in tracks], dtype=np.float32 ) detections.class_id = np.array( [int(t.class_ids) for t in tracks], dtype=int ) detections.tracker_id = np.array( [int(t.track_id) for t in tracks], dtype=int ) detections.confidence = np.array( [t.score for t in tracks], dtype=np.float32 ) else: detections.tracker_id = np.array([], dtype=int) return detections def update_with_tensors(self, tensors: np.ndarray) -> List[STrack]: """ Updates the tracker with the provided tensors and returns the updated tracks. Parameters: tensors: The new tensors to update with. Returns: List[STrack]: Updated tracks. """ self.frame_id += 1 activated_starcks = [] refind_stracks = [] lost_stracks = [] removed_stracks = [] class_ids = tensors[:, 5] scores = tensors[:, 4] bboxes = tensors[:, :4] remain_inds = scores > self.track_thresh inds_low = scores > 0.1 inds_high = scores < self.track_thresh inds_second = np.logical_and(inds_low, inds_high) dets_second = bboxes[inds_second] dets = bboxes[remain_inds] scores_keep = scores[remain_inds] scores_second = scores[inds_second] class_ids_keep = class_ids[remain_inds] class_ids_second = class_ids[inds_second] if len(dets) > 0: """Detections""" detections = [ STrack(STrack.tlbr_to_tlwh(tlbr), s, c) for (tlbr, s, c) in zip(dets, scores_keep, class_ids_keep) ] else: detections = [] """ Add newly detected tracklets to tracked_stracks""" unconfirmed = [] tracked_stracks = [] # type: list[STrack] for track in self.tracked_tracks: if not track.is_activated: unconfirmed.append(track) else: tracked_stracks.append(track) """ Step 2: First association, with high score detection boxes""" strack_pool = joint_tracks(tracked_stracks, self.lost_tracks) # Predict the current location with KF STrack.multi_predict(strack_pool) dists = matching.iou_distance(strack_pool, detections) dists = matching.fuse_score(dists, detections) matches, u_track, u_detection = matching.linear_assignment( dists, thresh=self.match_thresh ) for itracked, idet in matches: track = strack_pool[itracked] det = detections[idet] if track.state == TrackState.Tracked: track.update(detections[idet], self.frame_id) activated_starcks.append(track) else: track.re_activate(det, self.frame_id, new_id=False) refind_stracks.append(track) """ Step 3: Second association, with low score detection boxes""" # association the untrack to the low score detections if len(dets_second) > 0: """Detections""" detections_second = [ STrack(STrack.tlbr_to_tlwh(tlbr), s, c) for (tlbr, s, c) in zip(dets_second, scores_second, class_ids_second) ] else: detections_second = [] r_tracked_stracks = [ strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked ] dists = matching.iou_distance(r_tracked_stracks, detections_second) matches, u_track, u_detection_second = matching.linear_assignment( dists, thresh=0.5 ) for itracked, idet in matches: track = r_tracked_stracks[itracked] det = detections_second[idet] if track.state == TrackState.Tracked: track.update(det, self.frame_id) activated_starcks.append(track) else: track.re_activate(det, self.frame_id, new_id=False) refind_stracks.append(track) for it in u_track: track = r_tracked_stracks[it] if not track.state == TrackState.Lost: track.mark_lost() lost_stracks.append(track) """Deal with unconfirmed tracks, usually tracks with only one beginning frame""" detections = [detections[i] for i in u_detection] dists = matching.iou_distance(unconfirmed, detections) dists = matching.fuse_score(dists, detections) matches, u_unconfirmed, u_detection = matching.linear_assignment( dists, thresh=0.7 ) for itracked, idet in matches: unconfirmed[itracked].update(detections[idet], self.frame_id) activated_starcks.append(unconfirmed[itracked]) for it in u_unconfirmed: track = unconfirmed[it] track.mark_removed() removed_stracks.append(track) """ Step 4: Init new stracks""" for inew in u_detection: track = detections[inew] if track.score < self.det_thresh: continue track.activate(self.kalman_filter, self.frame_id) activated_starcks.append(track) """ Step 5: Update state""" for track in self.lost_tracks: if self.frame_id - track.end_frame > self.max_time_lost: track.mark_removed() removed_stracks.append(track) self.tracked_tracks = [ t for t in self.tracked_tracks if t.state == TrackState.Tracked ] self.tracked_tracks = joint_tracks(self.tracked_tracks, activated_starcks) self.tracked_tracks = joint_tracks(self.tracked_tracks, refind_stracks) self.lost_tracks = sub_tracks(self.lost_tracks, self.tracked_tracks) self.lost_tracks.extend(lost_stracks) self.lost_tracks = sub_tracks(self.lost_tracks, self.removed_tracks) self.removed_tracks.extend(removed_stracks) self.tracked_tracks, self.lost_tracks = remove_duplicate_tracks( self.tracked_tracks, self.lost_tracks ) output_stracks = [track for track in self.tracked_tracks if track.is_activated] return output_stracks
使用方法
1.创建ByteTrack跟踪器
# 创建跟踪器 byte_tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8, frame_rate=30)
2.对YOLOv8的目标检测结果进行追踪
model = YOLO(path) results = model(frame)[0] detections = sv.Detections.from_ultralytics(results) detections = byte_tracker.update_with_detections(detections)
3.显示追踪结果ID、检测框及标签信息
labels = [ f"id{tracker_id} {model.model.names[class_id]}" for _, _, confidence, class_id, tracker_id in detections ] annotated_frame = frame.copy() annotated_frame = box_annotator.annotate( scene=annotated_frame, detections=detections, labels=labels)
最终检测效果如下:
四、过线计数判断方式
定义过线线段
定义用于统计过线的线段,此处我们直接使用视频水平中心线作为过线线段
,代码如下:
cap = cv2.VideoCapture(video_path) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) point_A = [10, int(height/2)] point_B = [width-10, int(height/2)] # 定义过线使用的线段点 LINE_START = sv.Point(point_A[0], point_A[1]) LINE_END = sv.Point(point_B[0], point_B[1]) line_zone = MyLineZone(start=LINE_START, end=LINE_END)
判断过线方法
使用目标中心点判断是否过线,核心代码如下:
for i, (xyxy, _, confidence, class_id, tracker_id) in enumerate(detections): if tracker_id is None: continue # 使用中心点判断是否过线 x1, y1, x2, y2 = xyxy center_x = int((x1 + x2) / 2) center_y = int((y1 + y2) / 2) center_point = Point(x=center_x, y=center_y) triggers = [self.vector.is_in(point=center_point)]
上述通过目标框坐标计算出目标中心点坐标center_x
,center_y
,然后通过is_in
函数判断过线状态,其中is_in
函数定义如下:
def is_in(self, point: Point) -> bool: v1 = Vector(self.start, self.end) v2 = Vector(self.start, point) cross_product = (v1.end.x - v1.start.x) * (v2.end.y - v2.start.y) - ( v1.end.y - v1.start.y ) * (v2.end.x - v2.start.x) return cross_product < 0
函数首先根据线段的起点和终点构造两个向量v1和v2,分别表示线段和待判断的点与线段起点的向量
。然后计算两个向量的叉积
,并判断叉积的正负来确定点的位置关系。若叉积小于0,则点在线段的左侧;若叉积大于0,则点在线段的右侧;若叉积等于0,则点在线段上
。根据题设,函数返回的是点在线段不同侧的状态,即当叉积小于0时返回True,否则返回False
。
判断是否通过线段
上述判断方式只能用于判断目标是否通过线段所在直线,并不是在线段内通过。如果想判断在线段内通过,需要另外加上过线时的判断条件,核心代码如下:
def point_in_line(self, center_point): # 判断点是否在线段之间通过 # 计算向量 AP 与向量 AB 的点积(也称为“标量积”) # 点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正) point_A, point_B = self.get_line_points(self.vector) xA, yA = point_A xB, yB = point_B xP, yP = center_point AB = (xB - xA, yB - yA) AP = (xP - xA, yP - yA) # 计算向量 AP 与向量 AB 的点积 dot_product = AB[0] * AP[0] + AB[1] * AP[1] # 计算向量 AB 模长的平方 AB_length_squared = AB[0] ** 2 + AB[1] ** 2 # 判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正) if 0 <= dot_product <= AB_length_squared and dot_product >= 0: within_segment = True else: within_segment = False return within_segment
判断点是否在线段之间通过,通过计算向量 AP
与向量 AB
的点积
(也称为“标量积”)来进行判断。其中P表示目标中心点,AB表示目标需要通过的线段。
判断标准:点积的绝对值应在 0(包括端点)与向量 AB 的模长平方之间,且方向应与 AB 相同(即点积为正),则表示在线段内通过。
过线效果展示
过线效果展示如下:
以上便是关于此款车辆检测追踪与流量计数系统
的原理与代码介绍。基于以上内容,博主用python
与Pyqt5
开发了一个带界面的软件系统,即文中第二部分的演示内容,能够很好的视频及摄像头进行检测追踪,以及自定义过线计数
。