この記事は私が金沢工業大学ロボティクス学科で担当している2020年度後学期に担当したロボットプログラミングⅡをROS2用に変更したものです。今回は、ROS2の通信方式であるトピックとそれをPythonで実現する方法を学びましょう!詳細なコンテンツは次のとおりです。
コンテンツ
- トピック通信
参考サイト
概 要
- ROS2の通信方式にはトピック通信とサービス通信があり、トピック通信は1対多(一つのノードから多くのノードに通信可能)の非同期通信で、メッセージ通信は1体1(一つのノードからはある特定のノートにしか通信できない)の同期通信です。ここではトピック通信を学びます。トピック通信では送り手と受け手がTopic(トピック)とよばれる名前付きのバス(伝送路)を介してデータをやり取りしています。送り手のことをPublisher(配信者)、受け手のことをSubscriber(購読者)、データのことをメッセージとよびます。
準 備
- 以下の演習を実施していない場合は、実施して必要なパッケージをインストールしてシミュレータの使い方を学ぶ。
- 端末を開き、byobuを実行して、端末を上下に3分割する。
- $ byobyu
- Shift+F2キーを押すと端末を上下に2分割される。
- もう一度、Shift+F2キーを押すと端末が全部で上下に3分割される。
- Shift-Alt-上下キーで端末のサイズを調整する。
パブリッシャ (Publisher)
- ソースコードと説明:キーボードからロボットを操縦するmy_teleopパッケージを作ろう!
ROS2演習3と同じ要領でmy_teleopパッケージを作る。今回の例ではパッケージ名helloをmy_teleop、ノード名をmy_teleop_nodeとする。1番目の端末で次のコマンドを実行して、my_teleopパッケージとmy_teleop_nodeノードを作成する。$ cd ~/colcon_ws/src
$ ros2 pkg create --build-type ament_python --node-name my_teleop_node my_teleop
このプログラムはメッセージの送リ手であるpublisher(配信者)プログラムの簡単な例にもなっている。プログラムの説明はソースコード内のコメントを参照して欲しい。好きなテキストエディタにコピペしてmy_teleop_node.pyと名前を付けて、~/colcon_ws/src/my_teleop/my_teleopディレクトリに保存する。以下の例ではgeditを使っている。
-
- $ cd ~/colcon_ws/src/my_teleop/my_teleop
- $ gedit my_teleop_node.py &
# ファイル名 my_teleop_node.py import rclpy # ROS2のPythonモジュールをインポート from rclpy.node import Node # rclpy.nodeモジュールからNodeクラスをインポート from std_msgs.msg import String # トピック通信に使うStringメッセージ型をインポート from geometry_msgs.msg import Twist # トピック通信に使うTwistメッセージ型をインポート class TeleopPublisher(Node): """タイマーのコールバック関数を使い、ロボットを動かす速度指令値のトピックcmd_vel をパブリッシュするクラス。 Node: TeleopPublisherクラスが継承するクラス """ def __init__(self): """コンストラクタ。パブリッシャーとタイマーを生成する。 """ # Nodeクラスのコンストラクタを呼び出し、'teleop_pulisher_node'というノード名をつける。 super().__init__('teleop_publisher_node') # パブリッシャーの生成。create_publisherの1番目の引数はトピック通信に使うメッセージ型。 # Twist型は速度指令値を通信するのに使われる。2番目の引数'cmd_vel'はトピック名。 # 3番目の引数はキューのサイズ。キューサイズはQOS(quality of service)の設定に使われる。 # サブスクライバーがデータを何らかの理由で受信できないときのキューサイズの上限となる。 self.publisher = self.create_publisher(Twist,'cmd_vel', 10) # タイマーの生成。タイマーのコールバック関数timer_callbackをtimer_period間隔で実行する。 # この例では0.01[s]秒ごとにtimer_callback関数を呼び出す。 timer_period = 0.01 # 秒 self.timer = self.create_timer(timer_period, self.timer_callback) # Twistメッセージ型オブジェクトの生成。メンバーにdVector3型の並進速度成分linear、 # 角速度成分angularを持つ。 self.vel = Twist() print("*** my_teleop node ***") print("Input f, b, r, l key, then press Enter key.") def timer_callback(self): """タイマーのコールバック関数。入力キーにより並進及び角速度を増減している。 input関数はブロックされるので、キーを入力した後にEnterキーを押さなければ速度は変更されない。 """ key = input("f:forward, b:backward, r:right, l:left, s:stop <<") print("key=", key) # linear.xは前後方向の並進速度(m/s)。前方向が正。 # angular.zは回転速度(rad/s)。反時計回りが正。 if key == 'f': # fキーが押されていたら self.vel.linear.x = 0.25 elif key == 'b': self.vel.linear.x = -0.25 elif key == 'l': self.vel.angular.z = 0.25 elif key == 'r': self.vel.angular.z = -0.25 else: print("Input f, b, r, l : ") self.publisher.publish(self.vel) # 速度指令メッセージをパブリッシュ(送信)する。 # 端末に並進と角速度を表示する。 self.get_logger().info("Velocity: Linear=%f angular=%f" % (self.vel.linear.x,self.vel.angular.z)) def main(args=None): rclpy.init(args=args) # rclpyモジュールの初期化 teleop_publisher = TeleopPublisher() # ノードの作成 rclpy.spin(teleop_publisher) # コールバック関数が呼び出し rclpy.shutdown() # rclpyモジュールの終了処理 if __name__ == '__main__': main()
ここで、馴染みのないコールバック関数callbackが登場しています。コールバック関数はIT用語辞典によると「コールバック関数とは、プログラム中で、呼び出し先の関数の実行中に実行されるように、あらかじめ指定しておく関数。」と定義されています。通常はマウスで線を描くなどの処理を実装するような、あるイベントが起きた時に処理をするプログラムを実装する場合に使われます。通常は、マウスのイベント処理などの場合のようにマウスのボタンが押されたり、移動したときに自動的にコールバック関数が呼び出されますが、ROSではspinOnce()やspin()で明示的に呼び出さなければいけません。spinOnce()はコールバック関数を一度だけ呼び出すので通常はwhileループの中で使います。spin()はノードが動いている間、コールバック関数を呼び続けます。ROSではメッセージの受け渡しにコールバック関数を使います。
2.ビルド
-
- 1番目の端末で以下のコマンドでビルドしよう。
$ cd ~/colcon_ws
$ colcon build --symlink-install
- 1番目の端末で以下のコマンドでビルドしよう。
3. 実行
-
- 1番目の端末で次のコマンドを実行して、my_teleopノードを実行する。
$ ros2 run my_teleop my_teleop_node
- 2番目の端末で次のコマンドを実行して、gazeboを起動する。
$ export TURTLEBOT3_MODEL=burger
$ ros2 launch turtlebot3_gazebo empty_world.launch.py
- 1番目の端末で次のコマンドを実行して、my_teleopノードを実行する。
サブスクライバ (Subscriber)
- ソースコードと説明
次に、メッセージの受け手であるsubscriber(購読者)の簡単なプログラムを示します。このプログラムはmy_teleop_nodeがパブリッシュするトピックcmd_velをサブスクライブして、並進速度(Linear Velocity)と角速度(Angular Velocity)を標準出力に出力する簡単なプログラムです。
# ファイル名 my_subscriber_node.py import rclpy # ROS2のPythonモジュールをインポート from rclpy.node import Node # rclpy.nodeモジュールからNodeクラスをインポート from std_msgs.msg import String # トピック通信に使うStringメッセージ型をインポート from geometry_msgs.msg import Twist # トピック通信に使うTwistメッセージ型をインポート class MySubscriber(Node): """速度指令値のトピックcmd_velをサブスクライブして端末に表示するたけの簡単なクラス """ def __init__(self): """コンストラクタ。サブスクライバーを生成する。 """ # Nodeクラスのコンストラクタを呼び出し、'my_subscriber_node'というノード名をつける。 super().__init__('my_subscriber_node') # サブスクライバーの生成。create_subscriptionの1番目の引数Twistはトピック通信に使うメッセージ型。 # Twist型は速度指令値を通信するのに使われる。2番目の引数'cmd_vel'はトピック名。 # 3番目の引数はコールバック関数。 4番目の引数はキューのサイズ。 self.subscription = self.create_subscription(Twist,'cmd_vel', self.listener_callback, 10) def listener_callback(self, Twist): """サブスクライバーのコールバック関数。端末に並進と角速度を表示する。 """ self.get_logger().info("Velocity: Linear=%f angular=%f" % (Twist.linear.x,Twist.angular.z)) def main(args=None): rclpy.init(args=args) # rclpyモジュールの初期化 my_subscriber = MySubscriber() # ノードの作成 rclpy.spin(my_subscriber) # コールバック関数が呼び出し my_subscriber.destory_node() # ノードの破壊 rclpy.shutdown() # rclpyモジュールの終了処理 if __name__ == '__main__': main()
では、my_subscriber_nodeを作ろう。
好きなテキストエディタにコピペしてmy_subscriber_node.pyと名前を付けて、~/colcon_ws/src/my_teleop/my_teleopディレクトリに保存する。以下の例ではgeditを使っている。
- $ cd ~/colcon_ws/src/my_teleop/my_teleop
- $ gedit my_subscriber_node.py &
2. 依存関係の追加
my_teleopパッケージを作成したので~/colcon_ws/src/my_teleopディレクトリに setup.py、setup.cfg、package.xmlが作られている。package.xmlの5行から7行を必要に応じて変更する。次に、依存関係を示す10行目から12行目を追加する。この3つのモジュールはソースコードでインポートしており、実行時に必要になる。
3. エントリポイントの追加
次に、setup.pyをエディタで開いて、エントリポイントとなる次の1行を24行目に挿入する。エントリーポイントには実行するファイルと関数を指定する。23行目に関しては、パッケージを作成するときにノード名my_teleop_nodeを指定したので自動で作られた。
-
- ‘my_subscriber_node = my_teleop.my_subscriber_node:main’,
4. Setup.cfgのチェック
setup.cfgは自動作成される。このファイルにはros2 runで使う実行可能ファイルがどこにあるかが書かれている。
5. ビルドと実行
ROS2をインストールしたときに依存関係のあるrclpy、std_msgs、geometry_msgsは既にインストールされているが、依存関係を解消する便利なコマンドを紹介する。足りないパッケージがあれば探してインストールしてくれる。
$ cd ~/colcon_ws
$ rosdep install -i --from-path src --rosdistro foxy -y
では、以下のコマンドでビルドして実行しよう。
$ cd ~/catkin_w
$ colcon build --symlink-install
$ rosrun my_subscriber my_subscriber_node
6. シミュレータでの確認
- 端末を3つ開く。
- 1番目の端末でシミュレータを起動する。
$ ros2 launch turtlebot3_gazebo empty_world.launch.py
- 2番目の端末でmy_teleop_nodeノードを起動する。
$ ros2 run my_teleop my_teleop_node
- 3番目の端末でmy_subscriberノードを起動する。
$ rosrun my_teleop my_subscriber_node
- 次にmy_teleopノードを起動した端末にマウスカーソルを持って行く。
f, b, l, rキーでロボットが移動し、sキーで停止したら成功。キーを押した後はエンターキーを押さないと動かない。 - 最後に、my_subscriber_nodeノードを起動した端末を見ましょう。速度が表示されていたら成功。
お疲れ様!
ホームワーク
- キーsを押すと、ロボットが停止するようにmy_teleop_node.pyのソースコードを変更しよう!
- my_teleop_node.cppではキーボードのf, b, r, lを押した後にエンターキーを押すと、一定の速度でロボットが動き出す。キーを押すたびに並進速度が0.01 [m/s]ずつ増減、回転角度が0.1 [rad/s]ずつ変化するようにソースコードを変更しよう!
- 指定した時間[s]だけ、指定した速度[m/s]と角速度[rad/s]で移動するプログラムを作ろう!
- ロボットの移動する軌跡が1辺x[m]の正方形になるようプログラムを変更しよう!
- ロボットの移動する軌跡が半径x[m]の円になるようプログラムを変更しよう!
- my_teleop_node.pyは、ROS2演習4-2021:シミュレータでTurtlebot3を動かそう!teleop_keyboardとは違いf, b, r, lのキーを押した後にエンターキーが必要です。エンターキーを押さずに、f, b, r, lキーを押すとロボットが移動するようにソースコードを変更しよう。なお、my_teleop_node.pyではinput関数でキーボードから入力しますがキーが入力されるまで待状態になります。これをブロックとよびます。エンターキーが入力されると次の処理に移ります。Linuxでブロックしない入出力をpythonで実現する方法を調べて実装しよう!
終わり
コメント