Source code for mobrob_behcon.connectors.camera
#!/usr/bin/env python
import socket
import base64
import io
from io import BytesIO
import numpy as np
from PIL import Image
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
import cv2
[docs]class CameraTCP(object):
"""
The class CameraTCP
Represents a connector to the Camera on the robot.
Provides a function to get a image from camera.
"""
BUFFER_SIZE = 16
MESSAGE = b'getNewFrame'
[docs] def __init__(self, ip_address, port):
"""
constructor
:param ip_address: ip address of camera server
:param port: port of camera server
:type ip_address: string
:type port: int
"""
self.ip_address = ip_address
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.ip_address, self.port))
[docs] def get_frame(self):
"""
Get new frame from camera
:return: returns current frame from camera
"""
self.socket.send(self.MESSAGE)
data = self.socket.recv(self.BUFFER_SIZE)
size = int(data)
data = CameraTCP.recvall(self.socket, size)
frame = CameraTCP.stringToRGB(data)
return frame
[docs] @staticmethod
def stringToRGB(base64_string):
"""
Convert base64 encoded image to OpenCV image
:param base64_string: image encoded as base64 string
:type base64_string: string
:return: returns image as OpenCV image
"""
image = base64.b64decode(base64_string)
image = BytesIO(image)
image = Image.open(image)
image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
return image
[docs] @staticmethod
def recvall(sock, n):
"""
Helper function to recv n bytes or return None if EOF is hit
:param sock: socket
:type sock: TCPServer
:param n: number of bytes
:type n: int
:return: returns data as bytearray
"""
data = bytearray()
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data.extend(packet)
return data