1. 实训目的

基于opencv中的KCF算法实现小车在gazebo环境中的人脸跟踪任务。首先了解KCF算法基本原理,其次了解KCF算法在opencv中的相关实现,最后编写python脚本,结合人脸检测知识,完成人脸跟踪脚本。

  1. 实训操作步骤

  • 编写人脸跟踪脚本。

  • 基于KCF的人脸跟踪

跟踪是指在视频的连续帧中定位对象。要实现对人脸的跟踪,首先要正确识别出人脸,即要完成我们此前学习过的人脸检测任务。在人脸检测任务的基础上,设计程序,将获得的第一个人脸检测矩形框作为后续跟踪任务的目标。经过目标跟踪算法的处理,最终实现对人脸的持续跟踪。

KCF是一个经典的目标跟踪算法,全称为核相关滤波(Kernel Correlation Filter)算法。其工作流程就是在目标追踪过程中, 将初始目标检测框提取的特征进行循环移位操作,得到初始的样本训练集,并通过高斯函数为这些样本根据样本中心距目标的远近标记权值,通过样本与权重标签来训练相关滤波器,用于在后续的目标搜索过程中评估周围测试样本的目标响应程度,选择出响应较高的候选框,并使用最新的跟踪结果更新训练集,进一步更新相关滤波器,从而达成对目标的持续跟踪。

我们利用OpenCV3.4.15的扩展库OpenCV-contrib3.4.15中的目标跟踪API来实现基于KCF的人脸跟踪算法。与此同时,我们使用OpenCV中的卡尔曼滤波器在KCF目标跟踪结果的基础上进一步预测下一帧跟踪框的目标中心,即绘制瞄点。

在工作空间的src目录下打开终端,输入如下命令建立功能包:

$  catkin_create_pkg proj04_face_tracker std_msgs rospy sensor_msgs cv_bridge

在功能包下,建立/cascades目录,用于存放opencv的人脸检测分类器文件haarcascade_frontalface_default.xml与haarcascade_profileface.xml。建立/scripts目录,存放图像处理基类文件ros_opencv.py与人脸检测文件face_detector.py,我们的人脸跟踪脚本将基于上述两个脚本进行编写。

ros_opencv.py:

  1. #!/usr/bin/env python  
    
    # coding:utf-8  
    
      
    
    import rospy  
    
    import cv2  
    
    from cv_bridge import CvBridge,CvBridgeError  
    
    from sensor_msgs.msg import Image  
    
    from geometry_msgs.msg import  Twist  
    
    import numpy as np  
    
    import time  
    
    import os, sys  
    
      
    
    class ROS2OPENCV(object):  
    
        # 基类的初始化方法,实例化时自动调用  
    
        def __init__(self,node_name):  
    
            self.node_name = node_name  
    
            os.chdir(sys.path[0])  
    
            rospy.init_node(node_name)  
    
            rospy.on_shutdown(self.shutdown)  
    
            ### 定义ros图像消息的订阅者和处理后图像的发布者  
    
            # 发布/display_image话题,类型为Image,队列大小设置为1  
    
            self.rgb_image_pub = rospy.Publisher("/display_image",Image,queue_size=1)  
    
            # 订阅/camera/rgb/image_raw图像话题,回调函数rgb_image_callback(self, data)  
    
            # 该回调函数实现了图像处理、帧率输出  
    
            self.rgb_topic = rospy.get_param("~rgb_image_topic", "/camera/rgb/image_raw")  
    
            self.rgb_image_sub = rospy.Subscriber(self.rgb_topic,Image,self.rgb_image_callback)  
    
            # 订阅深度图像  
    
            self.depth_topic = rospy.get_param("~depth_image_topic", "/camera/depth/image_raw")  
    
            self.depth_image_sub = rospy.Subscriber(self.depth_topic,Image,self.depth_image_callback)  
    
            self.move = rospy.get_param("~if_move", False)   
    
            # 定义机器人速度消息的发布者,用于机器人运动驱动   
    
            self.cmd_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)  
    
            self.bridge = CvBridge()  
    
            self.frame = None  
    
            self.frame_width = None  
    
            self.frame_height = None  
    
            self.frame_size = None  
    
            self.track_box = None  
    
            self.detect_box = None  
    
            self.display_box = None  
    
            self.marker_image = None  
    
            self.processed_image = None   
    
            self.display_image = None  
    
            self.target_center_x = None  
    
            self.cascade_frontal = None  
    
            self.cascade_profile = None  
    
            self.cps = 0  
    
            self.cps_values = list()  
    
            self.cps_n_values = 20  
    
            self.linear_speed = 0.0  
    
            self.angular_speed = 0.0  
    
              
    
      
    
        def rgb_image_callback(self, data):  
    
            start = time.time()  
    
            frame = self.convert_image(data)  
    
            if self.frame is None:  
    
                self.frame = frame.copy()  
    
                self.marker_image = np.zeros_like(frame)  
    
                self.frame_size = (frame.shape[1], frame.shape[0])  
    
                self.frame_width, self.frame_height = self.frame_size  
    
            else:  
    
                self.frame = frame.copy()  
    
                self.marker_image = np.zeros_like(frame)  
    
                # 调用图像处理方法,获取处理后图像  
    
                processed_image = self.process_image(frame)  
    
                self.processed_image = processed_image.copy()  
    
                self.display_image = cv2.bitwise_or(self.processed_image, self.marker_image)  
    
                # 绘制跟踪框  
    
                if self.track_box is not None and self.is_rect_nonzero(self.track_box):  
    
                    tx, ty, tw, th = self.track_box  
    
                    cv2.rectangle(self.display_image,(tx,ty),(tx+tw,ty+th),(255,255,0),2)  
    
      
    
                    ### 计算机器人移动速度  
    
                    # 计算跟踪目标中心x坐标  
    
                    self.target_center_x = int(tx+tw/2)  
    
                    # 计算目标中心与视野中心x坐标的偏移量  
    
                    offset_x = (self.target_center_x - self.frame_width/2)  
    
                    # 计算目标区域的面积  
    
                    target_area = tw*th  
    
                    # 若允许机器人跟踪目标  
    
                    if self.move:  
    
                        # 目标距离较远时速度大  
    
                        if target_area < 25000:  
    
                            linear_vel = 0.3  
    
                        # 到达目标时停止前进  
    
                        elif target_area > 45000:  
    
                            linear_vel = 0.0  
    
                        # 接近目标时速度减慢  
    
                        else:  
    
                            linear_vel = 0.1  
    
                        # 根据偏移量计算角速度  
    
                        angular_vel = float(-0.001 * (offset_x))  
    
                        # 设置角速度极大极小值  
    
                        if angular_vel > 0.2:  
    
                            angular_vel = 0.2  
    
                        if angular_vel < -0.2:  
    
                            angular_vel = -0.2  
    
                        # 调用update_cmd方法发布速度消息  
    
                        self.update_cmd(linear_vel,angular_vel)  
    
                # 绘制检测框  
    
                elif self.detect_box is not None and self.is_rect_nonzero(self.detect_box):  
    
                    dx, dy, dw, dh = self.detect_box  
    
                    cv2.rectangle(self.display_image, (dx, dy), (dx+dw, dy+dh), (255, 50, 50), 2)  
    
                ### 计算时间与帧率,并打印至图像  
    
                # self.cps_values包含若干条最新fps数据,最多为20条,对其求和取平均得到cps值  
    
                end = time.time()  
    
                duration = end - start  
    
                fps = int(1.0/duration)  
    
                self.cps_values.append(fps)  
    
                if len(self.cps_values)>self.cps_n_values:  
    
                    self.cps_values.pop(0)  
    
                self.cps = int(sum(self.cps_values)/len(self.cps_values))  
    
                # 根据不同的窗体大小设置字体位置  
    
                font_face = cv2.FONT_HERSHEY_SIMPLEX  
    
                font_scale = 0.5  
    
                if self.frame_size[0] >= 640:  
    
                    vstart = 25  
    
                    voffset = int(50+self.frame_size[1]/120.)  
    
                elif self.frame_size[0] == 320:  
    
                    vstart = 15  
    
                    voffset = int(35+self.frame_size[1]/120.)  
    
                else:  
    
                    vstart = 10  
    
                    voffset = int(20 + self.frame_size[1] / 120.)  
    
                # cv2.putText函数实现在画面上输出文字信息  
    
                cv2.putText(self.display_image, "CPS: " + str(self.cps), (10, vstart),font_face, font_scale,(255, 255, 0))  
    
                cv2.putText(self.display_image, "RES: " + str(self.frame_size[0]) + "X"+ str(self.frame_size[1]), (10, voffset), font_face, font_scale, (255, 255, 0))  
    
      
    
                # 发布处理后的图像  
    
                try:  
    
                    self.rgb_image_pub.publish(self.bridge.cv2_to_imgmsg(self.display_image,"bgr8"))  
    
                except CvBridgeError as e:  
    
                    print(e)  
    
      
    
      
    
        # 深度图像回调函数  
    
        def depth_image_callback(self,data):  
    
            depth_frame = self.convert_depth_image(data)  
    
          
    
        # 转换ros图像为cv2图像  
    
        def convert_image(self,ros_image):  
    
            try:  
    
                cv_image = self.bridge.imgmsg_to_cv2(ros_image,"bgr8")  
    
                cv_image = np.array(cv_image,dtype= np.uint8)  
    
                return cv_image  
    
            except CvBridgeError  as e:  
    
                print (e)  
    
      
    
        # 转换ros深度图为cv2图像  
    
        def convert_depth_image(self,ros_image):  
    
            try:  
    
                depth_image = self.bridge.imgmsg_to_cv2(ros_image,"passthrough")  
    
                depth_image =np.array(depth_image,dtype=np.float32)  
    
                return depth_image  
    
            except CvBridgeError as e:  
    
                print (e)  
    
      
    
        # 图像处理方法  
    
        def process_image(self,frame):  
    
            return frame  
    
      
    
        # 深度图处理方法  
    
        def process_depth_image(self,frame):  
    
            return frame  
    
      
    
        # 判断矩形框是否存在  
    
        def is_rect_nonzero(self,rect):  
    
            try:  
    
                (_,_,w,h) = rect  
    
                return ((w>0)and(h>0))  
    
            except :  
    
                try:  
    
                    ((_,_),(w,h),a) = rect  
    
                    return (w > 0)and (h > 0)  
    
                except:  
    
                    return False  
    
      
    
        # shut down  
    
        def shutdown(self):  
    
            rospy.loginfo("Shutting down node")  
    
      
    
        # 构造速度消息并发布  
    
        def update_cmd(self, linear_speed, angular_speed):  
    
            self.linear_speed = linear_speed  
    
            self.angular_speed = angular_speed  
    
            move_cmd = Twist()  
    
            move_cmd.linear.x = linear_speed  
    
            move_cmd.angular.z = angular_speed  
    
            self.cmd_pub.publish(move_cmd)  
    
      
    
      
    
    if __name__ == "__main__":  
    
        try:  
    
            ROS2OPENCV("ros_opencv")  
    
            rospy.loginfo("Please subscribe the ROS image.")  
    
            rospy.spin()  
    
        except KeyboardInterrupt:  
    
            print ("Shutting down cv_bridge_test node")  

face_detector.py:

  1. #!/usr/bin/env python  
    
    # -*- coding: utf-8 -*-  
    
    import rospy  
    
    import cv2  
    
    from ros_opencv import ROS2OPENCV  
    
    import os  
    
    import sys  
    
      
    
    class FaceDetector(ROS2OPENCV):  
    
        def __init__(self, node_name):  
    
            super(FaceDetector, self).__init__(node_name)  
    
            self.detect_box = None  
    
            self.result = None  
    
            self.count = 0  
    
            # 设置级联表的参数,优化人脸识别  
    
            self.haar_scaleFactor = 1.2  
    
            self.haar_minNeighbors = 3  
    
            self.haar_minSize = 40  
    
            self.haar_maxSize = 60  
    
            # 获取haar特征的级联表的XML文件, 其中存放的是训练好的分类器
    
            cascade_frontal = "../cascades/haarcascade_frontalface_default.xml"  
    
            cascade_profile = "../cascades/haarcascade_profileface.xml"  
    
            # 使用级联表初始化haar特征检测器  
    
            self.cascade_frontal = cv2.CascadeClassifier(cascade_frontal)  
    
            self.cascade_profile = cv2.CascadeClassifier(cascade_profile)  
    
      
    
        def process_image(self, frame):  
    
            src = frame.copy()  
    
            # 创建灰度图像  
    
            gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)  
    
            # 对图像进行直方图均衡化操作,减少光线影响,增加对比度  
    
            gray = cv2.equalizeHist(gray)  
    
        # 调用人脸检测方法,获取人脸检测结果
    
            faces = self.detect_face(gray)  
    
            result = src.copy()  
    
            self.result = result  
    
            # 绘制人脸位置  
    
            # (x,y,w,h)分别代表人脸框的左上角点的坐标,人脸框的宽度和长度  
    
            for (x, y, w, h) in faces:  
    
                result = cv2.rectangle(result, (x, y), (x + w, y + h), (255, 0, 0), 2)  
    
            return result  
    
        
    
        def detect_face(self, input_image):  
    
            faces = []  
    
            # 首先匹配正面人脸的模型  
    
            if self.cascade_frontal:  
    
          # detectMultiScale为人脸检测的核心函数
    
                faces = self.cascade_frontal.detectMultiScale(input_image, self.haar_scaleFactor ,  
    
                                    self.haar_minNeighbors,cv2.CASCADE_SCALE_IMAGE,(self.haar_minSize,self.haar_maxSize))  
    
            # 如果正面人脸匹配失败,那么就尝试匹配侧面人脸的模型  
    
                if len(faces) == 0 and self.cascade_profile:  
    
                    faces = self.cascade_profile.detectMultiScale(input_image, self.haar_scaleFactor ,  
    
                                        self.haar_minNeighbors,cv2.CASCADE_SCALE_IMAGE,(self.haar_minSize,self.haar_maxSize))  
    
            return faces  
    
      
    
      
    
    if __name__ == '__main__':  
    
        try:  
    
            node_name = "face_detector"  
    
            FaceDetector(node_name)  
    
            rospy.loginfo("Please subscribe the ROS image.")  
    
            rospy.spin()  
    
        except KeyboardInterrupt:  
    
            print("shutting down face detector node.")  

在/scripts目录中新建一个python脚本kcf_kalman_tracker.py,代码如下。

kcf_kalman_tracker.py

  1. #!/usr/bin/env python  
    
    # coding:utf-8  
    
    import time  
    
    import rospy  
    
    import cv2  
    
    import numpy as np  
    
    from face_detector import FaceDetector  
    
      
    
    class KcfKalmanTracker(FaceDetector):  
    
        def __init__(self, node_name):  
    
            self.flag = 0  
    
            super(KcfKalmanTracker, self).__init__(node_name)  
    
            # 初始化KCF跟踪器  
    
            self.tracker = cv2.TrackerKCF_create()  
    
            self.detect_box = None  
    
            self.track_box = None  
    
            self.status = None  
    
            self.result = None  
    
            # 调整haar人脸检测参数,降低人脸误判率  
    
            self.haar_minNeighbors = 5  
    
            # 允许机器人移动  
    
            self.move = 'true'  
    
            # 初始化卡尔曼滤波器,用于瞄点  
    
            self.kalman = cv2.KalmanFilter(4, 2)  
    
            self.kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)  
    
            self.kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0  
    
                , 0, 1]], np.float32)  
    
            self.kalman.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0,  
    
                                                                                               0, 1]], np.float32) * 0.03  
    
            self.measurement = np.array((2, 1), np.float32)  
    
            self.prediction = np.array((2, 1), np.float32)  
    
     
    
        def process_image(self, frame):  
    
            result = frame.copy()  
    
            if self.flag == 0:  
    
                # 等待类完成初始化  
    
                time.sleep(2)  
    
                self.flag = 1  
    
                print ("loadding---")  
    
            # 若检测框与跟踪框均不存在,调用get_track_box方法获取跟踪目标  
    
            if self.detect_box is None or self.track_box is None:  
    
                result = self.get_track_box(frame)  
    
            # 若已经获取到跟踪目标,调用process_track方法进行目标跟踪处理  
    
            if self.status:  
    
                result = self.process_track(result)  
    
            return result  
    
      
    
        # 使用人脸检测结果初始化跟踪器目标  
    
        def get_track_box(self, frame):  
    
            # 正在检测……  
    
            print ("detecting---")  
    
            # 建立数据副本  
    
            src = frame.copy()  
    
            # 基本图像处理 灰度化与均衡化  
    
            gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)  
    
            gray = cv2.equalizeHist(gray)  
    
            # 调用人脸检测方法获取初始检测框  
    
            faces = self.detect_face(gray)  
    
            # 再次建立数据副本,用于保存结果  
    
            result = src.copy()  
    
            self.result = result  
    
            # 遍历人脸位置列表  
    
            for (x, y, w, h) in faces:  
    
                # 生成初始检测框  
    
                self.detect_box = [x, y, w, h]  
    
                # 绘制初始检测框到result  
    
                result = cv2.rectangle(result, (x, y), (x + w, y + h), (255,0, 0), 2)  
    
            try:  
    
                # 如果检测框不存在或检测框存在但大小为0  
    
                if self.detect_box is None or not self.is_rect_nonzero(self.detect_box):  
    
                    # 返回原数据  
    
                    return frame  
    
                # 建立数据副本  
    
                src = frame.copy()  
    
                # 如果检测框存在,但跟踪框不存在或跟踪框存在但大小为0  
    
                if self.track_box is None or not self.is_rect_nonzero(self.track_box):  
    
                    # 从检测框创建跟踪框  
    
                    self.track_box = self.detect_box  
    
                    # 判断跟踪器是否已经成功初始化  
    
                    if self.tracker is None:  
    
                        raise Exception("tracker not init")  
    
                    # 使用检测框初始化跟踪器,保存初始化结果  
    
                    self.status = self.tracker.init(src, self.track_box)  
    
                    # 若跟踪器未能初始化成功,输出失败信息  
    
                    if not self.status:  
    
                        raise Exception("tracker initial failed")  
    
                    # 否则,输出成功信息  
    
                    else:  
    
                        print ("tracker init succeed")  
    
            except:  
    
                print("tracker initial failed")  
    
                pass  
    
            return result  
    
      
    
        # 目标跟踪函数  
    
        def process_track(self, frame):  
    
            # 调用track方法,获取跟踪区域  
    
            self.track_box = self.track(frame)  
    
            print ("tracking---")  
    
            return frame  
    
      
    
        # 获取跟踪器对当前图像帧跟踪的结果区域,并将跟踪中心作为观测值输入卡尔曼滤波器,获取并绘制对下一时刻的预测结果,即瞄点  
    
        def track(self, frame):  
    
            # 向跟踪器中输入图像,获取跟踪器的跟踪状态status及结果coord(x, y, w, h)  
    
            status, coord = self.tracker.update(frame)  
    
            # 计算跟踪中心: center_x = x + w / 2 , center_y = y + h / 2  
    
            center = np.array([[np.float32(coord[0] + coord[2] / 2)], [np.float32(coord[1] + coord[3] / 2)]])  
    
            # 将跟踪中心作为观测输入卡尔曼滤波器  
    
            self.kalman.correct(center)  
    
            # 获取卡尔曼滤波预测结果  
    
            self.prediction = self.kalman.predict()  
    
            # 绘制预测结果,即瞄点  
    
            cv2.circle(frame, (int(self.prediction[0]), int(self.prediction[1])), 4, (255, 60, 100), 2)  
    
            # 将跟踪区域取整返回  
    
            round_coord = (int(coord[0]), int(coord[1]), int(coord[2]), int(coord[3]))  
    
            return round_coord  
    
      
    
    if __name__ == '__main__':  
    
        try:  
    
            node_name = "kcfkalmantracker"  
    
            KcfKalmanTracker(node_name)  
    
            rospy.spin()  
    
        except KeyboardInterrupt:  
    
            print ("Shutting down kcfkalmantracker node.")  

以上代码生成了一个继承类KcfKalmanTracker,继承了人脸检测类FaceDetector。该脚本通过如下代码引入了同目录下的人脸检测类

from face_detector import FaceDetector 

其中人脸检测类FaceDetector继承了ROS 与 Opencv 接口类ROS2OPENCV。

KcfKalmanTracker中包含了如下方法:

1) 类的构造方法,用于类的初始化。       

在类的构造方法中,完成了对KCF跟踪器与卡尔曼滤波器的创建,同时覆盖了父类中两个重要成员变量的值。如下代码所示:

  1.         # 初始化KCF跟踪器  
    
            self.tracker = cv2.TrackerKCF_create()  
    
            # 调整haar人脸检测参数,降低人脸误判率  
    
            self.haar_minNeighbors = 5  
    
            # 允许机器人移动  
    
            self.move = 'true'  
    
            # 初始化卡尔曼滤波器,用于瞄点  
    
            self.kalman = cv2.KalmanFilter(4, 2)  
    
            self.kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)  
    
            self.kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0  
    
                , 0, 1]], np.float32)  
    
            self.kalman.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0,  
    
                                                                                               0, 1]], np.float32) * 0.03  
    
            self.measurement = np.array((2, 1), np.float32)  
    
            self.prediction = np.array((2, 1), np.float32)  

haar_minNeighbors参数是指当对一个对象的Haar检测框数量超过了该值时,判断检测对象为人脸。提高该值则有助于减少人脸检测的误判率,防止人脸跟踪获取到错误的识别目标。move 变量则是基类中用于判断进行小车运动驱动与否的标志,如下代码所示:

  1. # 若允许机器人跟踪目标  
    
    if self.move:  
    
        # 目标距离较远时速度大  
    
        if target_area < 25000:  
    
            linear_vel = 0.3  
    
        # 到达目标时停止前进  
    
        elif target_area > 45000:  
    
            linear_vel = 0.0  
    
        # 接近目标时速度减慢  
    
        else:  
    
            linear_vel = 0.1  
    
        # 根据偏移量计算角速度  
    
        angular_vel = float(-0.01 * (offset_x))  
    
        # 设置角速度极大极小值  
    
        if angular_vel > 0.1:  
    
            angular_vel = 0.1  
    
        if angular_vel < -0.1:  
    
            angular_vel = -0.1  
    
        # 调用update_cmd方法发布速度消息  
    
        self.update_cmd(linear_vel,angular_vel)

将move值置为true,就可以根据上面的判断条件发布小车速度指令,实现小车跟踪效果。

2) 重写了基类的process_image方法,用于控制人脸检测与人脸跟踪的整体流程。

process_image方法中,当检测框与跟踪框均不存在时,代表跟踪器尚未初始化,此时调用get_track_box方法检测人脸,获取跟踪目标,并使用初始目标初始化检测器。若已经获取到跟踪目标,调用process_track方法进行目标跟踪处理。

  1.         # 若检测框与跟踪框均不存在,调用get_track_box方法获取跟踪目标  
    
            if self.detect_box is None or self.track_box is None:  
    
                result = self.get_track_box(frame)  
    
            # 若已经获取到跟踪目标,调用process_track方法进行目标跟踪处理  
    
            if self.status:  
    
                result = self.process_track(result)  

3) get_track_box方法,使用人脸检测结果初始化跟踪器目标。

get_track_box方法在进行了基本的图像处理后调用人脸检测父类中的detect_face方法,获取人脸检测框位置列表,在检测到人脸的情况下,使用检测结果初始化跟踪器。

  1.     # 使用人脸检测结果初始化跟踪器目标  
    
        def get_track_box(self, frame):  
    
            # 正在检测……  
    
            print ("detecting---")  
    
            # 建立数据副本  
    
            src = frame.copy()  
    
            # 基本图像处理 灰度化与均衡化  
    
            gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)  
    
            gray = cv2.equalizeHist(gray)  
    
            # 调用人脸检测方法获取初始检测框  
    
            faces = self.detect_face(gray)  
    
            # 再次建立数据副本,用于保存结果  
    
            result = src.copy()  
    
            self.result = result  
    
            # 遍历人脸位置列表  
    
            for (x, y, w, h) in faces:  
    
                # 生成初始检测框  
    
                self.detect_box = [x, y, w, h]  
    
                # 绘制初始检测框到result  
    
                result = cv2.rectangle(result, (x, y), (x + w, y + h), (255,0, 0), 2)  
    
            try:  
    
                # 如果检测框不存在或检测框存在但大小为0  
    
                if self.detect_box is None or not self.is_rect_nonzero(self.detect_box):  
    
                    # 返回原数据  
    
                    return frame  
    
                # 建立数据副本  
    
                src = frame.copy()  
    
                # 如果检测框存在,但跟踪框不存在或跟踪框存在但大小为0  
    
                if self.track_box is None or not self.is_rect_nonzero(self.track_box):  
    
                    # 从检测框创建跟踪框  
    
                    self.track_box = self.detect_box  
    
                    # 判断跟踪器是否已经成功初始化  
    
                    if self.tracker is None:  
    
                        raise Exception("tracker not init")  
    
                    # 使用检测框初始化跟踪器,保存初始化结果  
    
                    self.status = self.tracker.init(src, self.track_box)  
    
                    # 若跟踪器未能初始化成功,输出失败信息  
    
                    if not self.status:  
    
                        raise Exception("tracker initial failed")  
    
                    # 否则,输出成功信息  
    
                    else:  
    
                        print ("tracker init succeed")  
    
            except:  
    
                print("tracker initial failed")  
    
                pass  
    
            return result  

4)   process_track方法,调用track方法,获取并返回跟踪目标区域。

  1.     def process_track(self, frame):  
    
            # 调用track方法,获取跟踪区域  
    
            self.track_box = self.track(frame)  
    
            print ("tracking---")  
    
            return frame  

5)   track方法,获取跟踪器对当前图像帧跟踪的结果区域,并将跟踪中心作为观测值输入卡尔曼滤波器,获取并绘制对下一时刻的预测结果,即瞄点。

track方法使用了KCF跟踪器的update()函数,为跟踪器不断输入当前帧,并获取跟踪状态status及跟踪框位置coord(x,y,w,h)。根据coord数组,计算出当前目标的中心坐标,调用卡尔曼滤波器的correct()函数,作为观测值。后调用predict()函数获取卡尔曼滤波预测结果,并绘制在画面上,作为对下一帧图像目标区域的瞄点。

  1.     def track(self, frame):  
    
            # 向跟踪器中输入图像,获取跟踪器的跟踪状态status及结果coord(x, y, w, h)  
    
            status, coord = self.tracker.update(frame)  
    
            # 计算跟踪中心: center_x = x + w / 2 , center_y = y + h / 2  
    
            center = np.array([[np.float32(coord[0] + coord[2] / 2)], [np.float32(coord[1] + coord[3] / 2)]])  
    
            # 将跟踪中心作为观测输入卡尔曼滤波器  
    
            self.kalman.correct(center)  
    
            # 获取卡尔曼滤波预测结果  
    
            self.prediction = self.kalman.predict()  
    
            # 绘制预测结果,即瞄点  
    
            cv2.circle(frame, (int(self.prediction[0]), int(self.prediction[1])), 4, (255,  
    
                                                                                      60, 100), 2)  
    
            # 将跟踪区域取整返回  
    
            round_coord = (int(coord[0]), int(coord[1]), int(coord[2]), int(coord[3]))  
    
            return round_coord  

程序运行流程如下:

  • 程序入口:主函数,在主函数中实例化KcfKalmanTracker类。

  • 进入类的构造方法,执行完毕返回主函数

  • rospy.spin()不断触发基类的回调函数rgb_image_callback(),其中调用图像处理方法process_image()

  • process_image()不断判断是否存在检测框与跟踪框,否则进入get_track_box()方法,是则在跟踪器已经初始化成功的情况下调用process_track()方法。

  • 进入get_track_box()方法,调用detect_face()方法,获取返回值,调用跟踪器初始化函数,返回人脸检测框图像。

  • 进入process_track()方法,调用track()方法。

  • 进入track()方法,返回跟踪区域数组。

  • process_track()方法获取到track()返回值,赋值给track_box。

  • rgb_image_callback()判断获取到跟踪框,执行运动驱动代码。

  • 循环往复,持续进行人脸跟踪任务,直到画面中检测不到初始目标。

给脚本添加可执行权限,并将脚本添加到CmakeList中后,在终端中输入如下命令启动人脸跟踪环境:

$ export TURTLEBOT3_MODEL=waffle_pi

$ source devel/setup.bash

$ roslaunch billboard tb3_house_with_one_face_board.launch

        另起终端二,输入如下命令启动人脸跟踪脚本:

$ source devel/setup.bash

$ rosrun proj04_face_tracker kcf_kalman_tracker.py:

另起终端三,输入如下命令打开RViz,并点击add订阅/display_image与原始相机图像。

$ rosrun rviz rviz

待rviz订阅显示出画面后,ctrl+c停止人脸跟踪脚本,另起终端四,输入如下命令打开键盘控制节点,控制小车停止运动。

$ rosrun teleop_twist_keyboard teleop_twist_keyboard.py

此时,重置Gazebo环境,待小车回到起点并静止不动后,再次启动人脸跟踪脚本,此时就可以完整观察小车的运动过程了。

得到以下结果:

中间紫色的圆点即为卡尔曼滤波计算出的瞄点,蓝色框则为跟踪框,可以观察到小车缓缓调整方向,向人脸标识牌的方向前进。

然而,小车的速度并没有如基类代码中写的那样,根据目标的远近调整速度的大小,而是一直维持前进的速度,直到撞击墙壁,检测不到人脸后才停止。

在基类代码中,判断目标距离的手段是计算跟踪框的面积大小。理论上,目标面积较小说明此时小车距离目标较远;目标面积逐渐增大说明小车正朝着目标前进;而当目标面积大于某一个值时,说明小车已经距离目标十分接近,该停止前进了。

然而KCF是一种预设了跟踪框大小的算法,即在面对跟踪目标尺寸的剧烈变化时并不能适应性的调整跟踪框的尺寸,而是一直维持着初始化时那个初始检测框的尺寸。跟踪框的面积大小始终没有变化,导致了小车速度无法调节,一直前进直至撞击墙壁。

这个问题有许多优化的方法,例如,能否一边执行人脸跟踪,一边执行人脸检测,用检测框代替跟踪框计算面积判断距离;或者,间歇性使用人脸检测结果重置人脸跟踪,动态变化跟踪框的大小;又或者,改进KCF算法本身,使之能适应目标尺寸的变化……这些问题留待读者去探索、研究。

      

  1. 实训结果

  • 进一步了解python中的继承特性,并学会编写继承类。

  • 学会使用opencv中的KCF算法接口,乃至其他目标跟踪算法。

  • 学习了卡尔曼滤波的opencv基本实现。

  1. 附录:环境配置

由于opencv3+中的KCF算法被迁移到了opencv-contrib库中,因此我们需要对应安装opencv-contrib库到系统环境。然而ros安装时默认安装的opencv版本为3.2,经过测试不可用KCF等跟踪器算法。pip安装3.2、3.4.24.0.0版本opencv-contrib库以及源码编译安装3.2+contrib扩展库经过测试均不可用。现提供opencv3.4.15+contrib的源码编译版本,拷贝至usr/local下如下图:

按如下步骤修改环境变量:

$ sudo gedit ~/.bashrc

在最后添加如下代码:

export PYTHONPATH=$PYTHONPATH:/usr/local/opencv3.4.15/lib/python2.7/dist-packages

终端依次输入:

$ source ~/.bashrc

$ python2

$ import cv2

$ cv2.__version__

查看版本是否切换成功。

Logo

音视频技术社区,一个全球开发者共同探讨、分享、学习音视频技术的平台,加入我们,与全球开发者一起创造更加优秀的音视频产品!

更多推荐