• 发文
  • 评论
  • 微博
  • 空间
  • 微信

slam技术讲堂之Actionlib 机制(一)

说科技 2020-11-10 15:49 发文


一、Actionlib的简介

本期文章同大家分享下ros 中actionlib机制的使用。简单说,actionlib机制是功能更强大的service/client机制。更多信息,欢迎关注EAI微信公众号。

在某些情况下,服务需要很长时间才能执行完毕,这时用户往往希望获得执行过程中的状态反馈或中途取消任务。比如在机器人移动过程中,希望获得机器人当前的姿态、移动的距离等信息或立刻取消当前动作并执行下一动作,这时使用actionlib机制就变的非常简单。

二、Actionlib的使用

1、以EAI科技的D1底盘为例(系统环境:Ubuntu-14.04,ROS-indigo),介绍actionlib的使用方法及遇到的问题,提醒大家避免我们踩过的坑。

2、适合用户:了解ROS及linux基础操作。

3、目标:用actionlib 机制改写D1使用手册中前进一米、原地旋转360度的python脚本,使其在执行过程中实时反馈已前进距离或已旋转角度;同时,每接收到新任务,立刻停止当前任务并执行新任务。

4、总体思路:

1)在D1中添加action依赖;

2)编写service和client;

3)运行并查看反馈过程。

三、Actionlib的使用方法

1、在D1中添加action依赖

1)远程登录到D1(ssh eaibot@192.168.31.200,密码eaibot)

2)路径~/dashgo_ws/src/dashgo/dashgo_tools下, 创建“action”文件夹并在其中创建check_msg.action文本,包含如下内容(goal、result、feedback间用“---”分割,且类型是float32而不是float):

#goal definition

float32 goal # 前进距离或旋转角度

float32 vel # 线速度或角速度

float32 error # 容忍的误差

---

#result definition

bool issuccess # 是否成功

---

#feedback

float32 accomplished # 已前进距离或已旋转角度

3)在dashgo_tools包的(路径:~/dashgo_ws/src/dashgo/dashgo_tools)CMakeLists.txt和package.xml中添加依赖:

a)在CMakeLists.txt中添加(请先找到对应函数并修改,而不要简单复制):

find_package(catkin REQUIRED COMPONENTS

  actionlib

  actionlib_msgs

)

add_action_files(

  DIRECTORY action

  FILES check_msg.action

)

generate_messages(DEPENDENCIES actionlib_msgs std_msgs)

catkin_package(CATKIN_DEPENDS actionlib_msgs)

b)在package.xml中添加:





4)增编译 dashgo_ws(路径:~/dashgo_ws/):删除build和devel,然后catkin_make。

5)编写service和client:脚本请参考文末。

请将脚本(前进一米、旋转360度各有一对service和client,共四份)放入D1 路径:~/dashgo_ws/src/dashgo/dashgo_tools/scripts下。

6) 运行并查看反馈过程:

a)前进一米

rosrun dashgo_tools check_linear_action_server.py

rosrun dashgo_tools check_linear_action_client.py

查看反馈rostopic echo /dashgo_tools/feedback

b)旋转360度

rosrun dashgo_tools check_linear_action_server.py

rosrun dashgo_tools check_linear_action_client.py

查看反馈rostopic echo /dashgo_tools/feedback

脚本同时实现了D1接收到新命令后,停止执行当前命令,并执行新命令。

四、脚本

4.1.1、创建check_angular_action_client.py

#! /usr/bin/env python

from __future__ import print_function

import rospy

import actionlib

import dashgo_tools.msg

def linear_start():

    print("loginfo: angular_start()")

    # Creates the SimpleActionClient, passing the type of the action

    client = actionlib.SimpleActionClient('angular_server_py', dashgo_tools.msg.check_msgAction)

    print("loginfo: client created")

    # Waits until the action server has started up and started

    # listening for goals.

    client.wait_for_server()

    print("loginfo: server connected")

    # Creates a goal to send to the action server.

    _goal = dashgo_tools.msg.check_msgGoal(goal = 360, vel = 0.5, error = 1) # degrees, radians per second, degrees converted to radians

    print("loginfo: goal created")

    # Sends the goal to the action server.

    client.send_goal(_goal)

    print("loginfo: goal sent")

if __name__ == '__main__':

    try:

        print("angular_client_py begin")

        # Initializes a rospy node so that the SimpleActionClient can

        # publish and subscribe over ROS.

        rospy.init_node('angular_client_py')

        linear_start()

        print("angular_client_py finished")

    except rospy.ROSInterruptException:

        print("angular_client_py faild")

4.1.2、创建check_angular_action_server.py

#!/usr/bin/env python

import rospy

from geometry_msgs.msg import Twist, Quaternion

from nav_msgs.msg import Odometry

import tf

from math import radians, copysign

import PyKDL

from math import pi

import actionlib

import dashgo_tools.msg

def quat_to_angle(quat):

    rot = PyKDL.Rotation.Quaternion(quat.x, quat.y, quat.z, quat.w)

    return rot.GetRPY()[2]

def normalize_angle(angle):

    res = angle

    while res > pi:

        res -= 2.0 * pi

    while res < -pi:

        res += 2.0 * pi

    return res

class CalibrateAngular():

    # create messages that are used to publish feedback/result

    _goal = dashgo_tools.msg.check_msgGoal()

    _feedback = dashgo_tools.msg.check_msgFeedback()

    _result = dashgo_tools.msg.check_msgResult()

    def __init__(self, name):

        print("loginfo: __init__()")

        self._action_name = name

        self._as = actionlib.SimpleActionServer(self._action_name, dashgo_tools.msg.check_msgAction, self.execute_cb, False)

        self._as.start()

        # Set rospy to execute a shutdown function when terminating the script

        rospy.on_shutdown(self.shutdown)

    def execute_cb(self, goal):

        print("loginfo: execute_cb() called")

        # How fast will we check the odometry values?

        self.rate = rospy.get_param('~rate', 20)

        r = rospy.Rate(self.rate)

        self._goal.goal = goal.goal

        self._goal.vel = goal.vel

        self._goal.error = goal.error

        # The test angle is 360 degrees

        self.test_angle = radians(rospy.get_param('~test_angle', self._goal.goal))

        self.speed = rospy.get_param('~speed', self._goal.vel) # radians per second

        self.tolerance = radians(rospy.get_param('tolerance', self._goal.error)) # degrees converted to radians

        self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0)

        self.start_test = rospy.get_param('~start_test', True)

        # Publisher to control the robot's speed

        self.cmd_vel = rospy.Publisher('/cmd_vel', Twist, queue_size=5)

        # The base frame is usually base_link or base_footprint

        self.base_frame = rospy.get_param('~base_frame', '/base_footprint')

        # The odom frame is usually just /odom

        self.odom_frame = rospy.get_param('~odom_frame', '/odom')

        # Initialize the tf listener

        self.tf_listener = tf.TransformListener()

        # Give tf some time to fill its buffer

        rospy.sleep(2)

        # Make sure we see the odom and base frames

        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60.0))

        rospy.loginfo("Bring up rqt_reconfigure to control the test.")

        reverse = 1

        while not rospy.is_shutdown():

            if self.start_test:

                # Get the current rotation angle from tf

                self.odom_angle = self.get_odom_angle()

                last_angle = self.odom_angle

                turn_angle = 0

                self.test_angle *= reverse

                error = self.test_angle - turn_angle

                # Alternate directions between tests

                reverse = -reverse

                while abs(error) > self.tolerance and self.start_test:

                    if rospy.is_shutdown():

                        return

                    if self._as.is_preempt_requested():

                        rospy.loginfo('check_angular: Preempted')

                        self._as.set_preempted()

                        break

                    # Rotate the robot to reduce the error

                    move_cmd = Twist()

                    move_cmd.angular.z = copysign(self.speed, error)

                    self.cmd_vel.publish(move_cmd)

                    r.sleep()

                    # Get the current rotation angle from tf                  

                    self.odom_angle = self.get_odom_angle()

                    # Compute how far we have gone since the last measurement

                    delta_angle = self.odom_angular_scale_correction * normalize_angle(self.odom_angle - last_angle)

                    # Add to our total angle so far

                    turn_angle += delta_angle

                    self._feedback.accomplished = turn_angle * 57.2957795 # 180/pi

                    self._as.publish_feedback(self._feedback)

                    # Compute the new error

                    error = self.test_angle - turn_angle

                    # Store the current angle for the next comparison

                    last_angle = self.odom_angle

                # Stop the robot

                self.cmd_vel.publish(Twist())

                # Update the status flag

                self.start_test = False

                params = {'start_test': False}

                self._result.issuccess = True

                rospy.loginfo('%s: Succeeded' % self._action_name)

                self._as.set_succeeded(self._result)

                break

            rospy.sleep(0.5)

        # Stop the robot

        self.cmd_vel.publish(Twist())

    def get_odom_angle(self):

        # Get the current transform between the odom and base frames

        try:

            (trans, rot)  = self.tf_listener.lookupTransform(self.odom_frame, self.base_frame, rospy.Time(0))

        except (tf.Exception, tf.ConnectivityException, tf.LookupException):

            rospy.loginfo("TF Exception")

            return

        # Convert the rotation from a quaternion to an Euler angle

        return quat_to_angle(Quaternion(*rot))

    def shutdown(self):

        # Always stop the robot when shutting down the node

        rospy.loginfo("Stopping the robot...")

        self.cmd_vel.publish(Twist())

        rospy.sleep(1)

if __name__ == '__main__':

    print("")

    print("angular_server_py begin")

    rospy.init_node('angular_server_py')

    server = CalibrateAngular(rospy.get_name())

    rospy.spin()

4.2.1、创建check_linear_action_client.py

#! /usr/bin/env python

from __future__ import print_function

import rospy

import actionlib

import dashgo_tools.msg

def linear_start():

    print("loginfo: linear_start()")

    # Creates the SimpleActionClient, passing the type of the action

    client = actionlib.SimpleActionClient('linear_server_py', dashgo_tools.msg.check_msgAction)

    print("loginfo: client created")

    # Waits until the action server has started up and started

    # listening for goals.

    client.wait_for_server()

    print("loginfo: server connected")

    # Creates a goal to send to the action server.

    goal = dashgo_tools.msg.check_msgGoal(goal = 1.0, vel = 0.15, error = 0.01) # distance, velocity, error

    print("loginfo: goal created")

    # Sends the goal to the action server.

    client.send_goal(goal)

    print("loginfo: goal sent")

if __name__ == '__main__':

    try:

        print("linear_client_py begin")

        # Initializes a rospy node so that the SimpleActionClient can

        # publish and subscribe over ROS.

        rospy.init_node('linear_client_py')

        linear_start()

        print("linear_client_py finished")

    except rospy.ROSInterruptException:

        print("linear_client_py faild")

2.2.2、创建check_linear_action_server.py

#! /usr/bin/env python

import rospy

from geometry_msgs.msg import Twist, Point

from math import copysign, sqrt, pow

import tf

import actionlib

import dashgo_tools.msg

class CalibrateLinear(object):

    # create messages that are used to publish feedback/result

    _goal = dashgo_tools.msg.check_msgGoal()

    _feedback = dashgo_tools.msg.check_msgFeedback()

    _result = dashgo_tools.msg.check_msgResult()

    def __init__(self, name):

        print("loginfo: __init__()")

        self._action_name = name

        self._as = actionlib.SimpleActionServer(self._action_name, dashgo_tools.msg.check_msgAction, self.execute_cb, False)

        self._as.start()

        # Set rospy to execute a shutdown function when terminating the script

        rospy.on_shutdown(self.shutdown)

    def execute_cb(self, goal):

        print("loginfo: execute_cb() called")

        # How fast will we check the odometry values?

        self.rate = rospy.get_param('~rate', 20)

        r = rospy.Rate(self.rate)

        self._goal.goal = goal.goal

        self._goal.vel = goal.vel

        self._goal.error = goal.error

        # Set the distance to travel

        self.test_distance = rospy.get_param('~test_distance', self._goal.goal) # meters

        self.speed = rospy.get_param('~speed', self._goal.vel) # meters per second

        self.tolerance = rospy.get_param('~tolerance', self._goal.error) # meters

        self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0)

        self.start_test = rospy.get_param('~start_test', True)

        # Publisher to control the robot's speed

        self.cmd_vel = rospy.Publisher('/cmd_vel', Twist, queue_size=5)

        # The base frame is base_footprint for the TurtleBot but base_link for Pi Robot

        self.base_frame = rospy.get_param('~base_frame', '/base_footprint')

        # The odom frame is usually just /odom

        self.odom_frame = rospy.get_param('~odom_frame', '/odom')

        # Initialize the tf listener

        self.tf_listener = tf.TransformListener()

        # Give tf some time to fill its buffer

        rospy.sleep(2)

        # Make sure we see the odom and base frames

        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60.0))       

        rospy.loginfo("Bring up rqt_reconfigure to control the test.")

        self.position = Point()

        # Get the starting position from the tf transform between the odom and base frames

        self.position = self.get_position()

        x_start = self.position.x

        y_start = self.position.y

        move_cmd = Twist()

        while not rospy.is_shutdown():

            # Stop the robot by default

            move_cmd = Twist()

            if self._as.is_preempt_requested():

                rospy.loginfo('check_linear: Preempted')

                self._as.set_preempted()

                break

            if self.start_test:

                # Get the current position from the tf transform between the odom and base frames

                self.position = self.get_position()

                # Compute the Euclidean distance from the target point

                distance = sqrt(pow((self.position.x - x_start), 2) +

                                pow((self.position.y - y_start), 2))

                # Correct the estimated distance by the correction factor

                distance *= self.odom_linear_scale_correction

                # How close are we?

                error =  distance - self.test_distance

                # Are we close enough?

                if not self.start_test or abs(error) <  self.tolerance:

                    self.start_test = False

                    params = {'start_test': False}

                    # rospy.loginfo(params)

                    self._result.issuccess = True

                    rospy.loginfo('%s: Succeeded' % self._action_name)

                    self._as.set_succeeded(self._result)

                    break

                else:

                    # If not, move in the appropriate direction

                    move_cmd.linear.x = copysign(self.speed, -1 * error)

                    self._feedback.accomplished = distance

                    self._as.publish_feedback(self._feedback)

            else:

                self.position = self.get_position()

                x_start = self.position.x

                y_start = self.position.y

            self.cmd_vel.publish(move_cmd)

            r.sleep()

        # Stop the robot

        self.cmd_vel.publish(Twist())

    def get_position(self):

        # Get the current transform between the odom and base frames

        try:

            (trans, rot)  = self.tf_listener.lookupTransform(self.odom_frame, self.base_frame, rospy.Time(0))

        except (tf.Exception, tf.ConnectivityException, tf.LookupException):

            rospy.loginfo("TF Exception")

            return

        return Point(*trans)

    def shutdown(self):

        # Always stop the robot when shutting down the node

        rospy.loginfo("Stopping the robot...")

        self.cmd_vel.publish(Twist())

        rospy.sleep(1)

if __name__ == '__main__':

    print("")

    print("linear_server_py begin")

    rospy.init_node('linear_server_py')

    server = CalibrateLinear(rospy.get_name())

    rospy.spin()

五、ROS官网

Actionlib 机制的详细介绍请参见官网:


声明:本文为OFweek维科号作者发布,不代表OFweek维科号立场。如有侵权或其他问题,请及时联系我们举报。
2
评论

评论

    相关阅读

    暂无数据

    说科技

    关注机器人行业,3D打印行业等。...

    举报文章问题

    ×
    • 营销广告
    • 重复、旧闻
    • 格式问题
    • 低俗
    • 标题夸张
    • 与事实不符
    • 疑似抄袭
    • 我有话要说
    确定 取消

    举报评论问题

    ×
    • 淫秽色情
    • 营销广告
    • 恶意攻击谩骂
    • 我要吐槽
    确定 取消

    用户登录×

    请输入用户名/手机/邮箱

    请输入密码