OSVR Framework (Internal Development Docs)  0.6-1962-g59773924
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
IPCRingBuffer.cpp
Go to the documentation of this file.
1 
11 // Copyright 2015 Sensics, Inc.
12 //
13 // Licensed under the Apache License, Version 2.0 (the "License");
14 // you may not use this file except in compliance with the License.
15 // You may obtain a copy of the License at
16 //
17 // http://www.apache.org/licenses/LICENSE-2.0
18 //
19 // Unless required by applicable law or agreed to in writing, software
20 // distributed under the License is distributed on an "AS IS" BASIS,
21 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 // See the License for the specific language governing permissions and
23 // limitations under the License.
24 
25 // Internal Includes
26 #include "IPCRingBufferResults.h"
28 #include "SharedMemory.h"
32 #include <osvr/Util/Log.h>
33 #include <osvr/Util/Logger.h>
34 
35 // Library/third-party includes
36 #include <boost/version.hpp>
37 
// Standard includes
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <utility>
42 
43 namespace osvr {
44 namespace common {
46  static util::log::Logger &getIPCRingBufferLogger() {
47  static util::log::LoggerPtr logger =
48  util::log::make_logger("IPCRingBuffer");
49  return *logger;
50  }
51 
56  static IPCRingBuffer::abi_level_type SHM_SOURCE_ABI_LEVEL = 0;
57 
62 #if (BOOST_VERSION > 106300)
63 #error \
64  "Using an untested Boost version - inspect the Boost Interprocess release notes/changelog to see if any ABI breaks affect us."
65 #endif
66 
67 #ifdef _WIN32
68 #if (BOOST_VERSION < 105400)
69 #error \
70  "Boost Interprocess pre-1.54 on Win32 is ABI-incompatible with newer Boost due to changed bootstamp function."
71 #endif
72 #else // !_WIN32
73 // No obvious ABI breaks through 1.58 seem to apply to us on non-Windows
74 // platforms
75 #endif
76 
77  static_assert(std::is_same<IPCRingBuffer::BackendType,
78  ipc::SharedMemoryBackendType>::value,
79  "The typedefs IPCRingBuffer::BackendType and "
80  "ipc::SharedMemoryBackendType must remain in sync!");
81  static_assert(
82  std::is_same<IPCRingBuffer::value_type, OSVR_ImageBufferElement>::value,
83  "The ring buffer's individual byte type must match the image buffer "
84  "element type.");
85 
86  namespace bip = boost::interprocess;
87  namespace {
88  typedef OSVR_ImageBufferElement BufferType;
89 
90  template <typename T, typename ManagedMemory>
91  using ipc_deleter_type =
92  typename ManagedMemory::template deleter<T>::type;
93 
94  typedef IPCRingBuffer::sequence_type sequence_type;
95 
98  template <typename T, typename ManagedMemory>
99  inline void destroy_unique_instance(ManagedMemory &shm) {
100  auto result = shm.template find<T>(bip::unique_instance);
101  if (result.first) {
102  shm.template destroy<T>(bip::unique_instance);
103  }
104  }
105  } // namespace
106 
107  namespace {
108 
109  static size_t computeRequiredSpace(IPCRingBuffer::Options const &opts) {
110  size_t alignedEntrySize = opts.getEntrySize() + opts.getAlignment();
111  size_t dataSize = alignedEntrySize * (opts.getEntries() + 1);
112  // Give 33% overhead on the raw bookkeeping data
113  static const size_t BOOKKEEPING_SIZE =
114  (sizeof(detail::Bookkeeping) +
115  (sizeof(detail::ElementData) * opts.getEntries())) *
116  4 / 3;
117  return dataSize + BOOKKEEPING_SIZE;
118  }
119 
120  class SharedMemorySegmentHolder {
121  public:
122  SharedMemorySegmentHolder() : m_bookkeeping(nullptr) {}
123  virtual ~SharedMemorySegmentHolder(){};
124 
125  detail::Bookkeeping *getBookkeeping() { return m_bookkeeping; }
126 
127  virtual uint64_t getSize() const = 0;
128  virtual uint64_t getFreeMemory() const = 0;
129 
130  protected:
131  detail::Bookkeeping *m_bookkeeping;
132  };
133 
134  template <typename ManagedMemory>
135  class SegmentHolderBase : public SharedMemorySegmentHolder {
136  public:
137  typedef ManagedMemory managed_memory_type;
138 
139  virtual uint64_t getSize() const { return m_shm->get_size(); }
140  virtual uint64_t getFreeMemory() const {
141  return m_shm->get_free_memory();
142  }
143 
144  protected:
145  unique_ptr<managed_memory_type> m_shm;
146  };
147  template <typename ManagedMemory>
148  class ServerSharedMemorySegmentHolder
149  : public SegmentHolderBase<ManagedMemory> {
150  public:
151  typedef SegmentHolderBase<ManagedMemory> Base;
152  ServerSharedMemorySegmentHolder(IPCRingBuffer::Options const &opts)
153  : m_name(opts.getName()) {
154  getIPCRingBufferLogger().debug() << "Creating segment, name "
155  << opts.getName() << ", size "
156  << computeRequiredSpace(opts);
157  try {
158  removeSharedMemory();
161  Base::m_shm.reset(new ManagedMemory(
162  bip::create_only, opts.getName().c_str(),
163  computeRequiredSpace(opts)));
164  } catch (bip::interprocess_exception &e) {
165  getIPCRingBufferLogger().error()
166  << "Failed to create shared memory segment "
167  << opts.getName() << " with exception: " << e.what();
168  return;
169  }
170  // detail::Bookkeeping::destroy(*Base::m_shm);
171  Base::m_bookkeeping =
172  detail::Bookkeeping::construct(*Base::m_shm, opts);
173  }
174 
175  virtual ~ServerSharedMemorySegmentHolder() {
176  detail::Bookkeeping::destroy(*Base::m_shm);
177  removeSharedMemory();
178  }
179 
180  void removeSharedMemory() {
181  ipc::device_type<ManagedMemory>::remove(m_name.c_str());
182  }
183 
184  private:
185  std::string m_name;
186  };
187 
188  template <typename ManagedMemory>
189  class ClientSharedMemorySegmentHolder
190  : public SegmentHolderBase<ManagedMemory> {
191  public:
192  typedef SegmentHolderBase<ManagedMemory> Base;
193  ClientSharedMemorySegmentHolder(
194  IPCRingBuffer::Options const &opts) {
195  getIPCRingBufferLogger().debug() << "Finding segment, name "
196  << opts.getName();
197  try {
198  Base::m_shm.reset(new ManagedMemory(
199  bip::open_only, opts.getName().c_str()));
200  } catch (bip::interprocess_exception &e) {
201  getIPCRingBufferLogger().error()
202  << "Failed to open shared memory segment "
203  << opts.getName() << " with exception: " << e.what();
204  return;
205  }
206  Base::m_bookkeeping = detail::Bookkeeping::find(*Base::m_shm);
207  }
208 
209  virtual ~ClientSharedMemorySegmentHolder() {}
210 
211  private:
212  };
213 
215  template <typename ManagedMemory>
216  inline unique_ptr<SharedMemorySegmentHolder>
217  constructMemorySegment(IPCRingBuffer::Options const &opts,
218  bool doCreate) {
219  unique_ptr<SharedMemorySegmentHolder> ret;
220  if (doCreate) {
221  ret.reset(
222  new ServerSharedMemorySegmentHolder<ManagedMemory>(opts));
223  } else {
224  ret.reset(
225  new ClientSharedMemorySegmentHolder<ManagedMemory>(opts));
226  }
227  if (nullptr == ret->getBookkeeping()) {
228  ret.reset();
229  } else {
230  getIPCRingBufferLogger().debug()
231  << "size: " << ret->getSize()
232  << ", free: " << ret->getFreeMemory();
233  }
234  return ret;
235  }
236  } // namespace
237 
239  detail::IPCPutResultPtr &&data, IPCRingBufferPtr &&shm)
240  : m_buf(nullptr), m_seq(0), m_data(std::move(data)) {
241  if (m_data) {
242  m_buf = m_data->buffer;
243  m_seq = m_data->seq;
244  m_data->shm = std::move(shm);
245  }
246  }
247 
248  IPCRingBuffer::BufferReadProxy::BufferReadProxy(
249  detail::IPCGetResultPtr &&data, IPCRingBufferPtr &&shm)
250  : m_buf(nullptr), m_seq(0), m_data(std::move(data)) {
251  if (nullptr != m_data) {
252  m_buf = m_data->buffer;
253  m_seq = m_data->seq;
254  m_data->shm = std::move(shm);
255  }
256  }
257 
258  IPCRingBuffer::smart_pointer_type
260  return smart_pointer_type(m_data, m_buf);
261  }
262 
263  IPCRingBuffer::Options::Options()
264  : m_shmBackend(ipc::DEFAULT_MANAGED_SHM_ID) {}
265 
266  IPCRingBuffer::Options::Options(std::string const &name)
267  : m_name(ipc::make_name_safe(name)),
268  m_shmBackend(ipc::DEFAULT_MANAGED_SHM_ID) {}
269 
270  IPCRingBuffer::Options::Options(std::string const &name,
271  BackendType backend)
272  : m_name(ipc::make_name_safe(name)), m_shmBackend(backend) {}
273 
274  IPCRingBuffer::Options &
275  IPCRingBuffer::Options::setName(std::string const &name) {
276  m_name = ipc::make_name_safe(name);
277  return *this;
278  }
279 
280  IPCRingBuffer::Options &
281  IPCRingBuffer::Options::setAlignment(alignment_type alignment) {
283  m_alignment = alignment;
284  return *this;
285  }
286 
287  IPCRingBuffer::Options &
288  IPCRingBuffer::Options::setEntries(entry_count_type entries) {
289  m_entries = entries;
290  return *this;
291  }
292 
293  IPCRingBuffer::Options &
294  IPCRingBuffer::Options::setEntrySize(entry_size_type entrySize) {
295  m_entrySize = entrySize;
296  return *this;
297  }
298  class IPCRingBuffer::Impl {
299  public:
300  Impl(unique_ptr<SharedMemorySegmentHolder> &&segment,
301  Options const &opts)
302  : m_seg(std::move(segment)), m_bookkeeping(nullptr), m_opts(opts) {
303  m_bookkeeping = m_seg->getBookkeeping();
304  m_opts.setEntries(m_bookkeeping->getCapacity());
305  m_opts.setEntrySize(m_bookkeeping->getBufferLength());
306  }
307 
308  detail::IPCPutResultPtr put() {
309  return m_bookkeeping->produceElement();
310  }
311 
312  detail::IPCGetResultPtr get(sequence_type num) {
313  detail::IPCGetResultPtr ret;
314  auto boundsLock = m_bookkeeping->getSharableLock();
315  auto elt = m_bookkeeping->getBySequenceNumber(num, boundsLock);
316  if (nullptr != elt) {
317  auto readerLock = elt->getSharableLock();
318  auto buf = elt->getBuf(readerLock);
320  ret.reset(new detail::IPCGetResult{buf, std::move(readerLock),
321  num, nullptr});
322  }
323  return ret;
324  }
325 
326  detail::IPCGetResultPtr getLatest() {
327  detail::IPCGetResultPtr ret;
328  auto boundsLock = m_bookkeeping->getSharableLock();
329  auto elt = m_bookkeeping->back(boundsLock);
330  if (nullptr != elt) {
331  auto readerLock = elt->getSharableLock();
332  auto buf = elt->getBuf(readerLock);
334  ret.reset(new detail::IPCGetResult{
335  buf, std::move(readerLock),
336  m_bookkeeping->backSequenceNumber(boundsLock), nullptr});
337  }
338  return ret;
339  }
340 
341  Options const &getOpts() const { return m_opts; }
342 
343  private:
344  unique_ptr<SharedMemorySegmentHolder> m_seg;
345  detail::Bookkeeping *m_bookkeeping;
346 
347  Options m_opts;
348  };
349 
350  IPCRingBufferPtr IPCRingBuffer::m_constructorHelper(Options const &opts,
351  bool doCreate) {
352 
353  IPCRingBufferPtr ret;
354  unique_ptr<SharedMemorySegmentHolder> segment;
355 
356  switch (opts.getBackend()) {
357 
358  case ipc::BASIC_MANAGED_SHM_ID:
359  segment =
360  constructMemorySegment<ipc::basic_managed_shm>(opts, doCreate);
361  break;
362 
363 #ifdef OSVR_HAVE_WINDOWS_SHM
364  case ipc::WINDOWS_MANAGED_SHM_ID:
365  segment = constructMemorySegment<ipc::windows_managed_shm>(
366  opts, doCreate);
367  break;
368 #endif
369 
370 #ifdef OSVR_HAVE_XSI_SHM
371  case ipc::SYSV_MANAGED_SHM_ID:
372  segment =
373  constructMemorySegment<ipc::sysv_managed_shm>(opts, doCreate);
374  break;
375 #endif
376 
377  default:
378  getIPCRingBufferLogger().error()
379  << "Unsupported/unrecognized shared memory backend: "
380  << int(opts.getBackend());
381  break;
382  }
383 
384  if (!segment) {
385  return ret;
386  }
387  unique_ptr<Impl> impl(new Impl(std::move(segment), opts));
388  ret.reset(new IPCRingBuffer(std::move(impl)));
389  return ret;
390  }
391 
392  IPCRingBuffer::abi_level_type IPCRingBuffer::getABILevel() {
393  return SHM_SOURCE_ABI_LEVEL;
394  }
395 
397  return m_constructorHelper(opts, true);
398  }
400  return m_constructorHelper(opts, false);
401  }
402 
403  IPCRingBuffer::IPCRingBuffer(unique_ptr<Impl> &&impl)
404  : m_impl(std::move(impl)) {}
405 
407 
408  IPCRingBuffer::BackendType IPCRingBuffer::getBackend() const {
409  return m_impl->getOpts().getBackend();
410  }
411 
412  std::string const &IPCRingBuffer::getName() const {
413  return m_impl->getOpts().getName();
414  }
415 
416  uint32_t IPCRingBuffer::getEntrySize() const {
417  return m_impl->getOpts().getEntrySize();
418  }
419 
420  uint16_t IPCRingBuffer::getEntries() const {
421  return m_impl->getOpts().getEntries();
422  }
423 
424  IPCRingBuffer::BufferWriteProxy IPCRingBuffer::put() {
425  return BufferWriteProxy(m_impl->put(), shared_from_this());
426  }
427 
428  IPCRingBuffer::sequence_type IPCRingBuffer::put(pointer_to_const_type data,
429  size_t len) {
430  auto proxy = put();
431  std::memcpy(proxy.get(), data, len);
432  return proxy.getSequenceNumber();
433  }
434 
435  IPCRingBuffer::BufferReadProxy IPCRingBuffer::get(sequence_type num) {
436  return BufferReadProxy(m_impl->get(num), shared_from_this());
437  }
438 
439  IPCRingBuffer::BufferReadProxy IPCRingBuffer::getLatest() {
440  return BufferReadProxy(m_impl->getLatest(), shared_from_this());
441  }
442 
443 } // namespace common
444 } // namespace osvr
Options & setAlignment(alignment_type alignment)
Sets the alignment for each entry, which must be a power of 2 (rounded up to the nearest if it's not)...
Options & setName(std::string const &name)
Sets the name after sanitizing the input string.
BufferReadProxy get(sequence_type num)
Gets access to an element in the buffer by sequence number: returns a proxy object that behaves mostl...
BufferReadProxy getLatest()
Gets access to the most recent element in the buffer: returns a proxy object that behaves mostly like...
STL namespace.
Header for basic internal log reference. To actually log to the produced loggers, include
shared_ptr< IPCRingBuffer > IPCRingBufferPtr
Pointer type for holding a shared memory ring buffer.
Definition: IPCRingBuffer.h:49
static IPCRingBufferPtr find(Options const &opts)
Named constructor, for use by client processes: accesses an IPC ring buffer using the options structu...
uint16_t getEntries() const
Returns the total capacity, in number of buffer entries, of this ring buffer.
static abi_level_type getABILevel()
Gets an integer representing a unique arrangement of the internal shared memory layout, such that if two processes try to communicate with different ABI levels, they will (likely) not succeed and thus should not try.
uint32_t sequence_type
The sequence number is automatically incremented with each "put" into the buffer. Note that...
BackendType getBackend() const
Returns an integer identifying the IPC backend used.
Header defining the types placed into shared memory for an IPCRingBuffer.
BufferWriteProxy(BufferWriteProxy const &)=delete
not copyable
std::string const & getName() const
Returns the name string used to create or find this ring buffer.
Options & setEntries(entry_count_type entries)
Sets the number of entries in the ring buffer.
Options & setEntrySize(entry_size_type entrySize)
Sets the size of each entry in the ring buffer.
Header to include for OSVR-internal usage of the logging mechanism: provides the needed definition of...
smart_pointer_type getBufferSmartPointer() const
Gets a smart pointer to the buffer that shares ownership of the underlying resources of this object...
uint32_t getEntrySize() const
Returns the size of each individual buffer entry, in bytes.
unsigned char OSVR_ImageBufferElement
Type for raw buffer access to image data.
BufferWriteProxy put()
Gets a proxy object for putting data in the next element in the buffer. You're responsible for doing ...
static IPCRingBufferPtr create(Options const &opts)
Named constructor, for use by server processes: creates a shared memory ring buffer given the options...