rospy控制小车多点巡航

场景:如何实现小车按照规定的路线自主的巡航
这里我采用小车多点导航来实现,且加入附近约束与超时共同作为条件约束。资源服务当出现优先级高于巡航的任务时,中断小车当前的巡航,从而执行新的/move_base任务。

#!/usr/bin/env python
#coding=utf-8
import rospy
import tf
import time
import PyKDL
import actionlib
from nav_msgs.msg import Path
from actionlib_msgs.msg import *
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from geometry_msgs.msg import PoseWithCovarianceStamped
from tf_conversions import transformations
from math import radians, copysign, sqrt, pow, pi, atan2, cos, sin
from mux.srv import ControlState

class Navigation_multi:
    def __init__(self,robot_name="robot1", wait_time=1800, nearest_r=2):
        self.robot_name = robot_name
        self.move_base = actionlib.SimpleActionClient(str(self.robot_name)+"/move_base", MoveBaseAction)
        self.move_base.wait_for_server(rospy.Duration(60))
        self.nearest_r=nearest_r
        self.wait_time = wait_time
        rospy.on_shutdown(self.shutdown)
    # 四位数转欧拉角
    def quat_to_angle(self,quat):
        rot = PyKDL.Rotation.Quaternion(quat.x, quat.y, quat.z, quat.w)
        return rot.GetRPY()[2]

    # 当前位置
    def get_amcl_pose(self):
        msg = rospy.wait_for_message(str(self.robot_name)+"/amcl_pose",PoseWithCovarianceStamped,timeout=None)
        return msg.pose.pose.position, self.quat_to_angle(msg.pose.pose.orientation)

    # 当前位置到目标位置之间距离
    def stop_pose_distance(self,pose):
        (position, rotation) = self.get_amcl_pose()
        return sqrt(pow((position.x - pose[0]), 2) + pow((position.y - pose[1]), 2))
    
    # 打印启动位置与目标位置
    def print_point(self, pose):
        position = self.get_amcl_pose()[0]
        rospy.loginfo("Start pose: x:%s y:%s    ---------->    Stop pose: x:%s y:%s", position.x, position.y, pose[0], pose[1])

    def _done_cb(self, status, result):
        rospy.loginfo("navigation done! status:%d result:%s"%(status, result))

    def _active_cb(self):
        rospy.loginfo("[Navi] navigation has be actived")

    def _feedback_cb(self, feedback):
        rospy.loginfo("[Navi] navigation feedback\r\n%s"%feedback)

    # 资源的请求与释放 请求返回True False
    def control_client(self, permission, state):
        rospy.wait_for_service(str(self.robot_name)+"/control_server")
      
        try:
            client = rospy.ServiceProxy(str(self.robot_name)+"/control_server", ControlState)
            response = client(permission,state)
            return response.control
        except:
            return False
     
    def shutdown(self):
        # Always stop the robot when shutting down the node.
        self.control_client(1,0)
        rospy.loginfo("Stopping the robot...")
        self.move_base.cancel_goal()
        rospy.sleep(1)

    # 多目标附近巡航
    def go_on_patrol(self, goals):
        for pose in goals:
            goal = MoveBaseGoal()
            goal.target_pose.header.frame_id = 'map'
            goal.target_pose.header.stamp = rospy.Time.now()
            goal.target_pose.pose.position.x = pose[0]
            goal.target_pose.pose.position.y = pose[1]
            q = transformations.quaternion_from_euler(0.0, 0.0, pose[2]/180.0*pi)
            goal.target_pose.pose.orientation.x = q[0]
            goal.target_pose.pose.orientation.y = q[1]
            goal.target_pose.pose.orientation.z = q[2]
            goal.target_pose.pose.orientation.w = q[3]

            # 向控制发送 优先级:1 | 请求资源:1(1请求资源,0释放资源)
            if self.control_client(1,1):
                # 休眠两秒等待资源释放
                rospy.sleep(2)
                # 发送打印启动--->目标
                self.print_point(pose)
                self.move_base.send_goal(goal, self._done_cb, self._active_cb, self._feedback_cb)
                # 记录启动时间
                start = time.time()
                # 设定检测频率:检测是否到达目标附近频率、检测是否超时频率、检测是否资源被优先级高抢占频率
                rate = rospy.Rate(1)
                while True:
                    if rospy.is_shutdown():
                        rospy.loginfo("Ros is shutdown!")
                        break
                    elif float(self.stop_pose_distance(pose)) < float(self.nearest_r):
                        rospy.loginfo("The robot reaches the nearest constraint")
                        break
                    elif int(time.time()-start) > int(self.wait_time):
                        rospy.loginfo("Time out")
                        break
                    elif self.control_client(1,1) == False:
                        ros.loginfo("Resource occupied")
                        break
                    rate.sleep()
                # 取消目标请求
                self.move_base.cancel_goal()
                # 释放资源
                self.control_client(1,0)
                rospy.sleep(2)
            else:
                rospy.loginfo("Resource request failed")
                rospy.loginfo("Current location \n <------------> \n %s",pose)
                break
            
if __name__ == "__main__":
    rospy.init_node('Navigation_multi',anonymous=True)
    robot_id = rospy.get_param('~robot_num','3')
    robot_name = 'robot'+ str(robot_id)
    goalListX = rospy.get_param('~goalListX', '2.0, 2.0')
    goalListY = rospy.get_param('~goalListY', '2.0, 4.0')
    goalListYaw = rospy.get_param('~goalListYaw', '0, 90.0')
    goals = [[float(x), float(y), float(yaw)] for (x, y, yaw) in zip(goalListX.split(","),goalListY.split(","),goalListYaw.split(","))]
    navi = Navigation_multi(robot_name=robot_name,wait_time=1800, nearest_r=2)
    navi.go_on_patrol(goals)
    rospy.sleep(1)

总结

通过简单优先级服务,防止资源抢占问题

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐