00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "interactive_marker_client.h"
00031 #include <ros/console.h>
00032
00033 namespace rviz
00034 {
00035
00036 InteractiveMarkerClient::PublisherContext::PublisherContext()
00037 {
00038 update_seen = false;
00039 init_seen = false;
00040 last_update_seq_num = 0;
00041 last_init_seq_num = 0;
00042 update_time_ok = true;
00043 initialized = false;
00044 last_update_time = ros::Time::now();
00045 }
00046
00047
00048
00049 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00050 {
00051 if( (init_seen &&
00052 update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE &&
00053 last_init_seq_num + 1 >= update->seq_num)
00054 ||
00055 (init_seen &&
00056 update->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE &&
00057 last_init_seq_num >= update->seq_num) )
00058 {
00059 initialized = true;
00060 }
00061 return initialized;
00062 }
00063
00064
00065
00066 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerInit::ConstPtr& init)
00067 {
00068 M_InteractiveMarkerUpdate::const_iterator q_it_same_seq = update_queue.find( init->seq_num );
00069 M_InteractiveMarkerUpdate::const_iterator q_it_seq_plus_one = update_queue.find( init->seq_num + 1 );
00070
00071 if( (update_seen &&
00072 init->seq_num + 1 >= last_update_seq_num + 1) ||
00073 (q_it_seq_plus_one != update_queue.end() && q_it_seq_plus_one->second->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE) ||
00074 (q_it_same_seq != update_queue.end() && q_it_same_seq->second->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE) )
00075 {
00076 initialized = true;
00077 }
00078 return initialized;
00079 }
00080
00081 void InteractiveMarkerClient::PublisherContext::enqueueUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00082 {
00083 if( update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00084 {
00085 update_queue[ update->seq_num ] = update;
00086 }
00087 else
00088 {
00089
00090
00091 if( update_queue.find( update->seq_num ) == update_queue.end() )
00092 {
00093 update_queue[ update->seq_num ] = update;
00094 }
00095 }
00096 update_seen = true;
00097 last_update_seq_num = update->seq_num;
00098 }
00099
00100
00101 InteractiveMarkerClient::InteractiveMarkerClient( InteractiveMarkerReceiver* receiver )
00102 : cleared_( true )
00103 , subscribed_to_init_( false )
00104 {
00105 receiver_ = receiver;
00106 }
00107
00108
00109 void InteractiveMarkerClient::processMarkerInit(const visualization_msgs::InteractiveMarkerInit::ConstPtr& marker_init)
00110 {
00111 ROS_DEBUG("InteractiveMarkerClient: %s INIT %lu",
00112 marker_init->server_id.c_str(),
00113 marker_init->seq_num);
00114
00115
00116 if ( marker_init->server_id.empty() )
00117 {
00118 receiver_->setStatusError( "Topic", "server_id is empty!");
00119 }
00120
00121 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_init->server_id);
00122
00123
00124 if ( context_it == publisher_contexts_.end() )
00125 {
00126 PublisherContextPtr pc(new PublisherContext());
00127 context_it = publisher_contexts_.insert( std::make_pair(marker_init->server_id,pc) ).first;
00128 }
00129
00130 PublisherContextPtr context = context_it->second;
00131
00132
00133
00134
00135 if( context->initialized )
00136 {
00137 return;
00138 }
00139
00140 if( context->checkInitWith( marker_init ))
00141 {
00142 receiver_->processMarkerChanges( &marker_init->markers );
00143 cleared_ = false;
00144
00145 context->last_init_seq_num = marker_init->seq_num;
00146 context->init_seen = true;
00147 context->last_update_time = ros::Time::now();
00148
00149
00150 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00151
00152
00153
00154
00155
00156 context->last_update_seq_num = context->last_init_seq_num;
00157
00158
00159 maybeUnsubscribeFromInit();
00160
00161 playbackUpdateQueue( context );
00162 }
00163 else if( context->update_queue.empty() )
00164 {
00165
00166
00167
00168
00169
00170
00171
00172 if( context->init_seen )
00173 {
00174 reinit();
00175 }
00176
00177 receiver_->processMarkerChanges( &marker_init->markers );
00178 cleared_ = false;
00179
00180 context->last_init_seq_num = marker_init->seq_num;
00181 context->init_seen = true;
00182 context->last_update_time = ros::Time::now();
00183 }
00184
00185 }
00186
00187
00188 void InteractiveMarkerClient::playbackUpdateQueue( PublisherContextPtr& context )
00189 {
00190 uint64_t next_seq_needed = context->last_init_seq_num + 1;
00191
00192 M_InteractiveMarkerUpdate::iterator update_it = context->update_queue.begin();
00193 for( ; update_it != context->update_queue.end(); update_it++ )
00194 {
00195 visualization_msgs::InteractiveMarkerUpdate::ConstPtr update = update_it->second;
00196
00197 if( update->seq_num == next_seq_needed )
00198 {
00199 applyUpdate( update, context );
00200 next_seq_needed++;
00201 }
00202 else if( update->seq_num < next_seq_needed )
00203 {
00204 ROS_DEBUG("Ignoring unneeded queued update number %lu, looking for %lu.", update->seq_num, next_seq_needed);
00205 }
00206 else
00207 {
00208 ROS_ERROR("Found queued update number %lu, missed %lu.", update->seq_num, next_seq_needed);
00209 }
00210 }
00211 context->update_queue.clear();
00212 }
00213
00214 void InteractiveMarkerClient::processMarkerUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update)
00215 {
00216
00217
00218
00219
00220
00221
00222 if ( marker_update->server_id.empty() )
00223 {
00224 receiver_->setStatusError( "Topic", "server_id is empty!");
00225 }
00226
00227 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_update->server_id);
00228
00229
00230
00231
00232 if ( context_it == publisher_contexts_.end() )
00233 {
00234 PublisherContextPtr pc(new PublisherContext());
00235
00236 pc->enqueueUpdate(marker_update);
00237
00238 context_it = publisher_contexts_.insert( std::make_pair(marker_update->server_id,pc) ).first;
00239
00240 reinit();
00241
00242
00243
00244 return;
00245 }
00246
00247 PublisherContextPtr context = context_it->second;
00248
00249 if ( !context->initialized )
00250 {
00251 if( context->checkInitWith( marker_update ))
00252 {
00253
00254 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00255
00256
00257
00258
00259
00260 context->last_update_seq_num = context->last_init_seq_num;
00261
00262
00263 maybeUnsubscribeFromInit();
00264 }
00265 else
00266 {
00267
00268
00269 receiver_->setStatusWarn( marker_update->server_id, "Received update or keep-alive without previous INIT message. It might be lost.");
00270 context->enqueueUpdate(marker_update);
00271 return;
00272 }
00273 }
00274
00275
00276
00277 applyUpdate( marker_update, context );
00278 }
00279
00280 void InteractiveMarkerClient::clear()
00281 {
00282 publisher_contexts_.clear();
00283 reinit();
00284 }
00285
00286 void InteractiveMarkerClient::unsubscribedFromInit()
00287 {
00288 subscribed_to_init_ = false;
00289 }
00290
00291 void InteractiveMarkerClient::reinit()
00292 {
00293 if( !cleared_ )
00294 {
00295 receiver_->clearMarkers();
00296 cleared_ = true;
00297 }
00298 if( !subscribed_to_init_ )
00299 {
00300 subscribed_to_init_ = receiver_->subscribeToInit();
00301 }
00302
00303
00304 M_PublisherContext::iterator context_it;
00305 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00306 {
00307 context_it->second->initialized = false;
00308 }
00309 }
00310
00311
00312 void InteractiveMarkerClient::maybeUnsubscribeFromInit() {
00313 M_PublisherContext::iterator context_it;
00314 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00315 {
00316 if( !context_it->second->initialized )
00317 {
00318 return;
00319 }
00320 }
00321 receiver_->unsubscribeFromInit();
00322 subscribed_to_init_ = false;
00323 }
00324
00325 void InteractiveMarkerClient::applyUpdate( const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update,
00326 PublisherContextPtr& context )
00327 {
00328 uint64_t expected_seq_num = 0;
00329
00330 switch ( marker_update->type )
00331 {
00332 case visualization_msgs::InteractiveMarkerUpdate::UPDATE:
00333 expected_seq_num = context->last_update_seq_num + 1;
00334 break;
00335
00336 case visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE:
00337 expected_seq_num = context->last_update_seq_num;
00338 break;
00339 }
00340
00341 if ( marker_update->seq_num != expected_seq_num )
00342 {
00343 if( marker_update->seq_num < expected_seq_num )
00344 {
00345 ROS_INFO("Received sequence number %lu, less than expected sequence number %lu. Ignoring.",
00346 marker_update->seq_num, expected_seq_num);
00347 return;
00348 }
00349
00350
00351 std::ostringstream s;
00352 s << "Detected lost update or server restart. Resetting. Reason: Received wrong sequence number (expected: " <<
00353 expected_seq_num << ", received: " << marker_update->seq_num << ")";
00354 receiver_->setStatusError( marker_update->server_id, s.str());
00355 reinit();
00356 return;
00357 }
00358
00359 context->last_update_seq_num = marker_update->seq_num;
00360 context->update_seen = true;
00361 context->last_update_time = ros::Time::now();
00362
00363 if( marker_update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00364 {
00365 receiver_->processMarkerChanges( &marker_update->markers, &marker_update->poses, &marker_update->erases );
00366 cleared_ = false;
00367 }
00368 }
00369
00370 bool InteractiveMarkerClient::isPublisherListEmpty()
00371 {
00372 return publisher_contexts_.empty();
00373 }
00374
00375 void InteractiveMarkerClient::flagLateConnections()
00376 {
00377 M_PublisherContext::iterator it;
00378 for ( it = publisher_contexts_.begin(); it != publisher_contexts_.end(); it++ )
00379 {
00380 PublisherContextPtr& context = it->second;
00381 double time_since_last_update = (ros::Time::now() - context->last_update_time).toSec();
00382 if ( time_since_last_update > 1.0 )
00383 {
00384 std::stringstream s;
00385 s << "No update received for " << (int)time_since_last_update << " seconds. Connection might be lost.";
00386 receiver_->setStatusWarn( it->first, s.str() );
00387 context->update_time_ok = false;
00388 }
00389 if ( !context->update_time_ok && time_since_last_update <= 1.0 )
00390 {
00391 receiver_->setStatusOk( it->first, "OK" );
00392 }
00393 }
00394 }
00395
00396 }