Point Cloud Library (PCL)  1.14.1-dev
synchronizer.h
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2011, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of the copyright holder(s) nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  * Author: Nico Blodow (blodow@in.tum.de), Suat Gedikli (gedikli@willowgarage.com)
34  */
35 
36 #pragma once
37 
38 #include <deque>
39 #include <functional>
40 #include <map>
41 #include <mutex>
42 #include <utility>
43 
44 namespace pcl
45 {
46  /** /brief This template class synchronizes two data streams of different types.
47  * The data can be added using add0 and add1 methods which expects also a timestamp of type unsigned long.
48  * If two matching data objects are found, registered callback functions are invoked with the objects and the time stamps.
49  * The only assumption of the timestamp is, that they are in the same unit, linear and strictly monotonic increasing.
50  * If filtering is desired, e.g. thresholding of time differences, the user can do that in the callback method.
51  * This class is thread safe.
52  * /ingroup common
53  */
54  template <typename T1, typename T2>
56  {
57  using T1Stamped = std::pair<unsigned long, T1>;
58  using T2Stamped = std::pair<unsigned long, T2>;
59  std::mutex mutex1_;
60  std::mutex mutex2_;
61  std::mutex publish_mutex_;
62  std::deque<T1Stamped> queueT1;
63  std::deque<T2Stamped> queueT2;
64 
65  using CallbackFunction = std::function<void(T1, T2, unsigned long, unsigned long)>;
66 
67  std::map<int, CallbackFunction> cb_;
68  int callback_counter = 0;
69  public:
70 
71  int
72  addCallback (const CallbackFunction& callback)
73  {
74  std::unique_lock<std::mutex> publish_lock (publish_mutex_);
75  cb_[callback_counter] = callback;
76  return callback_counter++;
77  }
78 
79  void
81  {
82  std::unique_lock<std::mutex> publish_lock (publish_mutex_);
83  cb_.erase (i);
84  }
85 
86  void
87  add0 (const T1& t, unsigned long time)
88  {
89  mutex1_.lock ();
90  queueT1.push_back (T1Stamped (time, t));
91  mutex1_.unlock ();
92  publish ();
93  }
94 
95  void
96  add1 (const T2& t, unsigned long time)
97  {
98  mutex2_.lock ();
99  queueT2.push_back (T2Stamped (time, t));
100  mutex2_.unlock ();
101  publish ();
102  }
103 
104  private:
105 
106  void
107  publishData ()
108  {
109  std::unique_lock<std::mutex> lock1 (mutex1_);
110  std::unique_lock<std::mutex> lock2 (mutex2_);
111 
112  for (const auto& cb: cb_)
113  {
114  if (cb.second)
115  {
116  cb.second.operator()(queueT1.front ().second, queueT2.front ().second, queueT1.front ().first, queueT2.front ().first);
117  }
118  }
119 
120  queueT1.pop_front ();
121  queueT2.pop_front ();
122  }
123 
124  void
125  publish ()
126  {
127  // only one publish call at once allowed
128  std::unique_lock<std::mutex> publish_lock (publish_mutex_);
129 
130  std::unique_lock<std::mutex> lock1 (mutex1_);
131  if (queueT1.empty ())
132  return;
133  T1Stamped t1 = queueT1.front ();
134  lock1.unlock ();
135 
136  std::unique_lock<std::mutex> lock2 (mutex2_);
137  if (queueT2.empty ())
138  return;
139  T2Stamped t2 = queueT2.front ();
140  lock2.unlock ();
141 
142  bool do_publish = false;
143 
144  if (t1.first <= t2.first)
145  { // iterate over queue1
146  lock1.lock ();
147  while (queueT1.size () > 1 && queueT1[1].first <= t2.first)
148  queueT1.pop_front ();
149 
150  if (queueT1.size () > 1)
151  { // we have at least 2 measurements; first in past and second in future -> find out closer one!
152  if ( (t2.first << 1) > (queueT1[0].first + queueT1[1].first) )
153  queueT1.pop_front ();
154 
155  do_publish = true;
156  }
157  lock1.unlock ();
158  }
159  else
160  { // iterate over queue2
161  lock2.lock ();
162  while (queueT2.size () > 1 && (queueT2[1].first <= t1.first) )
163  queueT2.pop_front ();
164 
165  if (queueT2.size () > 1)
166  { // we have at least 2 measurements; first in past and second in future -> find out closer one!
167  if ( (t1.first << 1) > queueT2[0].first + queueT2[1].first )
168  queueT2.pop_front ();
169 
170  do_publish = true;
171  }
172  lock2.unlock ();
173  }
174 
175  if (do_publish)
176  publishData ();
177  }
178  } ;
179 } // namespace
/brief This template class synchronizes two data streams of different types.
Definition: synchronizer.h:56
void removeCallback(int i)
Definition: synchronizer.h:80
int addCallback(const CallbackFunction &callback)
Definition: synchronizer.h:72
void add1(const T2 &t, unsigned long time)
Definition: synchronizer.h:96
void add0(const T1 &t, unsigned long time)
Definition: synchronizer.h:87