cam_vision 4.74 KB
Newer Older
1 2 3 4
#!/usr/bin/python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
5
from std_msgs.msg import String
6 7 8 9 10 11 12 13 14 15
import  numpy as np
import sys, cv2
from cv_bridge import CvBridge


class CameraVision(Node):

    def __init__(self, name="cam_vision", timerFreq = 1/60.0):
        super().__init__(name) # Create the node

16 17 18 19 20 21 22
        # Initialiaze node variables
        self._low = np.array([58-20, 60, 40])
        self._high = np.array([58+20, 255,255])
        self._kernel = np.ones((3, 3), np.uint8)

        self._color_info = (0, 0, 255)

23 24
        # Initialize subscribers
        self.create_subscription(Image, 'cam_rgb', self.rgb_callback, 10)
Inès EL HADRI's avatar
Inès EL HADRI committed
25 26 27
        # self.create_subscription(Image, 'cam_depth', self.depth_callback, 10)
        # self.create_subscription(Image, 'cam_infra_1', self.infra1_callback, 10)
        # self.create_subscription(Image, 'cam_infra_2', self.infra2_callback, 10)
28 29 30 31 32
        # Initialize subcriber variables
        self._rgb_img = None
        self._depth_img = None
        self._infra1_img = None
        self._infra2_img = None
33 34

        # Initialize publishers
Inès EL HADRI's avatar
Inès EL HADRI committed
35 36 37
        self._rgb_detection_publisher = self.create_publisher(Image, 'rgb_detection', 10)
        self._mask_detection_publisher = self.create_publisher(Image, 'mask_detection', 10)
        self._detection_publisher = self.create_publisher(String, 'detection', 10)
38 39 40 41 42 43 44 45 46 47

        # Initialize a clock for the publisher
        self.create_timer(timerFreq, self.loop)

        # Initialize a ROS2 Image to CV2 image converter
        self._bridge = CvBridge()

    # Callbacks
    def rgb_callback(self, img):
        self._rgb_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough')
Inès EL HADRI's avatar
Inès EL HADRI committed
48 49 50 51 52 53
    # def depth_callback(self, img):
    #     self._depth_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough')
    # def infra1_callback(self, img):
    #     self._infra1_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough')
    # def infra2_callback(self, img):
    #     self._infra2_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough')
54 55 56 57

        
    # Node internal loop
    def loop(self):
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
        if self._rgb_img is None :
            return
        
        # Convert image to HSV for easier thresholding
        image = cv2.cvtColor(self._rgb_img, cv2.COLOR_BGR2HSV)

        # Seuillage par colorimétrie
        mask = cv2.inRange(image, self._low, self._high)
        beforeErrosion = mask

        # Réduction du bruit
        mask = cv2.erode(mask, self._kernel, iterations=4)
        mask = cv2.dilate(mask, self._kernel, iterations=4)
        # print(mask)

        # cv2.imshow('Camera', self._rgb_img)
        # cv2.imshow('Before errosion', beforeErrosion)
        # cv2.imshow('Mask', mask)

        # Segmentation using the "Min enclosing circle" method
        elements = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
        if len(elements) > 0: # If we detect elements
            c = max(elements, key=cv2.contourArea)
            (x, y), radius = cv2.minEnclosingCircle(c)

            if radius > 20: #If the radius of the biggest element detected is greater than 30, we consider that we have detected a bottle
                # Circle the bottle on the image 
                cv2.circle(image, (int(x), int(y)), int(radius), self._color_info, 2)
                cv2.circle(image, (int(x), int(y)), 5, self._color_info, 10)

                # Convert cv2 images to ROS2 RGB Image
                ros2_mask_image = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
                ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR)
                ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8")
                ros2_mask_image.header.stamp = self.get_clock().now().to_msg()
                ros2_mask_image.header.frame_id = "image"

Inès EL HADRI's avatar
Inès EL HADRI committed
95 96 97 98
                ros2_rgb_image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
                ros2_rgb_image = self._bridge.cv2_to_imgmsg(ros2_rgb_image,"bgr8")
                ros2_rgb_image.header.stamp = self.get_clock().now().to_msg()
                ros2_rgb_image.header.frame_id = "image"
99 100 101 102 103


                # Publish String message -> to inform that we have detected a bottle
                str = String()
                str.data = "A bottle has been detected"
Inès EL HADRI's avatar
Inès EL HADRI committed
104
                self._detection_publisher.publish(str)
105
                # Publish Image -> to be able to detect false positives and improve the algorithm
Inès EL HADRI's avatar
Inès EL HADRI committed
106 107
                self._rgb_detection_publisher.publish(ros2_rgb_image)
                self._mask_detection_publisher.publish(ros2_mask_image)
108 109 110



111 112 113 114 115 116 117 118
    
    

def main():
    """
    Main loop
    """
    rclpy.init()
119
    rosNode = CameraVision(timerFreq=1.0/120)
120 121 122 123 124 125 126 127
    rclpy.spin(rosNode)


    rosNode.destroy_node()
    rclpy.shutdown()

if __name__ == "__main__":
    main()