Source code for mobrob_behcon.core.resolver

#!/usr/bin/env python

#import sys
#from os import path
#sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) )

import rospy
from geometry_msgs.msg import Twist
from mobrob_behcon.desires.desire import Desire
from mobrob_behcon.desires.desires import DesCmdVel, DesTransVel, DesTransDir, DesRotVel
from mobrob_behcon.behaviours.behaviour import Behaviour
import copy
import math
import time

[docs]class Resolver: """ The class Resolver It resolves a output for the actuators from given desires. Currently it understands desires fo following type: - DesCmdVel """
[docs] def __init__(self): """ Construct a new 'Resolver' object. :return: returns nothing """ self.lst_behaviours = [] self.lst_desires_vel = [] self.lst_des_transvel = [] self.lst_des_rotvel = [] self.lst_des_transdir = [] # ROS publisher self.pub_cmd_vel = rospy.Publisher('cmd_vel', Twist, queue_size=10) self.last_cmd_msg = None self.last_sendtime_vel = 0
[docs] def add_desire(self, desire): """ Add new desire to resolver :param desire: a desire to be added to the resolver :type desire: Desire :return: returns nothing """ if isinstance(desire, DesCmdVel): self.lst_desires_vel.append(desire) elif isinstance(desire, DesTransVel): self.lst_des_transvel.append(desire) elif isinstance(desire, DesTransDir): self.lst_des_transdir.append(desire) elif isinstance(desire, DesRotVel): self.lst_des_rotvel.append(desire) else: print("Type of desire not known: " + type(desire).__name__)
[docs] def reset(self): """ Reset all lists (desires and behaviours) """ self.lst_behaviours = [] self.lst_desires_vel = [] self.lst_des_transvel = [] self.lst_des_rotvel = [] self.lst_des_transdir = []
[docs] def resetDesires(self): """ Reset all desire lists """ self.lst_desires_vel = [] self.lst_des_transvel = [] self.lst_des_rotvel = [] self.lst_des_transdir = []
[docs] def set_behaviour_lst(self, behaviours): """ set the new active behaviour list :param lst_behaviour: list of behaviour which are currently active :type lst_behaviour: list(Behaviour) """ self.lst_behaviours = behaviours
[docs] def resolveDesire(self, lst_desires): """ function to resolve a value from a list of similar desires :param lst_desires: a list of desires (all items of the same type) :type lst_desires: list[Desire] :return: returns the resolved value (depending on type of value in desire, can also be an array) """ output_value = 0 if len(lst_desires) > 0: # sort list of desires based on priority (high to low) lst_desires.sort(key=lambda x: x.priority, reverse=True) # print type of desires and whole list #print("Desire-Type: " + type(lst_desires[0]).__name__) #for d in lst_desires: # print(d) # create resulting desire (mostly used to track the value and strength during next calculations) result_desire = Desire(0, 0) # declare variable to count desires with the same priority num_desire_prio = 0 # declare index variable for outer while loop i = 0 while result_desire.strength < 1.0 and i < len(lst_desires): # reset counter variable num_desire_prio = 0 # create temporary desire to calculate weighted mean of desires with same priority temp_desire = Desire(0, 0) for k in range(len(lst_desires)): # loop through list of desires and find desires with same priority if lst_desires[k].priority == lst_desires[i].priority: # add weighted value and strength of selected desire to temporary desire temp_desire.value = temp_desire.value + lst_desires[k].value * lst_desires[k].strength temp_desire.strength = temp_desire.strength + lst_desires[k].strength num_desire_prio = num_desire_prio + 1 # calc average of the strengths (maybe not the right way?!) #temp_desire.strength = temp_desire.strength / num_desire_prio # add strengths and value to result_desire result_desire.value = result_desire.value + temp_desire.value result_desire.strength = result_desire.strength + temp_desire.strength # jump over all desires with the same priority in list i = i + num_desire_prio output_value = result_desire.value / result_desire.strength return output_value
[docs] def runOnce(self): """ Run one cycle of the resolver to calculate output values and publish corresponding messages """ # clear all desire lists self.resetDesires() # trigger the fire()-function of all active behaviours for beh in self.lst_behaviours: beh.fire() # resolve desires to get velocity values transvel = self.resolveDesire(self.lst_des_transvel) transdir = self.resolveDesire(self.lst_des_transdir) rotvel = self.resolveDesire(self.lst_des_rotvel) # create cmd_vel msg out of resolved values cmd_msg = Twist() cmd_msg.linear.x = transvel * math.cos(math.radians(transdir)) cmd_msg.linear.y = transvel * math.sin(math.radians(transdir)) cmd_msg.linear.z = 0.0 cmd_msg.angular.x = 0.0 cmd_msg.angular.y = 0.0 cmd_msg.angular.z = rotvel currenttime = Resolver.get_current_time() diff_to_last = 0 # calc difference between last send cmd_vel command if self.last_cmd_msg: diff_to_last = abs(self.last_cmd_msg.linear.x - cmd_msg.linear.x) \ + abs(self.last_cmd_msg.linear.y - cmd_msg.linear.y) \ + abs(self.last_cmd_msg.angular.z - cmd_msg.angular.z) # check if a update need to be send if diff_to_last > 0.01 or currenttime - self.last_sendtime_vel > 1000: rospy.loginfo("Resolver: transvel=%f, rotvel=%f", transvel, rotvel) self.pub_cmd_vel.publish(cmd_msg) self.last_sendtime_vel = currenttime self.last_cmd_msg = cmd_msg
#cmd_value = self.resolveDesire(self.lst_desires_vel) #print("Resolver: cmd value = " + str(cmd_value)) #cmd_msg = Twist() #cmd_msg.linear.x = cmd_value[0] #cmd_msg.linear.y = cmd_value[1] #cmd_msg.linear.z = cmd_value[2] #cmd_msg.angular.x = cmd_value[3] #cmd_msg.angular.y = cmd_value[4] #cmd_msg.angular.z = cmd_value[5] # publish velocity command
[docs] @staticmethod def get_current_time(): """ Get current time in milliseconds :return: time in milliseconds """ return int(round(time.time() * 1000))