Python rospy and rclpy


  • Description: ROS Python clients, rospy (ROS 1) first then rclpy (ROS 2). Nodes, publishers/subscribers, services, timers, parameters, the bag APIs (rosbag and rosbag2_py), and migration notes
  • My Notion Note ID: K2A-D2-3
  • Created: 2023-04-05
  • Updated: 2026-05-11
  • License: Reuse is very welcome. Please credit Yu Zhang and link back to the original on yuzhang.io

Table of Contents


1. ROS 1 vs ROS 2 in Brief

  • ROS 1 (Indigo, Kinetic, Melodic, Noetic), custom XML-RPC + TCPROS stack, central master. Final release Noetic EOL 2025-05-31.
  • ROS 2 (Foxy, Humble, Jazzy, …), built on DDS, no central master, native QoS, real Windows/macOS support, lifecycle nodes. Active development.
  • Use ROS 2 for new robots; many fleets still maintain ROS 1
  • ros1_bridge lets the two stacks talk during migration

2. ROS 1 with rospy

2.1 Node and init_node

  • Every ROS process is a node
  • rospy.init_node registers it with the master
#!/usr/bin/env python
import rospy

if __name__ == "__main__":
    rospy.init_node("listener", anonymous=False)
    rospy.loginfo("listener started")
    rospy.spin()              # block until shutdown
  • anonymous=True, random suffix so multiple instances of the same script can coexist
  • Logging: rospy.loginfo, logwarn, logerr, logfatal, logdebug

2.2 Publisher and Subscriber

from std_msgs.msg import String

pub = rospy.Publisher("/chatter", String, queue_size=10)
rate = rospy.Rate(10)                   # 10 Hz

while not rospy.is_shutdown():
    msg = String(data="hello")
    pub.publish(msg)
    rate.sleep()

def callback(msg: String) -> None:
    rospy.loginfo("got: %s", msg.data)

rospy.Subscriber("/chatter", String, callback, queue_size=10)
rospy.spin()
  • queue_size is the outbound (publisher) or inbound (subscriber) buffer in messages, set explicitly to avoid the unbounded-queue warning

2.3 Services

  • Synchronous request/reply over a typed .srv interface
from my_pkg.srv import AddTwoInts, AddTwoIntsResponse

def handle(req):
    return AddTwoIntsResponse(req.a + req.b)

rospy.Service("add_two_ints", AddTwoInts, handle)
rospy.spin()

# Client:
rospy.wait_for_service("add_two_ints")
add = rospy.ServiceProxy("add_two_ints", AddTwoInts)
resp = add(2, 3)

2.4 Parameters

rospy.set_param("/robot/name", "yuzhang_bot")
name = rospy.get_param("/robot/name", "default")
rospy.delete_param("/robot/name")
rospy.has_param("/robot/name")
  • Parameters live on the master, global, not per-node

2.5 Time and Rate

now = rospy.Time.now()                  # rospy.Time
duration = rospy.Duration(secs=1, nsecs=500_000_000)
t_after = now + duration

r = rospy.Rate(20)
while not rospy.is_shutdown():
    do_step()
    r.sleep()
  • Sim time (/use_sim_time=true + /clock publisher), rospy.Time.now() returns simulator time

2.6 rosbag (ROS 1)

import rosbag

# Read
with rosbag.Bag("recorded.bag", "r") as bag:
    for topic, msg, t in bag.read_messages(topics=["/chatter", "/imu"]):
        process(topic, msg, t)

# Write
with rosbag.Bag("out.bag", "w") as bag:
    bag.write("/chatter", String(data="hello"), rospy.Time.now())
  • Bag closed on with exit
  • CLI rosbag info out.bag summarizes it

3. ROS 2 with rclpy

3.1 Node Class

  • Idiomatic ROS 2, subclass Node
import rclpy
from rclpy.node import Node

class Listener(Node):
    def __init__(self):
        super().__init__("listener")
        self.get_logger().info("listener started")

def main():
    rclpy.init()
    node = Listener()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()
  • get_logger(), node-scoped logger; .debug/.info/.warning/.error/.fatal

3.2 Publisher and Subscriber

from std_msgs.msg import String

class Talker(Node):
    def __init__(self):
        super().__init__("talker")
        self.pub = self.create_publisher(String, "chatter", 10)
        self.timer = self.create_timer(0.1, self.tick)  # 10 Hz

    def tick(self):
        self.pub.publish(String(data="hello"))

class Listener(Node):
    def __init__(self):
        super().__init__("listener")
        self.create_subscription(String, "chatter", self.cb, 10)

    def cb(self, msg):
        self.get_logger().info(f"got: {msg.data}")
  • Topic name has no leading / by convention, names resolved relative to the node's namespace

3.3 QoS Profiles

  • DDS QoS replaces ROS 1's queue_size
  • A QoS depth integer is the shortcut; full profiles control reliability, history, durability
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy

qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,        # or BEST_EFFORT
    durability=DurabilityPolicy.TRANSIENT_LOCAL,   # late joiners get the last message
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
)
self.create_publisher(String, "chatter", qos)
  • Publishers and subscribers must agree on QoS to connect
  • rclpy.qos_event, callbacks for QoS mismatches

3.4 Services and Clients

from example_interfaces.srv import AddTwoInts

class Server(Node):
    def __init__(self):
        super().__init__("adder")
        self.srv = self.create_service(AddTwoInts, "add_two_ints", self.handle)

    def handle(self, request, response):
        response.sum = request.a + request.b
        return response

class Client(Node):
    def __init__(self):
        super().__init__("adder_client")
        self.cli = self.create_client(AddTwoInts, "add_two_ints")
        self.cli.wait_for_service()

    async def call(self, a, b):
        req = AddTwoInts.Request(a=a, b=b)
        return await self.cli.call_async(req)
  • call_async returns a Future, await it (asyncio-compatible executor) or rclpy.spin_until_future_complete

3.5 Parameters

  • Parameters are per node
self.declare_parameter("rate_hz", 10.0)
rate = self.get_parameter("rate_hz").get_parameter_value().double_value
  • add_on_set_parameters_callback, validate/react to runtime changes

3.6 Executors and Multi-Threading

from rclpy.executors import MultiThreadedExecutor

executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node_a)
executor.add_node(node_b)
try:
    executor.spin()
finally:
    executor.shutdown()
  • Use ReentrantCallbackGroup for concurrent callbacks on the same node
  • Default group serializes them

3.7 rosbag2_py

import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message

# Read
reader = rosbag2_py.SequentialReader()
storage = rosbag2_py.StorageOptions(uri="path/to/bag", storage_id="sqlite3")
converter = rosbag2_py.ConverterOptions("", "")
reader.open(storage, converter)

type_map = {t.name: t.type for t in reader.get_all_topics_and_types()}

while reader.has_next():
    topic, raw, t = reader.read_next()
    msg_cls = get_message(type_map[topic])
    msg = deserialize_message(raw, msg_cls)
    process(topic, msg, t)

# Write
writer = rosbag2_py.SequentialWriter()
writer.open(storage, converter)
writer.create_topic(rosbag2_py.TopicMetadata(
    name="/chatter", type="std_msgs/msg/String", serialization_format="cdr",
))
  • Default storage backend: sqlite3 (Foxy/Humble)
  • Newer distros also offer mcap

4. Migration: rospy → rclpy

Concept rospy rclpy
Init rospy.init_node(name) rclpy.init(); class Node
Spin rospy.spin() rclpy.spin(node)
Publisher rospy.Publisher(topic, type, queue_size=) self.create_publisher(type, topic, qos)
Subscriber rospy.Subscriber(topic, type, cb) self.create_subscription(type, topic, cb, qos)
Service rospy.Service(name, srv, handler) self.create_service(srv, name, handler)
Param rospy.get_param(name) (global) self.declare_parameter(name); self.get_parameter(name) (per-node)
Time rospy.Time, rospy.Duration rclpy.time.Time, rclpy.duration.Duration
Rate rospy.Rate(hz) self.create_timer(period, cb)
Bag rosbag.Bag rosbag2_py.SequentialReader/Writer
Logging rospy.loginfo self.get_logger().info
  • Message type imports also change: from std_msgs.msg import String works in both
  • In rclpy the package path includes the kind explicitly when introspecting (std_msgs/msg/String)

5. References