空飛ぶロボットのつくりかた

ロボットをつくるために必要な技術をまとめます。ロボットの未来についても考えたりします。

エッジ検出のお勉強(OpenCV+python)

Sobel Edge Detection

import cv2
 
image = cv2.imread('iron_man.png')
gray_image = cv2.cvtColor(image,cv2.COLOR_BGR2GRAY)
 
gray_sobel_x = cv2.Sobel(gray_image,cv2.CV_32F,1,0)
cv2.imshow('gray_sobel_x',gray_sobel_x)
 
gray_sobel_y = cv2.Sobel(gray_image,cv2.CV_32F,0,1)
cv2.imshow('gray_sobel_y',gray_sobel_y) 
 
gray_abs_sobel_x = cv2.convertScaleAbs(gray_sobel_x) 
cv2.imshow('gray_abs_sobel_x',gray_abs_sobel_x) 

gray_abs_sobel_y = cv2.convertScaleAbs(gray_sobel_y)
cv2.imshow('gray_abs_sobel_y',gray_abs_sobel_y) 

gray_sobel_edge = cv2.addWeighted(gray_abs_sobel_x, 0.5, gray_abs_sobel_y, 0.5, 0) 
 
cv2.imshow('gray_sobel_edge',gray_sobel_edge)

cv2.waitKey(0)
cv2.destroyAllWindows()

x f:id:robonchu:20170603163245p:plain y f:id:robonchu:20170603163252p:plain z f:id:robonchu:20170603163259p:plain

参考: 配列操作 — opencv 2.2 documentation  Python + OpenCVでエッジ検出 (Sobel) | Non-Fiction

Laplacian Edge Detection

import cv2
 
image = cv2.imread('iron_man.png',0)
#image = cv2.GaussianBlur(image, (3,3), 0) # gaussian filter
 
laplacian = cv2.Laplacian(image,cv2.CV_32F)
#laplacian = cv2.Laplacian(image,cv2.CV_32F, 3)
laplacian_edge = cv2.convertScaleAbs(laplacian)
 
cv2.imshow('laplacian_edge', laplacian_edge)

cv2.waitKey(0)
cv2.destroyAllWindows()

f:id:robonchu:20170603165349p:plain

平滑化するとより暗くなり、わかりにくい。

参考: 画像フィルタリング — opencv v2.1 documentation

Canny Edge Detection

opencvにそのままあった…

import cv2

ORIGIN_FILE_NAME = "iron_man.png"
GRAY_FILE_NAME = "gray.png"
CANNY_FILE_NAME = "canny.png"

origin_image = cv2.imread(ORIGIN_FILE_NAME)
gray_image = cv2.imread(ORIGIN_FILE_NAME, cv2.IMREAD_GRAYSCALE)
canny_image = cv2.Canny(gray_image, 200, 230)

cv2.imshow(ORIGIN, origin_image)
cv2.imshow(GRAY_FILE_NAME, gray_image)
cv2.imshow(CANNY_FILE_NAME, canny_image)

cv2.waitKey(0)
cv2.destroyAllWindows()

f:id:robonchu:20170603154145p:plainf:id:robonchu:20170603154200p:plainf:id:robonchu:20170603154215p:plain

白の部分が何ピクセルあるかを表示

import cv2

ORIGIN_FILE_NAME = "iron_man.png"

origin_image = cv2.imread(ORIGIN_FILE_NAME)
gray_image = cv2.imread(ORIGIN_FILE_NAME, cv2.IMREAD_GRAYSCALE)
canny_image = cv2.Canny(gray_image, 50, 150)

height, width = canny_image.shape
canny_image = canny_image[height/2:height , 0:width/2 ] # limit range

save_edge_data = 0

for i in canny_image:
    for j in i:
        save_edge_data += j

print save_edge_data/255 # display white space pixel volume

cv2.waitKey(0)
cv2.destroyAllWindows()

例えばこの値を用いてある場所にエッジがあったかどうかを判断できる。

参考: OpenCVでエッジ検出してみる - Qiita

テストを学ぼう(3)!~テストダブル~

理想的なユニットテストでは、依存するすべてのシステムを利用して行う。しかし、依存する本物のオブジェクトを常に使用できるとは限らない。

こんな時、リファクタリングをしたり、仮のオブジェクトを用いてテストを行うことができる。

テスタビリティ

テスタビリティ:テストのしやすさ


テスト対象が複雑な場合、テストも複雑になりがち。

しかし、テスト対象が複雑でなくてもテストが複雑になったり、不安定になることがある…

例えば、以下のテストは不安定、というかうまく行かない↓

プロダクションコード

#!/usr/bin/python
# coding: UTF-8

import time
from datetime import datetime

class DateState:
    def __init__(self):
        self.date = datetime.today()

    def do_something(self):
        self.date = datetime.today()
        print "do something"
        return self.date


if __name__ == '__main__':
    date_state = DateState()
    print date_state.date    

    time.sleep(1)
    date_state.do_something()
    print date_state.date

テストコード

# -*- coding:utf-8 -*-                                                 
import unittest, date_example
from datetime import datetime


class TestDate(unittest.TestCase):
    # 前処理                                                           
    def setUp(self):
        pass

    # 後片付け                                                         
    def tearDowm(self):
        pass

    # 現在時刻のテスト                                                   
    def test_get_day(self):        
        date_state = date_example.DateState()
        date_state.do_something()
        self.assertEqual(date_state.date, datetime.today())


if __name__ == "__main__":
    unittest.main()

実行すると

do something
F
======================================================================
FAIL: test_get_day (test_date_example0.TestDate)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test_date_example0.py", line 19, in test_get_day
    self.assertEqual(date_state.date,datetime.today())
AssertionError: datetime.datetime(2017, 5, 28, 13, 13, 44, 456115) != datetime.datetime(2017, 5, 28, 13, 13, 44, 456151)

----------------------------------------------------------------------
Ran 1 test in 0.000s

FAILED (failures=1)

そんな時はどうするの?

リファクタリング

リファクタリング:プログラムの外部的な振る舞いを変えずに内部構造を変更し、コードを整理するテクニック


例えば、

  1. メソッドの抽出
  2. クラスの抽出

などがある

Tips : リファクタリングを行う前にテストを書くこと

メソッドの抽出

方法:テストがむずかしいメソッドを抽出し、抽出したメソッドをオーバーライドしてテストする

プロダクションコード

#!/usr/bin/python
# coding: UTF-8

import time
from datetime import datetime

class DateState:
    def __init__(self):
        #self.date = datetime.today()
        self.date = self.new_date()

    def do_something(self):
        #self.date = datetime.today()
        self.date = self.new_date()
        print "do something"
        return self.date

    def new_date(self):
        return datetime.today()


if __name__ == '__main__':
    date_state = DateState()
    print date_state.date    

    time.sleep(1)
    date_state.do_something()
    print date_state.date

テストコード

# -*- coding:utf-8 -*-                                                 
import unittest, date_example
from datetime import datetime


class TestDate(unittest.TestCase):
    # 前処理                                                           
    def setUp(self):
        pass

    # 後片付け                                                         
    def tearDowm(self):
        pass

    # 現在時刻のテスト                                                   
    def test_get_day(self):
        current = datetime.today()
        
        class NewDateState(date_example.DateState):
            def new_date(self):
                return current

        date_state = NewDateState()
        date_state.do_something()
        self.assertEqual(date_state.date,current)


if __name__ == "__main__":
    unittest.main()

実行結果

do something
.
----------------------------------------------------------------------
Ran 1 test in 0.000s

OK

このようにメソッドを抽出することで安定したテストが可能

実際、テスト対象はテスト対象クラスのサブクラスになっているが、不安定なテストよりこちらのほうが良いんじゃないでしょうか

委譲オブジェクトの抽出

方法:処理をオブジェクトとして抽出し委譲する

参考:

Python の関数はオブジェクト | すぐに忘れる脳みそのためのメモ

Pythonを書き始める前に見るべきTips - Qiita

Pyramidでzope.interfaceを使う — hirokiky's blog

テストダブル

テスト対象のクラスやメソッドは往々にして他のオブジェクト等に依存しており、依存する機能がユニットテストで扱いづらい時、依存するオブジェクトを代役で置き換える方法が有効。

この、代役は

  1. スタブ
  2. モック

などがあり、総称してテストダブルと呼ばれる。

スタブとは

依存するクラスやモジュールの代用として使用する仮のクラスやモジュールのこと。

↓こんな時にスタブを使う

  1. 依存オブジェクトが予測できない振る舞いをする
  2. 依存オブジェクトのクラスが存在しない
  3. 依存オブジェクトの実行コストが高く、簡単に利用できない
  4. 依存オブジェクトが実行環境に強く依存している
固定値を返すスタブ

スタブの例として、ランダムな整数(1〜100)を返す機能を考えてみる。

以下のように固定値を返すスタブを作ると、不確実性を排除できるため正しくテストすることができる。また、再利用できるスタブなどテストに応じて自由に作ることができる。

#!/usr/bin/python
# coding: UTF-8

import random

# 乱数生成クラス
class RandomNumberGenerator:
    def __init__(self):
        pass

    def nextInt(self):
        return random.randint(1,100)

# 固定値を返すスタブ
class RandomNumberGeneratorStub(RandomNumberGenerator):
    def nextInt(self):
        return 1

# 再利用可能な固定値を返すスタブ
class RandomNumberGeneratorFixedResultStub(RandomNumberGenerator):
    def __init__(self,num):
        self.num = num

    def nextInt(self):
        return self.num

# 乱数生成オブジェクトを持つクラス
class Randoms:
    def __init__(self):
        self.generator = RandomNumberGenerator()

    def choice(self,options):
        if len(options) is 0:
            return None
        idx = self.generator.nextInt() % len(options)
        return options[idx]


if __name__ == '__main__':
    options = []
    options.append("A")
    options.append("B")
    sut = Randoms()
    #sut.generator = RandomNumberGeneratorFixedResultStub(0)
    sut.generator = RandomNumberGeneratorFixedResultStub(1)
    print sut.choice(options)
例外を送出するスタブ

オブジェクトによっては例外の発生条件が非常に複雑であり、簡単にテストできない場合がある。そんな時はスタブオブジェクトで無条件に例外を送出するよう実装すれば簡単に テストできるよ。

#!/usr/bin/python
# coding: UTF-8

class SetUserInfo:
    def __init__(self):
        self.user_id = []
        pass

    def set(self,user_id):
        self.user_id = user_id
        if len(user_id) is 0:
            raise ValueError("error!")

class SetUserInfoStub(SetUserInfo):
    def set(self,user_id):
        raise ValueError("error!")


if __name__ == '__main__':
    #set_user_info = SetUserInfo()
    set_user_info = SetUserInfoStub()
    zero_list = [1,2]
    set_user_info.set(zero_list)

ROSrviz/rqtのお勉強

rviz

$ rosrun rviz rviz

左のaddからいろいろ追加してみよう

rqt

素のrqt

$ rqt

rqt_plot

$ rqt_plot

topic指定

$ rqt_plot /turtle1/cmd_vel/linear/x

rqt_image_view

$ rosrun uvc_camera uvc_camera_node
$ rqt

そして Plugins->Visualization->ImageViewとし、左上のTopicから/image_rawを選ぶとカメラ画像が確認できる

or

$ rqt_image_view

rqt_graph

$ rqt

Plugins->Introspection->Node_Graph

or

$ rqt_graph

rqt_bag

$ rqt

Plugins->Logging->Bag

そして、Load bagボタンでbagファイルのインポート

ROStfのお勉強

TFコマンド

フレームツリーグラフ

$ rosrun tf view_frames
$ evince frames.pdf

フレーム関係

$ roslaunch turtle_tf turtle_tf_demo.launch
$ rosrun tf tf_echo world turtle1
At time 1496036356.439
- Translation: [5.544, 5.544, 0.000]
- Rotation: in Quaternion [0.000, 0.000, 0.000, 1.000]
            in RPY (radian) [0.000, -0.000, 0.000]
            in RPY (degree) [0.000, -0.000, 0.000]

TFの構成

TFの実装例

ブロードキャスター

#!/usr/bin/env python  
import rospy

# Because of transformations
import tf

import tf2_ros
import geometry_msgs.msg
import turtlesim.msg


def handle_turtle_pose(msg, turtlename):
    br = tf2_ros.TransformBroadcaster()
    t = geometry_msgs.msg.TransformStamped()

    t.header.stamp = rospy.Time.now()
    t.header.frame_id = "world"
    t.child_frame_id = turtlename
    t.transform.translation.x = msg.x
    t.transform.translation.y = msg.y
    t.transform.translation.z = 0.0
    q = tf.transformations.quaternion_from_euler(0, 0, msg.theta)
    t.transform.rotation.x = q[0]
    t.transform.rotation.y = q[1]
    t.transform.rotation.z = q[2]
    t.transform.rotation.w = q[3]

    br.sendTransform(t)

参考:tf2/Tutorials/Writing a tf2 broadcaster (Python) - ROS Wiki

リスナー

TFで'odom'と'base'フレームの相対位置、回転の取得

import tf.transformations as T
import tf2_ros
import tf2_geometry_msgs                                                                                                                                                                          

tfBuffer = tf2_ros.Buffer()
listener = tf2_ros.TransformListener(tfBuffer)

rospy.sleep(2)
rate = rospy.Rate(10.0)

while not rospy.is_shutdown():
    try:
        trans_odom_base = tfBuffer.lookup_transform('odom','base', rospy.Time())
        break
    except (tf2_ros.LookupException,tf2_ros.ConnectivityException,tf2_ros.ExtrapolationException):
        print "tf odom to base not found"
        continue


translation_odom_base = trans_odom_base.transform.translation
rotation_odom_base = trans_odom_base.transform.rotation

position_x = translation_odom_base.x
position_y = translation_odom_base.y
position_z = translation_odom_base.z

quaternion_odom_base = [rotation_odom_base.x, rotation_odom_base.y, rotation_odom_base.z, rotation_odom_base.w]
euler_odom_base = T.euler_from_quaternion(quaternion_odom_base)
yaw_odom_base = euler_odom_base[2]

座標変換

TFで'base'フレームからあるオフセット分だけ移動した'odom'フレーム視点での座標

import tf.transformations as T
import tf2_ros
import tf2_geometry_msgs 

pose_st = PoseStamped()
pose_st.pose = geometry.tuples_to_pose(geometry.pose(x=0.3,y=0.2,z=0.6,ei=0,ej=3.14/2,ek=3.14))
pose_st.header.frame_id = 'base'

trans = tfBuffer.transform(pose_st, 'odom')                                                                                                                                                  

position = [trans.pose.position.x,trans.pose.position.y, trans.pose.position.z]
quat = [trans.pose.orientation.x, trans.pose.orientation.y, trans.pose.orientation.z, trans.pose.orientation.w]
euler = T.euler_from_quaternion(quat)

参考:

世界で一番簡単なtfの使い方 - MyEnigma

座標変換パッケージ "tf"の使い方 - うごくものづくりのために

最初にTFのフレームを作るとき

Run/create a first fixed frame with NO_PARENT - ROS Answers: Open Source Q&A Forum

ROSプラグインのお勉強

プラグインプログラムの作成

  1. ベースプラグインクラス
  2. プラグインクラス

で構成される

プラグインをエクスポート

他のクラスで動的に利用するためにPLUGINLIB_EXPORT_CLASSを用いる。

src/sample_plugins.cppの例

#include <pluginlib/class_list_macros.h>
#include <sample_plugins/sample_base_plugins.h>
#include <sample_plugins/sample_plugins.h>

PLUGINLIB_EXPORT_CLASS(sample_plugins::plugin1,sample_base_plugins::BasePlugin);
PLUGINLIB_EXPORT_CLASS(sample_plugins::plugin2,sample_base_plugins::BasePlugin);

2つのプラグインsample_plugins::plugin1、sample_plugins::plugin2を ベースクラスsample_plugins_base::BasePluginに登録している。

include/sample_plugins/sample_base_plugin.h

#include <~>

namespace sample_base_plugins{
class BasePlugin
{~}

void BasePlugin::initialize(char *file_name)
{~}

include/sample_plugins/sample_plugin.h

#include <ros/ros.h>
#include <sample_plugins/sample_base_plugin.h>

namespace sample_plugins{
class plugin1 : public sample_base_plugin::BasePlugin
{
public:
    void sort(){~};

private:
    static int sample_plugin1(~){};
};

class plugin2 : public sample_base_plugin::BasePlugin
{~};

プラグインの登録

package.xmlに以下を記載

<export>
  <sample_plugins="${prefix}/sample_plugins.xml" />
</export>

${prefix}はパッケージのルートディレクト

プラグインの記述

以下、2つのプラグインクラスが同じライブラリに含まれている。 sample_plugins.xml

<library path = "lib/libsample_plugins">
  <class name = "sample_plugins/plugin1" type="sample_plugins::plugin1"
         base_class_type = "sample_base_plugin::BasePlugin">
  </class>
  <class name = "sample_plugins/plugin2" type="sample_plugins::plugin2"
         base_class_type = "sample_base_plugin::BasePlugin">
  </class>
</library>

プラグインライブラリの構築

add_library(sample_plugins src/sample_plugins.cpp)

プラグインを利用する

例 src/plugin_sort.cpp

#include <ros/ros.h>
#include <boost/shared_ptr.hpp>
#include <pluginlib/class_loader.h>
#include <sample_plugins/sample_base_plugin.h>

//ベースクラスのロード
pluginlib::ClassLoader<sample_base_plugin::BasePlugin>
        loader("sample_plugins","sample_base_plugin::BasePlugin");

//インスタンス生成
boost::shared_ptr<sampel_base_plugin::BasePlugin>
      plug1 = loader.createInstance("sample_plugins/sample_plugin1);

plug1->initialize(~)
plug1->sort()

実行

$ rosrun sample_plugins plugin_sort

ROSアクションのお勉強

rosアクションの復習

アクションの作成

型の定義

$ roscd sample
$ mkdir action
$ cd action
$ touch sample.action

sample.actionの中身↓

uint16[] freqs
---
bool finished
---
uint32 steps

上からGoal,Result,Feedbackのmsgになる

設定

package.xmlに以下を追加

<build_depend>actionlib_msgs</build_depend>
<run_depend>actionlib_msgs</run_depend>

CMakeLists.txtに以下を追加

find_package(catkin REQUIRED COMPONENTS
   rospy
   std_msgs
   message_generation # 追加
   actionlib_msgs # 追加
)
add_action_files(
  FILES 
   sample.action # 追加
)
generate_message(
    DEPENDENCIES
    actionlib_msgs # 追加
    std_msgs
)

actionの実装例

サーバー

import rospy, actionlib
from sample.msg import sampleAction, sampleResult, sampleFeedback

def execute_sample(goal):
    result = sampleResult()

    feedback = sampleFeedback()
  feedback.steps = = len(goal.freqs)

    #feedback = sampleFeedback(steps = len(goal.freqs))

    action_sample.publish_feedback(feedback) # FEEDBACK                                                                                                                                                      

    if action_sample.is_preempt_requested():
        result.finished = Flase
        action_sample.set_preempted(result) # 中断された時                                                                                                                                                   
    return


    result.finished = True
    action_sample.set_succeeded(result) # RESULT


rospy.init_node('sample_server')
action_sample = actionlib.SimpleActionServer('sample', sampleAction, execute_sample, False)
action_sample.start()
rospy.spin()

クライアント

import rospy, actionlib
from sample.msg import sampleAction, sampleResult, sampleFeedback, sampleGoal

action_sample_client = actionlib.SimpleActionClient('sample', sampleAction)

def do_sample():
    goal = sampleGoal()
    goal.freqs = [100, 200, 0, 0]

    action_sample_client.wait_for_server()
    action_sample_client.send_goal(goal,feedback_cb=feedback_cb)
    action_sample_client.wait_for_result()

    result = action_sample.get_result()
    if result.finished is True:
        print "Goal"
    else:
        print "Not Goal"


def feedback_cb(feedback):
    print feedback

if __name__ == '__main__':
    rospy.init_node('sample_client')
    do_sample()

send_goalでフィードバックを受け取るコールバック関数を指定することができる

ROSメッセージのお勉強

rosメッセージの復習

メッセージコマンド

すべてのメッセージを表示

$ rosmsg list 

指定したメッセージの型を表示

$ rosmsg show geometry_msgs/Twist
geometry_msgs/Vector3 linear
  float64 x
  float64 y
  float64 z
geometry_msgs/Vector3 angular
  float64 x
  float64 y
  float64 z

パッケージへ行って中身確認

$ roscd geometry_msgs/msg/
$ e Twist.msg
# This expresses velocity in free space broken into its linear and angular part\
s.
Vector3  linear
Vector3  angular

定義されているmsg型を利用してmsgを作ることができる↑

Vecter3を調べてみる

$ e Vector3.msg
# This represents a vector in free space.
# It is only meant to represent a direction. Therefore, it does not
# make sense to apply a translation to it (e.g., when applying a
# generic rigid transformation to a Vector3, tf2 will only apply the
# rotation). If you want your data to be translatable too, use the
# geometry_msgs/Point message instead.

float64 x
float64 y
float64 z

指定したパッケージで使用されているメッセージ

$ rosmsg package turtlesim 
turtlesim/Color
turtlesim/Pose

メッセージファイルの作成

msgデイレクトリにmsgファイルを作成

$ roscd sample
$ mkdir msg
$ cd msg
$ touch msg_sample.msg

msg_sample.msgの中身↓

int32 data

package.xmlに以下を追加

<build_depend>message_generation</build_depend>
<run_depend>message_runtime</run_depend>

CMakeLists.txtに以下を追加

find_package(catkin REQUIRED COMPONENTS
   rospy
   std_msgs
   message_generation # 追加
)
add_message_files(
  FILES 
   msg_sample.msg
)
generate_message(
    DEPENDENCIES
    std_msgs
)
catkin_package(
  CATKIN_DEPENDS rospy std_msgs message_runtime #  message_runtimeを追加
)

スクリプトから呼び出すときは

from sample.msg import msg_sample