/*** * ==++== * * Copyright (c) Microsoft Corporation. All rights reserved. * Microsoft would like to acknowledge that this concurrency data structure implementation * is based on Intel's implementation in its Threading Building Blocks ("Intel Material"). * * ==--== * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * concurrent_priority_queue.h * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ /* Intel Material Copyright 2005-2011 Intel Corporation. All Rights Reserved. */ #pragma once #include #include #include #include #include #include #include #include #include #if !(defined (_M_X64) || defined (_M_IX86) || defined (_M_ARM)) #error ERROR: Concurrency Runtime is supported only on X64, X86 and ARM architectures. #endif /* !(defined (_M_X64) || defined (_M_IX86) || defined (_M_ARM)) */ #if defined (_M_CEE) #error ERROR: Concurrency Runtime is not supported when compiling /clr. #endif /* defined (_M_CEE) */ #pragma pack(push,_CRT_PACKING) #pragma warning (push) #pragma warning (disable: 4510 4512 4610) // disable warnings for compiler unable to generate constructor /// /// The concurrency namespace provides classes and functions that give you access to the Concurrency Runtime, /// a concurrent programming framework for C++. For more information, see . /// /**/ namespace Concurrency { namespace details { /// /// _Aggregated_operation base class /// template class _Aggregated_operation { public: volatile int _M_status; _Derived * _M_pNext; _Aggregated_operation() : _M_status(0), _M_pNext(NULL) { } }; /// /// An aggregator for collecting operations coming from multiple sources and executing them serially on a single thread. /// _Operation_type must be derived from _Aggregated_operation. The parameter _Handler_type is a functor that will be passed the /// list of operations and is expected to handle each operation appropriately, setting the status of each operation to non-zero. /// template < typename _Operation_type, typename _Handler_type > class _Aggregator { public: _Aggregator() : _M_handler_busy(0) { _M_pPending_operations = NULL; } ~_Aggregator() { } void _Initialize_handler(_Handler_type _Handler) { _M_handle_operations = _Handler; } /// /// Place operation in list and either handle list or wait for operation to complete. /// void _Execute(_Operation_type *_Op) { _Operation_type *_Res; // Insert the operation in the queue do { _Op->_M_pNext = _Res = _M_pPending_operations; } while (_InterlockedCompareExchangePointer(reinterpret_cast(&_M_pPending_operations), _Op, _Res) != _Res); if (_Res == NULL) { // This item was the first one in the list; this thread will handle the operations. Note that by the time the // thread pops the list of operations to handle, there may be several operations queued up. _Start_handle_operations(); _CONCRT_ASSERT(_Op->_M_status != 0); } else { // This item was not the first one on the list; a different thread is going to handle this operation, wait for // the result to be ready. ::Concurrency::details::_SpinWaitBackoffNone _SpinWait; while (_Op->_M_status == 0) { _SpinWait._SpinOnce(); } } } private: // An atomically updated list (also known as a mailbox) of pending operations _Operation_type * volatile _M_pPending_operations; // Controls thread access to _M_handle_operations volatile long _M_handler_busy; // The method that handles the operations _Handler_type _M_handle_operations; // Trigger the handling of operations when the handler is free void _Start_handle_operations() { _Operation_type * _POp_list; // Acquire the _M_handler_busy flag. Only one thread can possibly spin here at a time ::Concurrency::details::_SpinWaitBackoffNone _SpinWait; while (_M_handler_busy == 1) { _SpinWait._SpinOnce(); } long _OldVal = _InterlockedExchange(&_M_handler_busy, 1); _CONCRT_ASSERT(_OldVal == 0); // Get the list of pending operations _POp_list = reinterpret_cast<_Operation_type *>(_InterlockedExchangePointer(reinterpret_cast(&_M_pPending_operations), NULL)); // Handle all the operations _M_handle_operations(_POp_list); // Release the handler _OldVal = _InterlockedExchange(&_M_handler_busy, 0); _CONCRT_ASSERT(_OldVal == 1); } }; } // namespace details /// /// The concurrent_priority_queue class is a container that allows multiple threads to concurrently push and pop items. /// Items are popped in priority order where priority is determined by a functor supplied as a template argument. /// /// /// The data type of the elements to be stored in the priority queue. /// /// /// The type of the function object that can compare two element values as sort keys to determine their relative order /// in the priority queue. This argument is optional and the binary predicate less<> /// is the default value. /// /// /// The type that represents the stored allocator object that encapsulates details about the allocation and /// deallocation of memory for the concurrent priority queue. This argument is optional and the default value is /// allocator<>. /// /// /// For detailed information on the concurrent_priority_queue class, see . /// /// /**/ template , typename _Ax = std::allocator<_Ty> > class concurrent_priority_queue { public: /// /// A type that represents the data type stored in a concurrent priority queue. /// /**/ typedef _Ty value_type; /// /// A type that represents a reference to an element of the type stored in a concurrent priority queue. /// /**/ typedef _Ty& reference; /// /// A type that represents a const reference to an element of the type stored in a concurrent priority queue. /// /**/ typedef const _Ty& const_reference; /// /// A type that counts the number of elements in a concurrent priority queue. /// /**/ typedef size_t size_type; /// /// A type that represents the allocator class for the concurrent priority queue. /// /**/ typedef _Ax allocator_type; /// /// Constructs a concurrent priority queue. /// /// /// The allocator class to use with this object. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ explicit concurrent_priority_queue(const allocator_type& _Al = allocator_type()) : _M_mark(0), _M_size(0), _M_data(_Al) { _M_my_aggregator._Initialize_handler(_My_functor_type(this)); } /// /// Constructs a concurrent priority queue. /// /// /// The initial capacity of the concurrent_priority_queue object. /// /// /// The allocator class to use with this object. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ explicit concurrent_priority_queue(size_type _Init_capacity, const allocator_type& _Al = allocator_type()) : _M_mark(0), _M_size(0), _M_data(_Al) { _M_data.reserve(_Init_capacity); _M_my_aggregator._Initialize_handler(_My_functor_type(this)); } /// /// Constructs a concurrent priority queue. /// /// /// The type of the input iterator. /// /// /// The position of the first element in the range of elements to be copied. /// /// /// The position of the first element beyond the range of elements to be copied. /// /// /// The allocator class to use with this object. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ template concurrent_priority_queue(_InputIterator _Begin, _InputIterator _End, const allocator_type& _Al = allocator_type()) : _M_data(_Begin, _End, _Al) { // Set the mark to 0 to indicate that nothing is sorted. _M_mark = 0; _M_size = _M_data.size(); _M_my_aggregator._Initialize_handler(_My_functor_type(this)); _Heapify(); } /// /// Constructs a concurrent priority queue. /// /// /// The source concurrent_priority_queue object to copy or move elements from. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ concurrent_priority_queue(const concurrent_priority_queue& _Src) : _M_mark(_Src._M_mark), _M_size(_Src._M_size), _M_data(_Src._M_data) { _M_my_aggregator._Initialize_handler(_My_functor_type(this)); _Heapify(); } /// /// Constructs a concurrent priority queue. /// /// /// The source concurrent_priority_queue object to copy or move elements from. /// /// /// The allocator class to use with this object. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ concurrent_priority_queue(const concurrent_priority_queue& _Src, const allocator_type& _Al) : _M_mark(_Src._M_mark), _M_data(_Src._M_data.begin(), _Src._M_data.end(), _Al) { _M_size = _M_data.size(); _M_my_aggregator._Initialize_handler(_My_functor_type(this)); _Heapify(); } /// /// Constructs a concurrent priority queue. /// /// /// The source concurrent_priority_queue object to copy or move elements from. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ concurrent_priority_queue(concurrent_priority_queue&& _Src) : _M_mark(_Src._M_mark), _M_size(_Src._M_size), _M_data(std::move(_Src._M_data)) { // Set _M_mark and _M_size for the argument to 0 because its data has been moved over to this priority queue. _Src._M_mark = 0; _Src._M_size = 0; _M_my_aggregator._Initialize_handler(_My_functor_type(this)); _Heapify(); } /// /// Constructs a concurrent priority queue. /// /// /// The source concurrent_priority_queue object to copy or move elements from. /// /// /// The allocator class to use with this object. /// /// /// All constructors store an allocator object and initialize the priority queue. /// The first constructor specifies an empty initial priority queue and optionally specifies an allocator. /// The second constructor specifies a priority queue with an initial capacity and optionally specifies /// an allocator. /// The third constructor specifies values supplied by the iterator range [, ) and /// optionally specifies an allocator. /// The fourth and fifth constructors specify a copy of the priority queue . /// The sixth and seventh constructors specify a move of the priority queue . /// /**/ concurrent_priority_queue(concurrent_priority_queue&& _Src, const allocator_type& _Al) : _M_mark(_Src._M_mark), _M_size(_Src._M_size), _M_data(std::move(_Src._M_data), _Al) { // Set _M_mark and _M_size for the argument to 0 because its data has been moved over to this priority queue. _Src._M_mark = 0; _Src._M_size = 0; _M_my_aggregator._Initialize_handler(_My_functor_type(this)); _Heapify(); } /// /// Assigns the contents of another concurrent_priority_queue object to this one. This method is not concurrency-safe. /// /// /// The source concurrent_priority_queue object. /// /// /// A reference to this concurrent_priority_queue object. /// /**/ concurrent_priority_queue& operator=(const concurrent_priority_queue& _Src) { if (this != &_Src) { std::vector(_Src._M_data.begin(), _Src._M_data.end(), _Src._M_data.get_allocator()).swap(_M_data); _M_mark = _Src._M_mark; _M_size = _Src._M_size; } return *this; } /// /// Assigns the contents of another concurrent_priority_queue object to this one. This method is not concurrency-safe. /// /// /// The source concurrent_priority_queue object. /// /// /// A reference to this concurrent_priority_queue object. /// /**/ concurrent_priority_queue& operator=(concurrent_priority_queue&& _Src) { if (this != &_Src) { _M_data = std::move(_Src._M_data); _M_mark = _Src._M_mark; _M_size = _Src._M_size; // Set _M_mark and _M_size for the arguement to 0 because its data has been moved over to this priority queue. _Src._M_mark = 0; _Src._M_size = 0; } return *this; } /// /// Tests if the concurrent priority queue is empty at the time this method is called. This method is concurrency-safe. /// /// /// true if the priority queue was empty at the moment the function was called, false otherwise. /// /**/ bool empty() const { return (_M_size == 0); } /// /// Returns the number of elements in the concurrent priority queue. This method is concurrency-safe. /// /// /// The number of elements in this concurrent_priority_queue object. /// /// /// The returned size is guaranteed to include all elements added by calls to the function push. /// However, it may not reflect results of pending concurrent operations. /// /**/ size_type size() const { return _M_size; } /// /// Adds an element to the concurrent priority queue. This method is concurrency-safe. /// /// /// The element to be added to the concurrent priority queue. /// void push(const value_type& _Elem) { _Cpq_operation _Op_data(_Elem, _PUSH_OP_COPY); _M_my_aggregator._Execute(&_Op_data); if (_Op_data._M_status == _FAILED) { // Rethrow the saved exception. std::rethrow_exception(_Op_data._M_exception_ptr); } } /// /// Adds an element to the concurrent priority queue. This method is concurrency-safe. /// /// /// The element to be added to the concurrent priority queue. /// void push(value_type&& _Elem) { _Cpq_operation _Op_data(_Elem, _PUSH_OP_MOVE); _M_my_aggregator._Execute(&_Op_data); if (_Op_data._M_status == _FAILED) { // Rethrow the saved exception std::rethrow_exception(_Op_data._M_exception_ptr); } } /// /// Removes and returns the highest priority element from the queue if the queue is non-empty. This method is concurrency-safe. /// /// /// A reference to a variable that will be populated with the highest priority element, if the queue is non-empty. /// /// /// true if a value was popped, false otherwise. /// /**/ bool try_pop(reference _Elem) { _Cpq_operation _Op_data(_POP_OP); _Op_data._M_elem = &_Elem; _M_my_aggregator._Execute(&_Op_data); return (_Op_data._M_status == _SUCCEEDED); } /// /// Erases all elements in the concurrent priority. This method is not concurrency-safe. /// /// /// clear is not concurrency-safe. You must ensure that no other threads are invoking methods /// on the concurrent priority queue when you call this method. clear does not free memory. /// /**/ void clear() { _M_data.clear(); _M_mark = 0; _M_size = 0; } /// /// Swaps the contents of two concurrent priority queues. This method is not concurrency-safe. /// /// /// The concurrent_priority_queue object to swap contents with. /// /**/ void swap(concurrent_priority_queue& _Queue) { _M_data.swap(_Queue._M_data); std::swap(_M_mark, _Queue._M_mark); std::swap(_M_size, _Queue._M_size); } /// /// Returns a copy of the allocator used to construct the concurrent priority queue. This method is concurrency-safe. /// /// /// A copy of the allocator used to construct the concurrent_priority_queue object. /// /**/ allocator_type get_allocator() const { return _M_data.get_allocator(); } private: enum _Operation_type {_INVALID_OP, _PUSH_OP_COPY, _PUSH_OP_MOVE, _POP_OP}; enum _Operation_status { _WAIT=0, _SUCCEEDED, _FAILED }; class _Cpq_operation : public ::Concurrency::details::_Aggregated_operation<_Cpq_operation> { public: _Operation_type _M_type; union { value_type * _M_elem; size_type _M_size; }; std::exception_ptr _M_exception_ptr; _Cpq_operation(const_reference _E, _Operation_type _T) : _M_type(_T), _M_elem(const_cast(&_E)) {} _Cpq_operation(value_type&& _E, _Operation_type _T) : _M_type(_T), _M_elem(const_cast(&_E)) {} _Cpq_operation(_Operation_type _T) : _M_type(_T) {} }; class _My_functor_type { concurrent_priority_queue<_Ty, _Compare, _Ax> *_M_PCpq; public: _My_functor_type() {} _My_functor_type(concurrent_priority_queue<_Ty, _Compare, _Ax> * _PCpq) : _M_PCpq(_PCpq) {} void operator()(_Cpq_operation* _POp_list) { _M_PCpq->_M_handle_operations(_POp_list); } }; ::Concurrency::details::_Aggregator< _Cpq_operation, _My_functor_type > _M_my_aggregator; // Padding added to avoid false sharing char _M_padding1[64 /* CACHE_LINE_SIZE */ - sizeof(::Concurrency::details::_Aggregator< _Cpq_operation, _My_functor_type >)]; // The point at which unsorted elements begin size_type _M_mark; // Size of the concurrent priority queue. This is cached instead of using vector::size(), because that method is unsafe in the presence // of concurrent pushes/pops. volatile size_type _M_size; // The comparator function to determine relative priority between elements stored in the priority queue. _Compare _M_compare; // Padding added to avoid false sharing char _M_padding2[64 /* CACHE_LINE_SIZE */ - sizeof(size_type) - sizeof(_Compare)]; // Storage for the heap of elements in queue, plus unheapified elements. _M_data has the following structure: // // binary unheapified // heap elements // ____|_______|____ // | | | // v v v // [_|...|_|_|...|_| |...| ] // 0 ^ ^ ^ // | | |__capacity // | |__size // |__mark // // Thus, _M_data stores the binary heap starting at position 0 through _M_mark-1 (it may be empty). Then there are 0 or more elements // that have not yet been inserted into the heap, in positions _M_mark through size-1. std::vector _M_data; /// /// Handle a batch of operations pending on the concurrent priority queue. Only one thread can execute this routine at a time. /// void _M_handle_operations(_Cpq_operation *_POp_list) { _Cpq_operation *_PTmp, *_PPop_list=NULL; _CONCRT_ASSERT(_M_mark == _M_data.size()); // First pass processes all constant time operations: pushes, some pops. while (_POp_list != NULL) { _CONCRT_ASSERT(_POp_list->_M_type != _INVALID_OP); _PTmp = _POp_list; _POp_list = _POp_list->_M_pNext; if (_PTmp->_M_type == _PUSH_OP_COPY) { try { _M_data.push_back(*(_PTmp->_M_elem)); ++_M_size; _InterlockedExchange((volatile long *) &_PTmp->_M_status, _SUCCEEDED); } catch (...) { _PTmp->_M_exception_ptr = std::current_exception(); _InterlockedExchange((volatile long *) &_PTmp->_M_status, _FAILED); } } else if (_PTmp->_M_type == _PUSH_OP_MOVE) { try { _M_data.push_back(std::move(*(_PTmp->_M_elem))); ++_M_size; _InterlockedExchange((volatile long *) &_PTmp->_M_status, _SUCCEEDED); } catch (...) { _PTmp->_M_exception_ptr = std::current_exception(); _InterlockedExchange((volatile long *) &_PTmp->_M_status, _FAILED); } } else { _CONCRT_ASSERT(_PTmp->_M_type == _POP_OP); if (_M_mark < _M_size && _M_compare(_M_data[0], _M_data[_M_size - 1])) { // There are newly pushed elems and the last one is higher than top // Because we're going to pop the last element, we can move the _M_data instead of copying it. *(_PTmp->_M_elem) = std::move(_M_data[_M_size - 1]); --_M_size; _InterlockedExchange((volatile long *) &_PTmp->_M_status, _SUCCEEDED); _M_data.pop_back(); _CONCRT_ASSERT(_M_mark <= _M_size); } else { // No convenient item to pop; postpone _PTmp->_M_pNext = _PPop_list; _PPop_list = _PTmp; } } } // Second pass processes pop operations. while (_PPop_list != NULL) { _PTmp = _PPop_list; _PPop_list = _PPop_list->_M_pNext; _CONCRT_ASSERT(_PTmp->_M_type == _POP_OP); if (_M_size == 0) { _InterlockedExchange((volatile long *) &_PTmp->_M_status, _FAILED); } else { _CONCRT_ASSERT(_M_mark <= _M_size); if (_M_mark < _M_size && _M_compare(_M_data[0], _M_data[_M_size - 1])) { // There are newly pushed elems and the last one is higher than top. // Because we're going to pop the last element, we can move the _M_data instead of copying it. *(_PTmp->_M_elem) = std::move(_M_data[_M_size - 1]); // copy the _M_data --_M_size; _InterlockedExchange((volatile long *) &_PTmp->_M_status, _SUCCEEDED); _M_data.pop_back(); } else { // Extract top and push last element down heap. *(_PTmp->_M_elem) = std::move(_M_data[0]); // copy the _M_data --_M_size; _InterlockedExchange((volatile long *) &_PTmp->_M_status, _SUCCEEDED); _Reheap(); } } } _CONCRT_ASSERT(_M_size == _M_data.size()); // _Heapify any leftover pushed elements before doing the next batch of operations. if (_M_mark < _M_size) { _Heapify(); } _CONCRT_ASSERT(_M_mark == _M_size); } /// /// Merge unsorted elements into heap. /// void _Heapify() { if (_M_mark == 0 && _M_size > 0) { _M_mark = 1; } for (; _M_mark < _M_size; ++_M_mark) { // for each unheapified element under size size_type _Cur_pos = _M_mark; value_type _To_place = std::move(_M_data[_M_mark]); do { // push _To_place up the heap size_type _Parent = (_Cur_pos - 1) >> 1; if (!_M_compare(_M_data[_Parent], _To_place)) { break; } _M_data[_Cur_pos] = std::move(_M_data[_Parent]); _Cur_pos = _Parent; } while( _Cur_pos != 0 ); _M_data[_Cur_pos] = std::move(_To_place); } } /// /// Re-_Heapify by pushing last element down the heap from the root. /// void _Reheap() { size_type _Cur_pos=0, _Child=1; // Use _M_data.size() instead of _M_size throughout this function, because _M_size has already // been decremented to account for the pop right before Reheap was invoked. while (_Child < _M_mark) { size_type _Target = _Child; if (_Child + 1 < _M_mark && _M_compare(_M_data[_Child], _M_data[_Child + 1])) { ++_Target; } // _Target now has the higher priority _Child. if (_M_compare(_M_data[_Target], _M_data[_M_data.size() - 1])) { break; } _M_data[_Cur_pos] = std::move(_M_data[_Target]); _Cur_pos = _Target; _Child = (_Cur_pos << 1) + 1; } _M_data[_Cur_pos] = std::move(_M_data[_M_data.size() - 1]); _M_data.pop_back(); if (_M_mark > _M_data.size()) { _M_mark = _M_data.size(); } } }; } // namespace Concurrency namespace concurrency = Concurrency; #pragma warning (pop) #pragma pack(pop)