cam_vision 7.76 KB
Newer Older
1 2 3 4
#!/usr/bin/python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
5
from std_msgs.msg import String, Float32MultiArray
6
from geometry_msgs.msg import Point
7
import  numpy as np
8
import cv2, pathlib, statistics
9
from cv_bridge import CvBridge
10 11
from cam_driver import CameraDriver
import pyrealsense2 as rs
12 13


Inès EL HADRI's avatar
Inès EL HADRI committed
14

15 16
class CameraVision(CameraDriver):
    
17 18

    def __init__(self, name="cam_vision", timerFreq = 1/60.0):
19 20
        super().__init__(name, timerFreq) # Create the node

21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
        # Load parameters
        directory = pathlib.PurePath(__file__).parent
        try:
            # Load HSV threshold params from file
            file = open(str(directory / "config" / "hsv_params" ), "r")

            # Parse string
            text = file.read()
            low = text.split(",")[0].strip('][ ')
            high = text.split(",")[1].strip('][ ')

            self._low = np.array([int(i) for i in low.split(" ") if i])
            self._high = np.array([int(i) for i in high.split(" ") if i])

            print(f"Successfully loaded HSV params :\n - low : {self._low}\n - high {self._high}")
        except:
            # Initialize default variables if no file exists
            self._low = np.array([60-25, 50, 50])
            self._high = np.array([65+25, 255, 255])

            print("Set HSV params to default values :\n - low : {self._low}\n - high {self._high}")
42 43 44 45 46 47 48 49

        # Node parameters
        self.declare_parameters(
            namespace='',
            parameters=[
                ('DEPTH_CHECK_RADIUS', 10)
            ])

50 51
        
        # Node variables
52
        self._kernel = np.ones((5, 5), np.uint8)
53 54
        self._color_info = (0, 0, 255)

55 56
        # Initialize a ROS2 Image to CV2 image converter
        self._bridge = CvBridge()
57

58
    def init_publishers(self, timerFreq: float):
59 60 61 62
        # Initialize publishers
        self._rgb_detection_publisher = self.create_publisher(Image, 'rgb_detection', 10)
        self._mask_detection_publisher = self.create_publisher(Image, 'mask_detection', 10)
        self._detection_publisher = self.create_publisher(String, 'detection', 10)
63
        self._bottle_relative_pos_publisher = self.create_publisher(Point, 'bottle_relative_pos', 10)
64 65 66

        # Initialize a clock for the publisher
        self.create_timer(timerFreq, self.loop)
67
        self.create_timer(1, self.publish_imgs) # Limit the publishing of images to 1 each second (to prevent network issues)
68
            
69 70 71
    def publish_imgs(self):
        if self._rgb_detection is None or self._mask_detection is None:
            return
72

73 74
        # Convert cv2 images to ROS2 RGB Image
        ros2_mask_image = cv2.cvtColor(self._mask_detection, cv2.COLOR_GRAY2RGB)
75 76 77 78 79
        ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR)
        ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8")
        ros2_mask_image.header.stamp = self.get_clock().now().to_msg()
        ros2_mask_image.header.frame_id = "image"

80
        ros2_rgb_image = cv2.cvtColor(self._rgb_detection, cv2.COLOR_HSV2BGR)
81 82 83 84
        ros2_rgb_image = self._bridge.cv2_to_imgmsg(ros2_rgb_image,"bgr8")
        ros2_rgb_image.header.stamp = self.get_clock().now().to_msg()
        ros2_rgb_image.header.frame_id = "image"

85 86 87 88 89 90 91 92 93 94 95
        # Publish Images -> to be able to detect false positives and improve the algorithm
        self._rgb_detection_publisher.publish(ros2_rgb_image)
        self._mask_detection_publisher.publish(ros2_mask_image)


        # Reset images
        self._rgb_detection = None
        self._mask_detection = None


    def publish_data(self, xDist, yDist):
96 97 98 99 100

        # Publish String message -> to inform that we have detected a bottle
        str = String()
        str.data = "A bottle has been detected"
        self._detection_publisher.publish(str)
Inès EL HADRI's avatar
Inès EL HADRI committed
101 102

        # Depth publish
103 104 105 106
        relative_position = Point()
        relative_position.x = xDist
        relative_position.y = yDist
        self._bottle_relative_pos_publisher.publish(relative_position)
107
        
108 109


110 111
    # Node internal loop
    def loop(self):
112 113 114 115 116
        """
        Main loop to call
        """
        # update images
        if self._update_imgs() is None :
117
            return
118 119

        color_image = np.asanyarray(self._color_frame.get_data())
120 121
        
        # Convert image to HSV for easier thresholding
122
        image = cv2.cvtColor(color_image, cv2.COLOR_BGR2HSV)
123 124 125 126 127 128 129 130 131

        # Seuillage par colorimétrie
        mask = cv2.inRange(image, self._low, self._high)

        # Réduction du bruit
        mask = cv2.erode(mask, self._kernel, iterations=4)
        mask = cv2.dilate(mask, self._kernel, iterations=4)


132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
        # Segmentation using shape matching
        directory = pathlib.PurePath(__file__).parent
        pattern = cv2.imread(str(directory / 'img' / 'template.jpg'), cv2.IMREAD_GRAYSCALE)
        pattern2 = cv2.imread(str(directory / 'img' / 'template2.jpg'), cv2.IMREAD_GRAYSCALE)

        contours,_ = cv2.findContours(pattern, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True)
        template_contour = sorted_contours[0]

        contours,_ = cv2.findContours(pattern2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True)
        template2_contour = sorted_contours[0]
        
        contours,_ = cv2.findContours(mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)
        sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True)
        

Inès EL HADRI's avatar
Inès EL HADRI committed
149 150 151

        findPt = False
        (x, y) = 0, 0
152
        for c in sorted_contours:
153 154 155 156 157 158
        
            # Compare our mask with our templates
            match = cv2.matchShapes(template_contour, c, 3, 0.0)
            match2 = cv2.matchShapes(template2_contour, c, 3, 0.0)

            # Get size of object
159
            (x, y), radius = cv2.minEnclosingCircle(c)
Inès EL HADRI's avatar
Inès EL HADRI committed
160

161
            # If we find a correct match that is big enough, we publish data
162
            if min(match, match2) < 0.3 and radius > 30:
Inès EL HADRI's avatar
Inès EL HADRI committed
163
                findPt = True
164

Inès EL HADRI's avatar
Inès EL HADRI committed
165
                break # Stop checking other contours
166

167

Inès EL HADRI's avatar
Inès EL HADRI committed
168 169 170 171 172 173
        # point found
        if findPt:
            # Circle the bottle on the image 
            cv2.circle(image, (int(x), int(y)), int(radius), self._color_info, 2)
            cv2.circle(image, (int(x), int(y)), 5, self._color_info, 10)

174 175 176
            # Store images for publishing
            self._rgb_detection = image
            self._mask_detection = mask
Inès EL HADRI's avatar
Inès EL HADRI committed
177
        
178
            x,y = int(x), int(y)
179 180 181
            
            # Depth calculus            
            xDist, yDist = self.pixelToRelativePos(x,y)
Inès EL HADRI's avatar
Inès EL HADRI committed
182
        
183 184
            # Publish string and position message
            self.publish_data(xDist, yDist)
Inès EL HADRI's avatar
Inès EL HADRI committed
185 186 187
        


188 189 190
    def pixelToRelativePos(self, x, y):
        # Calculation of the depth value
        color_intrin = self._color_frame.profile.as_video_stream_profile().intrinsics
191
        D_RADIUS = self.paramInt('DEPTH_CHECK_RADIUS')
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
        
        front_dist = []
        lateral_dist = []
        for X in range(x-D_RADIUS, x+D_RADIUS):
            for Y in range(y-D_RADIUS, y+D_RADIUS):
                if (X-x)**2 + (Y-y)**2 <= D_RADIUS:
                    depth = self._depth_dist_frame.get_distance(x, y)
                    dx ,dy, dz = rs.rs2_deproject_pixel_to_point(color_intrin, [x, y], depth)

                    # Store values for average
                    front_dist.append(dz)
                    lateral_dist.append(-dx)

        xDist = statistics.mean(front_dist)
        yDist = statistics.mean(lateral_dist)

        return xDist, yDist
209 210 211 212
   
   
    def paramInt(self, name):
        return self.get_parameter(name).get_parameter_value().integer_value
213 214 215 216 217 218 219 220 221 222


    
    

def main():
    """
    Main loop
    """
    rclpy.init()
223
    rosNode = CameraVision(timerFreq=1.0/60)
224 225 226 227 228 229 230 231
    rclpy.spin(rosNode)


    rosNode.destroy_node()
    rclpy.shutdown()

if __name__ == "__main__":
    main()