/*** * ==++== * * Copyright (c) Microsoft Corporation. All rights reserved. * * ==--== * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * agents.h * * Main public header file for ConcRT's asynchronous agents layer. This is the only header file a * C++ program must include to use asynchronous agents. * * The core runtime, Parallel Patterns Library (PPL), and resource manager are defined in separate header files. * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #include #include #include #include #include #include #include #include #define _AGENTS_H #pragma pack(push,_CRT_PACKING) #pragma warning(push) #pragma warning(disable: 4100) // Unreferenced formal parameter - needed for document generation #pragma warning(disable: 4702) // Unreachable code - needed for retail version code path // Forward declarations /// /// The Concurrency namespace provides classes and functions that provide access to the Concurrency Runtime, /// a concurrent programming framework for C++. For more information, see . /// /**/ namespace Concurrency { /// /// Each message instance has an identity that follows it as it is /// cloned and passed between messaging components. This cannot be the /// address of the message object. /// /**/ typedef __int32 runtime_object_identity; /// /// A lock holder that acquires a non-reentrant lock on instantiation and releases /// it on destruction. /// /**/ typedef ::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock; /// /// A lock holder that acquires a reentrant lock on instantiation and releases /// it on destruction /// /**/ typedef ::Concurrency::details::_ReentrantPPLLock::_Scoped_lock _R_lock; //*************************************************************************** // Internal namespace: // // Concurrency::details contains definitions to support routines in the public namespaces and macros. // Clients should not directly interact with this namespace. //*************************************************************************** namespace details { //************************************************************************** // Core Messaging Support: //************************************************************************** // // A base class to derive from that keeps unique IDs on its derived classes // class _Runtime_object : public _AllocBase { public: // Creates a new runtime object. _CRTIMP2 _Runtime_object(); // Creates a runtime object from an identity. _CRTIMP2 _Runtime_object(::Concurrency::runtime_object_identity _Id); // Gets the runtime object identity. virtual ::Concurrency::runtime_object_identity _GetId() const { return _M_id; } protected: // The runtime object identity. ::Concurrency::runtime_object_identity _M_id; }; // A queue used to hold the messages for the messaging blocks template class _Queue : public _AllocBase { protected: // A pointer to the head of the queue. _Message * _M_pHead; // A pointer to a pointer to the tail of the queue. _Message ** _M_ppTail; // The number of elements presently stored in the queue. size_t _M_count; public: typedef typename _Message type; // Create a Queue _Queue() : _M_pHead(NULL), _M_ppTail(&_M_pHead), _M_count(0) { } // Destroy the queue ~_Queue() { } // Returns the count of items in the queue size_t _Count() const { return _M_count; } // Add an item to the tail of the queue // // Returns a Boolean indicating whether the operation succeeded. bool _Enqueue(_Message *_Element) { _CONCRT_ASSERT(_Element->_M_pNext == NULL); _CONCRT_ASSERT(*_M_ppTail == NULL); *_M_ppTail = _Element; _Element->_M_pNext = NULL; _M_ppTail = &(_Element->_M_pNext); _M_count++; return true; } // Remove the specified element from the queue // // Returns a Boolean indicating whether the operation succeeded, that is, the message was found in the queue. bool _Remove(_Message * _OldElement) { bool _Result = false; _CONCRT_ASSERT(_OldElement != NULL); if (_M_pHead == _OldElement) { _M_pHead = _OldElement->_M_pNext; if (_M_pHead == NULL) { _M_ppTail = &_M_pHead; } _OldElement->_M_pNext = NULL; _M_count--; _Result = true; } else { _Message * _Next = NULL; for (_Message * _Node = _M_pHead; _Node != NULL; _Node = _Next) { _Next = _Node->_M_pNext; if (_Node->_M_pNext == _OldElement) { _Node->_M_pNext = _OldElement->_M_pNext; // if this is the last element of the _Queue if (_Node->_M_pNext == NULL && _M_count == 1) { _M_ppTail = &_M_pHead; } _OldElement->_M_pNext = NULL; _M_count--; _Result = true; break; } } } return _Result; } // Dequeue an item from the head of queue // // Returns a pointer to the message found at the head of the queue. _Message * _Dequeue() { if (_M_pHead == NULL) { return NULL; } _Message * _Result = _M_pHead; _M_pHead = _Result->_M_pNext; if (_M_pHead == NULL) { _M_ppTail = &_M_pHead; } _Result->_M_pNext = NULL; _M_count--; return _Result; } // Return the item at the head of the queue, without dequeuing // // Returns a pointer to the message found at the head of the queue. _Message * _Peek() { return _M_pHead; } // Return true if the ID matches the message at the head of the queue bool _Is_head(runtime_object_identity _MsgId) { // Peek at the next message in the message buffer. Use it to // check if the IDs match _Message * _Msg = _M_pHead; if (_Msg == NULL || _Msg->msg_id() != _MsgId) { return false; } return true; } }; // // _Dynamic_array implements a container very similar to std::vector. // However, it exposes a reduced subset of functionality that is // geared towards use in network_link_registry. The array acess is not // thread-safe. // template class _Dynamic_array { public: typedef _Dynamic_array<_Type> _Myt; typedef _Type& reference; typedef _Type const& const_reference; // // Construct a dynamic array // _Dynamic_array() { _Init(); } // // Release any resources used by dynamic array // ~_Dynamic_array() { _Clear(); } // // Assignment operator. Copy the contents of _Right // _Myt& operator=(const _Myt& _Right) { if (this != &_Right) { // Remove all the elements _Clear(); // Allocate space for the new elements size_t _Size = _Right._Size(); _Grow(_Size); // Copy over the new elements for (size_t _I=0; _I < _Size; _I++) { _Push_back(_Right[_I]); } } return *this; } // // Clear all the elements in the array // void _Clear() { if (_M_array != NULL) { delete [] _M_array; _Init(); } } // // Add an element to the end of the array // void _Push_back(_Type const& _Element) { if (_M_index >= _M_size) { // Not enough space. Grow the array size_t _NewSize = (_M_index + 1) * _S_growthFactor; _Grow(_NewSize); } _CONCRT_ASSERT(_M_index < _M_size); _M_array[_M_index] = _Element; _M_index++; } // // Index operation. Retrieve an element at the specified index. No bounds check is done. // reference operator[](size_t _Pos) { _CONCRT_ASSERT(_Pos < _M_size); return _M_array[_Pos]; } // // Index operation. Retrieve an element at the specified index. No bounds check is done. // const_reference operator[](size_t _Pos) const { _CONCRT_ASSERT(_Pos < _M_size); return _M_array[_Pos]; } // // Returns the count of elements in the array // size_t _Size() const { return _M_index; } // // Swap the contents of this array with _Right // void _Swap(_Myt& _Right) { if (this != &_Right) { // Swap the details. _Type * _Array = _M_array; size_t _Index = _M_index; size_t _Size = _M_size; _M_array = _Right._M_array; _M_index = _Right._M_index; _M_size = _Right._M_size; _Right._M_array = _Array; _Right._M_index = _Index; _Right._M_size = _Size; } } private: // // Initialize the array // void _Init() { _M_array = NULL; _M_index = 0; _M_size = 0; } // // Grow the array to the given size. The old elements are copied over. // void _Grow(size_t _NewSize) { _CONCRT_ASSERT( _NewSize > _M_size ); _Type * _Array = new _Type[_NewSize]; if (_M_array != NULL) { // Copy over the elememts for (size_t _I = 0; _I < _M_size; _I++) { _Array[_I] = _M_array[_I]; } delete [] _M_array; } _M_array = _Array; _M_size = _NewSize; } // Private data members // Array of elements _Type * _M_array; // Index where the next element should be inserted size_t _M_index; // Capacity of the array. size_t _M_size; static const int _S_growthFactor = 2; }; // // Returns an identifier for the given object that could be used // in an ETW trace (call to _Trace_agents) // template __int64 _Trace_agents_get_id(_Type * _PObject) { return reinterpret_cast<__int64>(_PObject); } } // namespace details //************************************************************************** // Public Namespace: // // Anything in the Concurrency namespace is intended for direct client consumption. // //************************************************************************** // // Forward declarations: // template class ISource; template class ITarget; //************************************************************************** // Network link registry //************************************************************************** // Forward declaration for use in the iterator template class network_link_registry; /// /// Const iterator for network link registry. Message blocks should use /// the link_registry::iterator type for iteration. /// /// /// The network block type /// /**/ template class _Network_link_iterator { public: typedef _Network_link_iterator<_Block> _Myt; typedef network_link_registry<_Block> _MyContainer; // Element type typedef _Block* _EType; // Const iterator - iterator shall not be used to modify the links typedef _EType const& const_reference; typedef _EType const* const_pointer; /// /// Construct iterator /// /**/ _Network_link_iterator(_MyContainer * _PNetwork_link, size_t _Index) : _M_pNetwork_link(_PNetwork_link), _M_index(_Index), _M_value(NULL) { _M_pNetwork_link->_Next_index(_M_index); } /// /// Copy construct an iterator /// /**/ _Network_link_iterator(_Myt const& _Right) { _M_pNetwork_link = _Right._M_pNetwork_link; _M_index = _Right._M_index; } /// /// Copy assign an iterator /// /**/ _Myt const& operator=(_Myt const& _Right) { _M_pNetwork_link = _Right._M_pNetwork_link; _M_index = _Right._M_index; return *this; } /// /// Returns the object pointed to by the iterator /// /// /// Reference to the object pointed to by the iterator /// /**/ const_reference operator*() { _M_value = _M_pNetwork_link->_Get_element(_M_index); return _M_value; } /// /// Returns a pointer to the class object /// /// /// Returns a pointer to the class object /// /**/ const_pointer operator->() const { return (&**this); } /// /// Pre-increment the iterator to point to the next element /// /// /// Reference to the object pointer to by the iterator after /// incrementing it /// /**/ _Myt& operator++() { ++_M_index; _M_pNetwork_link->_Next_index(_M_index); return (*this); } /// /// Post-increment the iterator to point to the next element /// /// /// Reference to the object pointer to by the iterator before /// incrementing it /// /**/ _Myt operator++(int) { _Myt _Tmp = *this; ++*this; return (_Tmp); } private: // Pointer to the underlying container (network link registry) _MyContainer * _M_pNetwork_link; // Current index size_t _M_index; // Current value _EType _M_value; }; /// /// The network_link_registry abstract base class manages the links between source /// and target blocks. /// /// /// The block data type being stored in the network_link_registry. /// /// /// The network link registry is not safe for concurrent access. /// /// /// /**/ template class network_link_registry { public: /// /// A type that represents the block type stored in the network_link_registry object. /// /**/ typedef typename _Block type; /// /// A type that represents an element pointer stored in the network_link_registry object. /// /**/ typedef _Block * _EType; /// /// A type that provides a reference to a const element stored in a /// network_link_registry object for reading and performing const operations. /// /**/ typedef _EType const& const_reference; /// /// A type that provides a pointer to a const element in a /// network_link_registry object. /// /**/ typedef _EType const* const_pointer; // Make the iterators friends so that they can access some of the // private routines such as _Get_element. /**/ friend class _Network_link_iterator<_Block>; /// /// A type that provides an iterator that can read or modify any element in a /// network_link_registry object. /// /**/ typedef _Network_link_iterator<_Block> iterator; /// /// When overridden in a derived class, adds a link to the network_link_registry /// object. /// /// /// A pointer to a block to be added. /// /**/ virtual void add(_EType _Link) = 0; /// /// When overridden in a derived class, removes a specified block from the /// network_link_registry object. /// /// /// A pointer to a block to be removed, if found. /// /// /// true if the link was found and removed, false otherwise. /// /**/ virtual bool remove(_EType _Link) = 0; /// /// When overridden in a derived class, searches the network_link_registry object /// for a specified block. /// /// /// A pointer to a block that is being searched for in the network_link_registry /// object. /// /// /// true if the block was found, false otherwise. /// /**/ virtual bool contains(_EType _Link) = 0; /// /// When overridden in a derived class, returns the number of items in the /// network_link_registry object. /// /// /// The number of items in the network_link_registry object. /// /**/ virtual size_t count() = 0; /// /// When overridden in a derived class, returns an iterator to the first element in the /// network_link_registry object. /// /// /// The end state of the iterator is indicated by a NULL link. /// /// /// An iterator addressing the first element in the network_link_registry object. /// /**/ virtual iterator begin() = 0; protected: /// /// Skips empty slots and updates the index to the next /// non-empty slot. This is called by the iterator. /// /// /// A reference to the index that is to be updated. /// /**/ virtual void _Next_index(size_t& _Index) = 0; /// /// Retrieves the element at the given index. If the index is out of bounds, /// NULL is returned. Users need to use the iterator to access the links. /// /// /// Index of the link to be retrieved. /// /// /// The element in the registry at the index specified by the parameter. /// /**/ virtual _EType _Get_element(size_t _Index) const = 0; }; /// /// The single_link_registry object is a network_link_registry that manages /// only a single source or target block. /// /// /// The block data type being stored in the single_link_registry object. /// /// /**/ template class single_link_registry : public network_link_registry<_Block> { public: /// /// Constructs a single_link_registry object. /// /**/ single_link_registry() : _M_connectedLink(NULL) { } /// /// Destroys the single_link_registry object. /// /// /// The method throws an invalid_operation exception if /// it is called before the link is removed. /// /**/ virtual ~single_link_registry() { // It is an error to delete link registry with links // still present if (count() != 0) { throw invalid_operation("Deleting link registry before removing all the links"); } } /// /// Adds a link to the single_link_registry object. /// /// /// A pointer to a block to be added. /// /// /// The method throws an invalid_link_target exception /// if there is already a link in this registry. /// /**/ virtual void add(_EType _Link) { if (_Link == NULL) { return; } // Only one link can be added. if (_M_connectedLink != NULL) { throw invalid_link_target("_Link"); } _M_connectedLink = _Link; } /// /// Removes a link from the single_link_registry object. /// /// /// A pointer to a block to be removed, if found. /// /// /// true if the link was found and removed, false otherwise. /// /**/ virtual bool remove(_EType _Link) { if ((_Link != NULL) && (_M_connectedLink == _Link)) { _M_connectedLink = NULL; return true; } return false; } /// /// Searches the single_link_registry object for a specified block. /// /// /// A pointer to a block that is to be searched for in the single_link_registry object. /// /// /// true if the link was found, false otherwise. /// /**/ virtual bool contains(_EType _Link) { return ((_Link != NULL) && (_M_connectedLink == _Link)); } /// /// Counts the number of items in the single_link_registry object. /// /// /// The number of items in the single_link_registry object. /// /**/ virtual size_t count() { return (_M_connectedLink == NULL) ? 0 : 1; } /// /// Returns an iterator to the first element in the single_link_registry object. /// /// /// The end state is indicated by a NULL link. /// /// /// An iterator addressing the first element in the single_link_registry object. /// /**/ virtual iterator begin() { return (iterator(this, 0)); } protected: /// /// Skips empty slots and updates the index to the next /// non-empty slot. This is called by the iterator. /// /// /// A reference to the index that is to be updated. /// /**/ virtual void _Next_index(size_t& _Index) { if (_M_connectedLink == NULL) { _Index++; } } /// /// Retrieves the element at the given index. If the index is out of bounds, /// NULL is returned. Users need to use the iterator to access the links. /// /// /// The index of the link to be retrieved. /// /// /// The element in the registry at the index specified by the parameter. /// /**/ virtual _EType _Get_element(size_t _Index) const { if (_Index == 0) { return _M_connectedLink; } return NULL; } private: // A single pointer is used to hold the link _EType _M_connectedLink; }; /// /// The multi_link_registry object is a network_link_registry that manages multiple /// source blocks or multiple target blocks. /// /// /// The block data type being stored in the multi_link_registry object. /// /// /**/ template class multi_link_registry : public network_link_registry<_Block> { public: /// /// Constructs a multi_link_registry object. /// /**/ multi_link_registry() : _M_maxLinks(_NOT_SET) { } /// /// Destroys the multi_link_registry object. /// /// /// The method throws an invalid_operation exception if /// called before all links are removed. /// /**/ virtual ~multi_link_registry() { // It is an error to delete link registry with links // still present if (count() != 0) { throw invalid_operation("Deleting link registry before removing all the links"); } } /// /// Sets an upper bound on the number of links that the multi_link_registry object /// can hold. /// /// /// The maximum number of links that the multi_link_registry object can hold. /// /// /// After a bound is set, unlinking an entry will cause the multi_link_registry /// object to enter an immutable state where further calls to add will throw an /// invalid_link_target exception. /// /**/ void set_bound(size_t _MaxLinks) { _CONCRT_ASSERT(count() == 0); _M_maxLinks = _MaxLinks; } /// /// Adds a link to the multi_link_registry object. /// /// /// A pointer to a block to be added. /// /// /// The method throws an invalid_link_target exception if /// the link is already present in the registry, or if a bound has already been set with the set_bound /// function and a link has since been removed. /// /**/ virtual void add(_EType _Link) { if (_Link == NULL) { return; } _Add(_Link); } /// /// Removes a link from the multi_link_registry object. /// /// /// A pointer to a block to be removed, if found. /// /// /// true if the link was found and removed, false otherwise. /// /**/ virtual bool remove(_EType _Link) { if (_Link == NULL) { return false; } return (_Remove(_Link)); } /// /// Searches the multi_link_registry object for a specified block. /// /// /// A pointer to a block that is to be searched for in the multi_link_registry object. /// /// /// true if the specified block was found, false otherwise. /// /**/ virtual bool contains(_EType _Link) { if (_Link == NULL) { return false; } return (_Find(_Link) < _M_vector._Size()); } /// /// Counts the number of items in the multi_link_registry object. /// /// /// The number of items in the multi_link_registry object. /// /**/ virtual size_t count() { return _Count(); } /// /// Returns an iterator to the first element in the multi_link_registry object. /// /// /// The end state is indicated by a NULL link. /// /// /// An iterator addressing the first element in the multi_link_registry object. /// /**/ virtual iterator begin() { return (iterator(this, 0)); } protected: /// /// Skips empty slots and updates the index to the next /// non-empty slot. This is called by the iterator. /// /// /// A reference to the index that is to be updated. /// /**/ virtual void _Next_index(size_t& _Index) { size_t _Size = _M_vector._Size(); while (_Index < _Size) { if (_M_vector[_Index] != NULL) { break; } ++_Index; } } /// /// Retrieves the element at the given index. If the index is out of bounds, /// NULL is returned. Users need to use the iterator to access the links /// /// /// Index of the link to be retrieved. /// /// /// The element in the registry at the index specified by the parameter. /// /**/ virtual _EType _Get_element(size_t _Index) const { if (_Index < _M_vector._Size()) { return _M_vector[_Index]; } return NULL; } private: /// /// Adds a link to the multi_link_registry object. /// /// /// A pointer to a block to be added. /// /**/ void _Add(_EType _Link) { size_t _Size = _M_vector._Size(); size_t _Insert_pos = 0; _CONCRT_ASSERT(_Link != NULL); // If max links is set, ensure that inserting the new // link will not exceed the bound. if ((_M_maxLinks != _NOT_SET) && ((_Size+1) > (size_t) _M_maxLinks)) { throw invalid_link_target("_Link"); } for (size_t _Index = 0; _Index < _Size; _Index++) { if (_M_vector[_Index] != NULL) { // We want to find the first NULL entry after all the // non-NULL entries. _Insert_pos = _Index + 1; // Throw if dupiclate entry is found if (_M_vector[_Index] == _Link) { throw invalid_link_target("_Link"); } } } if (_Insert_pos < _Size) { _M_vector[_Insert_pos] = _Link; } else { _M_vector._Push_back(_Link); } } /// /// Removes a link from the multi_link_registry /// /// /// A pointer to a block to be removed, if found. /// /// /// true if the specified link was found and removed, false otherwise. /// /**/ bool _Remove(_EType _Link) { _CONCRT_ASSERT(_Link != NULL); for (size_t _Index = 0; _Index < _M_vector._Size(); _Index++) { if (_M_vector[_Index] == _Link) { _M_vector[_Index] = NULL; // If max links is set, prevent new additions to the registry if (_M_maxLinks != _NOT_SET && _M_maxLinks > 0) { // Setting the bound to 0. This causes add to always throw. _M_maxLinks = 0; } return true; } } return false; } /// /// Searches the registry for the given link /// /// /// A pointer to a block that is to be searched. /// /// /// Index of the entry if found. /// /**/ virtual size_t _Find(_EType _Link) { size_t _Index = 0; for (_Index = 0; _Index < _M_vector._Size(); _Index++) { if (_M_vector[_Index] == _Link) { break; } } return _Index; } /// /// Returns the count of items in the registry. /// /// /// The count of items in the registry. /// /**/ size_t _Count() const { size_t _Count = 0; for (size_t _Index = 0; _Index < _M_vector._Size(); _Index++) { if (_M_vector[_Index] != NULL) { _Count++; } } return _Count; } static const size_t _NOT_SET = SIZE_MAX; // Maximum number of links allowed. size_t _M_maxLinks; // ::Concurrency::details::_Dynamic_array is used to hold the links ::Concurrency::details::_Dynamic_array<_EType> _M_vector; }; // Forward declaration for the iterator template class source_link_manager; /// /// Const Iterator for referenced link manager. /// /// /// The underlying network link registry /// /**/ template class _Source_link_iterator { public: typedef typename _LinkRegistry::type _Block; typedef _Source_link_iterator<_LinkRegistry> _Myt; typedef source_link_manager<_LinkRegistry> _MyContainer; // Element type typedef _Block* _EType; // Const iterator - iterator shall not be used to modify the links typedef _EType const& const_reference; typedef _EType const* const_pointer; /// /// Construct iterator /// /**/ _Source_link_iterator(_MyContainer * _PNetwork_link, size_t _Index) : _M_pNetwork_link(_PNetwork_link), _M_index(_Index), _M_sentinel(NULL) { // Take a snapshot of the link registry. This will reference the registry. _M_pNetwork_link->_To_array(_M_array); } /// /// Destruct iterator /// /**/ virtual ~_Source_link_iterator() { if (_M_pNetwork_link != NULL) { _M_pNetwork_link->release(); } } /// /// Copy construct an iterator /// /**/ _Source_link_iterator(_Myt const& _Right) { _M_pNetwork_link = _Right._M_pNetwork_link; _M_index = _Right._M_index; _M_array = _Right._M_array; _M_pNetwork_link->reference(); } /// /// Copy assign an iterator /// /**/ _Myt const& operator=(_Myt const& _Right) { _MyContainer * _OldContainer = _M_pNetwork_link; _CONCRT_ASSERT(_OldContainer != NULL); _M_pNetwork_link = _Right._M_pNetwork_link; _M_index = _Right._M_index; _M_array = _Right._M_array; if (_OldContainer != _M_pNetwork_link) { _OldContainer->release(); _M_pNetwork_link->reference(); } return *this; } /// /// Returns the object pointed to by the iterator /// /// /// Reference to the object pointed to by the iterator /// /**/ const_reference operator*() { return _Get(0); } /// /// Returns a pointer to the class object /// /// /// Returns a pointer to the class object /// /**/ const_pointer operator->() const { return (&**this); } /// /// Index operation. Retrieve an element at the specified index. /// /**/ const_reference operator[](size_t _Pos) const { return _Get(_Pos); } /// /// Pre-increment the iterator to point to the next element /// /// /// Reference to the object pointer to by the iterator after incrementing it /// /**/ _Myt& operator++() { ++_M_index; return (*this); } /// /// Post-increment the iterator to point to the next element /// /// /// Reference to the object pointer to by the iterator before incrementing it /// /**/ _Myt operator++(int) { _Myt _Tmp = *this; ++*this; return (_Tmp); } private: // Get the element at the given offset. const_reference _Get(size_t _Pos) const { size_t _Index = _M_index + _Pos; if (_Index >= _M_array._Size()) { return _M_sentinel; } return _M_array[_Index]; } // Array to hold the snapshot of the link registry ::Concurrency::details::_Dynamic_array<_EType> _M_array; // Pointer to the underlying container (network link registry) _MyContainer * _M_pNetwork_link; // Current index size_t _M_index; // Sentinel value to return on bounds overflow _EType _M_sentinel; }; /// /// The source_link_manager object manages messaging block network links /// to ISource blocks. /// /// /// The network link registry. /// /// /// Currently, the source blocks are reference counted. This is a wrapper on a /// network_link_registry object that allows concurrent access to the links and /// provides the ability to reference the links through callbacks. Message /// blocks (target_blocks or propagator_blocks) should use this class /// for their source links. /// /// /// /**/ template class source_link_manager { public: /// /// The type of link registry being managed by the source_link_manager object. /// /**/ typedef _LinkRegistry type; /// /// The type of the blocks being managed by the source_link_manager object. /// /**/ typedef typename _LinkRegistry::type _Block; /// /// The method signature for a callback method for this source_link_manager object. /// /**/ typedef std::tr1::function _Callback_method; /// /// A type that represents a pointer to an element stored in the source_link_manager object. /// /**/ typedef _Block * _EType; /// /// A type that provides a reference to a const element stored in a source_link_manager object /// for reading and performing const operations. /// /**/ typedef _EType const& const_reference; /// /// A type that provides a pointer to a const element in a source_link_manager object. /// /**/ typedef _EType const* const_pointer; // Iterator friend class _Source_link_iterator<_LinkRegistry>; /// /// A type that provides an iterator that can read or modify any element in the /// source_link_manager object. /// /**/ typedef _Source_link_iterator<_LinkRegistry> iterator; /// /// A type that provides a reentrant lock for the source_link_manager object. /// /**/ typedef ::Concurrency::details::_ReentrantPPLLock _LockType; /// /// A type that provides a RAII scoped lock holder for a lock. /// /**/ typedef _LockType::_Scoped_lock _LockHolder; /// /// Constructs a source_link_manager object. /// /**/ source_link_manager() : _M_iteratorCount(0), _M_pLinkedTarget(NULL) { } /// /// Destroys the source_link_manager object. /// /**/ ~source_link_manager() { _CONCRT_ASSERT(_M_pendingRemove._Size() == 0); } /// /// Registers the target block that holds this source_link_manager object. /// /// /// The target block holding this source_link_manager object. /// /**/ void register_target_block(_Inout_ ITarget * _PTarget) { _M_pLinkedTarget = _PTarget; } /// /// Sets the maximum number of source links that can be added to this /// source_link_manager object. /// /// /// The maximum number of links. /// /**/ void set_bound(size_t _MaxLinks) { _M_links.set_bound(_MaxLinks); } /// /// Adds a source link to the source_link_manager object. /// /// /// A pointer to a block to be added. /// /**/ void add(_EType _Link) { if (_Link == NULL) { return; } { _LockHolder _Lock(_M_lock); _M_links.add(_Link); // We need to add the _Link first and then invoke the // callback because _Add could throw. // As soon as the above lock is released, remove would // find the link that was added and could unlink it before // we are able to invoke the notification below. Keeping an // active iterator would prevent that from happening. _M_iteratorCount++; } // Acquire a reference on this link by the target _Link->acquire_ref(_M_pLinkedTarget); // Release the active iterator release(); } /// /// Removes a link from the source_link_manager object. /// /// /// A pointer to a block to be removed, if found. /// /// /// true if the link was found and removed, false otherwise. /// /**/ bool remove(_EType _Link) { bool _Removed = false; _EType _RemovedLink = NULL; ITarget * _LinkedTarget = _M_pLinkedTarget; if (_Link == NULL) { return false; } { _LockHolder _Lock(_M_lock); _Removed = _M_links.remove(_Link); if (!_Removed) { // No change was made return _Removed; } if (_M_iteratorCount == 0) { // Set the removed link to indicate that // notification callback needs to be invoked. _RemovedLink = _Link; } else { // The iterator will complete the pending operation _M_pendingRemove._Push_back(_Link); } } // NOTE: touching "this" pointer is dangerous as soon as the above lock is released // Release the reference for this link if (_RemovedLink != NULL) { _RemovedLink->release_ref(_LinkedTarget); } return _Removed; } /// /// Acquires a reference on the source_link_manager object. /// /**/ void reference() { _LockHolder _Lock(_M_lock); _M_iteratorCount++; } /// /// Releases the reference on the source_link_manager object. /// /**/ void release() { ITarget * _LinkedTarget = _M_pLinkedTarget; ::Concurrency::details::_Dynamic_array<_EType> _LinksToRemove; { _LockHolder _Lock(_M_lock); _CONCRT_ASSERT(_M_iteratorCount > 0); _M_iteratorCount--; if (_M_iteratorCount == 0) { if (_M_pendingRemove._Size() > 0) { // Snap the pending remove list with the lock held _M_pendingRemove._Swap(_LinksToRemove); } } } // NOTE: touching "this" pointer is dangerous as soon as the above lock is released // Release the references size_t _Size = _LinksToRemove._Size(); for (size_t _I=0; _I < _Size; _I++) { _LinksToRemove[_I]->release_ref(_LinkedTarget); } } /// /// Searches the network_link_registry within this source_link_manager /// object for a specified block. /// /// /// A pointer to a block that is to be searched for in the source_link_manager object. /// /// /// true if the specified block was found, false otherwise. /// /**/ bool contains(_EType _Link) { _LockHolder _Lock(_M_lock); return _M_links.contains(_Link); } /// /// Counts the number of linked blocks in the source_link_manager object. /// /// /// The number of linked blocks in the source_link_manager object. /// /**/ size_t count() { _LockHolder _Lock(_M_lock); return _M_links.count(); } /// /// Returns an iterator to the first element in the source_link_manager object. /// /// /// The end state of the iterator is indicated by a NULL link. /// /// /// An iterator addressing the first element in the source_link_manager object. /// /**/ iterator begin() { return (iterator(this, 0)); } private: // Called by the iterator. This routine takes a snapshot of the links // in the registry and copies it to the array provided. void _To_array(::Concurrency::details::_Dynamic_array<_EType>& _Array) { _LockHolder _Lock(_M_lock); _M_iteratorCount++; for(_LinkRegistry::iterator _Link = _M_links.begin(); *_Link != NULL; _Link++) { _Array._Push_back(*_Link); } } // Internal lock used for synchronization _LockType _M_lock; // Count to indicate that an iterator is active volatile long _M_iteratorCount; // A vector of all pending link remove operations ::Concurrency::details::_Dynamic_array<_EType> _M_pendingRemove; // Underlying link registry _LinkRegistry _M_links; // Target block holding this source link manager ITarget * volatile _M_pLinkedTarget; }; /// /// The valid responses for an offer of a message object to a block. /// /**/ enum message_status { /// /// The target accepted the message. /// /**/ accepted, /// /// The target did not accept the message. /// /**/ declined, /// /// The target postponed the message. /// /**/ postponed, /// /// The target tried to accept the message, but it was no longer available. /// /**/ missed }; /// /// The basic message envelope containing the data payload being passed between /// messaging blocks. /// /// /// The data type of the payload within the message. /// /// /// For more information, see . /// /**/ template class message : public ::Concurrency::details::_Runtime_object { friend class ::Concurrency::details::_Queue>; public: /// /// Constructs a message object. /// /// /// The payload of this message. /// /// /// The constructor that takes a pointer to a message object as an argument /// throws an invalid_argument exception /// if the parameter is NULL. /// /**/ message(_Type const &_P) : payload(_P), _M_pNext(NULL), _M_refCount(0) { } /// /// Constructs a message object. /// /// /// The payload of this message. /// /// /// The unique ID of this message. /// /// /// The constructor that takes a pointer to a message object as an argument /// throws an invalid_argument exception /// if the parameter is NULL. /// /**/ message(_Type const &_P, runtime_object_identity _Id) : ::Concurrency::details::_Runtime_object(_Id), payload(_P), _M_pNext(NULL), _M_refCount(0) { } /// /// Constructs a message object. /// /// /// A reference or pointer to a message object. /// /// /// The constructor that takes a pointer to a message object as an argument /// throws an invalid_argument exception /// if the parameter is NULL. /// /**/ message(message const & _Msg) : payload(_Msg.payload), _M_pNext(NULL), _M_refCount(0) { } /// /// Constructs a message object. /// /// /// A reference or pointer to a message object. /// /// /// This method throws an invalid_argument exception /// if the parameter is NULL. /// /**/ message(_In_ message const * _Msg) : payload((_Msg == NULL) ? NULL : _Msg->payload), _M_pNext(NULL), _M_refCount(0) { if (_Msg == NULL) { throw std::invalid_argument("_Msg"); } } /// /// Destroys the message object. /// /**/ virtual ~message() { } /// /// Returns the ID of the message object. /// /// /// The runtime_object_identity of the message object. /// /**/ runtime_object_identity msg_id() const { return _M_id; } /// /// The payload of the message object. /// /**/ _Type const payload; /// /// Adds to the reference count for the message object. Used for message blocks that /// need reference counting to determine message lifetimes. /// /// /// The new value of the reference count. /// /**/ long add_ref() { return _InterlockedIncrement(&_M_refCount); } /// /// Subtracts from the reference count for the message object. Used for message blocks that /// need reference counting to determine message lifetimes. /// /// /// The new value of the reference count. /// /**/ long remove_ref() { return _InterlockedDecrement(&_M_refCount); } /// /// A type alias for . /// /**/ typedef typename _Type type; private: // The intrusive next pointer used by blocks that need // to chain messages it's holding together message * _M_pNext; // Avoid warnings about not generating assignment operators. message<_Type> const &operator =(message<_Type> const &); // A reference count for the message volatile long _M_refCount; }; //************************************************************************** // Message processor: //************************************************************************** /// /// The message_processor class is the abstract base class for processing of /// message objects. There is no guarantee on the ordering of the messages. /// /// /// The data type of the payload within messages handled by this message_processor object. /// /// /**/ template class message_processor { public: /// /// A type alias for . /// /**/ typedef typename _Type type; /// /// When overridden in a derived class, places messages into the block asynchronously. /// /// /// A message object to send asynchronously. /// /// /// Processor implementations should override this method. /// /**/ virtual void async_send(_Inout_opt_ message<_Type> * _Msg) = 0; /// /// When overridden in a derived class, places messages into the block synchronously. /// /// /// A message object to send synchronously. /// /// /// Processor implementations should override this method. /// /**/ virtual void sync_send(_Inout_opt_ message<_Type> * _Msg) = 0; /// /// When overridden in a derived class, waits for all asynchronous operations to complete. /// /// /// Processor implementations should override this method. /// /**/ virtual void wait() = 0; protected: /// /// When overridden in a derived class, performs the forward processing of /// messages into the block. Called once every time a new message is added and /// the queue is found to be empty. /// /// /// Message block implementations should override this method. /// /**/ virtual void process_incoming_message() = 0; /// /// Wrapper for process_incoming_message suitable for use as a argument to /// CreateThread and other similar methods. /// /// /// A pointer to a message processor passed as a void pointer. /// /**/ static void __cdecl _Process_incoming_message_wrapper(void * _Data) { message_processor<_Type> * _PMessageProcessor = (message_processor<_Type> *) _Data; _PMessageProcessor->process_incoming_message(); } }; /// /// An ordered_message_processor is a message_processor that allows message blocks /// to process messages in the order they were received. /// /// /// The payload type of messages handled by the processor. /// /**/ template class ordered_message_processor : public message_processor<_Type> { public: /// /// The signature of the callback method invoked while processing messages. /// /**/ typedef std::tr1::function *)> _Handler_method; /// /// The signature of the callback method invoked while propagating messages. /// /**/ typedef std::tr1::function _Propagator_method; /// /// A type alias for . /// /**/ typedef _Type type; /// /// Constructs an ordered_message_processor object. /// /// /// This ordered_message_processor will not schedule asynchronous or synchronous /// handlers until the initialize function is called. /// /**/ ordered_message_processor() : _M_queuedDataCount(0), _M_stopProcessing(1), _M_lwtCount(0), _M_pScheduler(NULL), _M_pScheduleGroup(NULL), _M_handler(nullptr), _M_processor(nullptr), _M_propagator(nullptr) { } /// /// Destroys the ordered_message_processor object. /// /// /// Waits for all outstanding asynchronous operations before destroying the processor. /// /**/ virtual ~ordered_message_processor() { wait(); } /// /// Initializes the ordered_message_processor object with the appropriate /// callback function, scheduler and schedule group. /// /// /// A pointer to the scheduler to be used for scheduling light-weight tasks. /// /// /// A pointer to the schedule group to be used for scheduling light-weight tasks. /// /// /// The handler functor invoked during callback. /// /// /// /**/ void initialize(_Inout_opt_ Scheduler * _PScheduler, _Inout_opt_ ScheduleGroup * _PScheduleGroup, _Handler_method const& _Handler) { _M_pScheduler = _PScheduler; _M_pScheduleGroup = _PScheduleGroup; _M_handler = _Handler; _M_stopProcessing = 0; } /// /// Initialize batched message processing /// /// /// The processor functor invoked during callback. /// /// /// The propagator functor invoked during callback. /// virtual void initialize_batched_processing(_Handler_method const& _Processor, _Propagator_method const& _Propagator) { _M_processor = _Processor; _M_propagator = _Propagator; } /// /// Synchronously queues up messages and starts a processing task, if this has not been done /// already. /// /// /// A pointer to a message. /// /**/ virtual void sync_send(_Inout_opt_ message<_Type> * _Msg) { if (_M_handler == NULL) { throw invalid_operation("sync_send called without registering a callback"); } _Sync_send_helper(_Msg); } /// /// Asynchronously queues up messages and starts a processing task, if this has not been done /// already. /// /// /// A pointer to a message. /// /**/ virtual void async_send(_Inout_opt_ message<_Type> * _Msg) { if (_M_handler == NULL) { throw invalid_operation("async_send called without registering a callback"); } // // If there is a message to send, enqueue it in the processing queue. // async_send can be sent a NULL message if the block wishes to reprocess // the messages that are in its queue. For example, an unbounded_buffer // that has its head node released after reservation. // if (_Msg != NULL) { _M_queuedMessages.push(_Msg); } if (_InterlockedIncrement(&_M_queuedDataCount) == 1) { // Indicate that an LWT is in progress. This will cause the // destructor to block. _InterlockedIncrement(&_M_lwtCount); if (_M_stopProcessing == 0) { _CONCRT_ASSERT(_M_lwtCount > 0); _Trace_agents(AGENTS_EVENT_SCHEDULE, ::Concurrency::details::_Trace_agents_get_id(this)); TaskProc _Proc = &::Concurrency::ordered_message_processor<_Type>::_Process_incoming_message_wrapper; #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP if (_M_pScheduleGroup != NULL) { _M_pScheduleGroup->ScheduleTask(_Proc, this); } else if (_M_pScheduler != NULL) { _M_pScheduler->ScheduleTask(_Proc, this); } else { #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ ::Concurrency::details::_CurrentScheduler::_ScheduleTask(_Proc, this); #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ // The LWT will decrement _M_lwtCount. return; } // If we get here then no task was scheduled. Decrement LWT count to reflect this fact _InterlockedDecrement(&_M_lwtCount); } } /// /// A processor-specific spin wait used in destructors of message blocks to make sure /// that all asynchronous processing tasks have time to finish before destroying the block. /// /**/ virtual void wait() { // Cease processing of any new messages _InterlockedIncrement(&_M_stopProcessing); // This spin makes sure all previously initiated message processings // will still process correctly. As soon as this count reaches zero, we can // procede with the message block destructor. ::Concurrency::details::_SpinWaitBackoffNone spinWait(::Concurrency::details::_Context::_Yield); while(_M_lwtCount != 0) { spinWait._SpinOnce(); } // Synchronize with sync_send { _NR_lock _Lock(_M_asyncSendLock); _Clear_queued_messages(); } } protected: /// /// The processing function that is called asynchronously. It dequeues messages and begins /// processing them. /// /**/ virtual void process_incoming_message() { _Trace_agents(AGENTS_EVENT_START, ::Concurrency::details::_Trace_agents_get_id(this)); long _Count = _Process_message_helper(); _Trace_agents(AGENTS_EVENT_END, ::Concurrency::details::_Trace_agents_get_id(this), _Count); // Indicate that an LWT completed _InterlockedDecrement(&_M_lwtCount); // Do not access any members here. If the count goes to // 0 as a result of the above decrement, the object // could be immediately deleted. } private: void _Clear_queued_messages() { message<_Type> * _Msg = NULL; while (_M_queuedMessages.try_pop(_Msg)) { delete _Msg; } } void _Sync_send_helper(message<_Type> * _Msg) { _NR_lock _Lock(_M_asyncSendLock); // Message block destructors sets the _M_stopProcessing flag to stop // processing any more messages. This is required to guarantee // that the destructor's wait_for_async_sends will complete if (_M_stopProcessing == 0) { if (_M_queuedDataCount > 0) { long _Count = _InterlockedExchange((volatile long *) &_M_queuedDataCount, 0); _Invoke_handler(_Count); } _Invoke_handler(_Msg); } else { // Destructor is running. Do not process the message // Delete the msg, if any. if (_Msg != NULL) { delete _Msg; } } } // Helper function to dequeue and process messages to any targets long _Process_message_helper() { _NR_lock _Lock(_M_asyncSendLock); long _Messages_processed = 0; // Do batched processing of messages // Read off the number of messages to process in this iteration by snapping a count volatile long _Count = _M_queuedDataCount; bool _StopProcessing = false; // This count could be 0 if there was both a synchronous and asynchronous // send occuring. One of them could have sent all of the messages for the other while (_Count > 0) { // Process _Count number of messages _Invoke_handler(_Count); _Messages_processed += _Count; // Subtract the count and see if there are new things to process volatile long _Orig = _InterlockedExchangeAdd((volatile long *) &_M_queuedDataCount, -_Count); _CONCRT_ASSERT(_Orig >= _Count); if (_Orig == _Count) { // Because _Count did not change, we processed everything there is to process break; } if (_StopProcessing) { break; } // After reading the flag process the currently queued messages // Any messages received after we observe this flag (to be set) will not // be processed. _StopProcessing = (_M_stopProcessing == 0) ? false : true; // Snap the count and try to process more _Count = _M_queuedDataCount; } return _Messages_processed; } // Invoke the handler in the message block for the given // count void _Invoke_handler(long _Count) { // Process _Count number of messages for(int _I = 0; _I < _Count; _I++) { message<_Type> * _Msg = NULL; _M_queuedMessages.try_pop(_Msg); if (_M_processor == NULL) { // If a processor function does not exist, the message processor is using single // message processing rather than batched processing. There should also be no // propagator function defined in this case. _CONCRT_ASSERT(_M_propagator == NULL); _M_handler(_Msg); } else { // Use the batched message processing function _M_processor(_Msg); } } // Call the handler which propagates the message(s) if (_M_propagator != NULL) { _M_propagator(); } } // Invoke the message block handler for the given message void _Invoke_handler(message<_Type> * _Msg) { if (_M_processor == NULL) { // If a processor function does not exist, the message processor is using single // message processing rather than batched processing. There should also be no // propagator function defined in this case. _CONCRT_ASSERT(_M_propagator == NULL); _M_handler(_Msg); } else { // Use the batched message processing function _M_processor(_Msg); // Call the handler which propagates the message(s) if (_M_propagator != NULL) { _M_propagator(); } } } private: /// /// A queue of the messages /// /**/ concurrent_queue *> _M_queuedMessages; /// /// A lock to use for queueing incoming messages. /// /**/ ::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock; /// /// A count of the current number of messages to process. Used as a flag /// to see if a new process message task needs to be created. /// /**/ volatile long _M_queuedDataCount; /// /// The scheduler to process messages on /// /**/ Scheduler * _M_pScheduler; /// /// The schedule group to process messages on /// /**/ ScheduleGroup * _M_pScheduleGroup; /// /// A flag set in the destructor of a block to cease processing of new messages. /// This is required to guarantee that _M_queuedDataCount will get to 0 eventually. /// /**/ volatile long _M_stopProcessing; /// /// A counter to indicate the number of outstanding LWTs /// /**/ volatile long _M_lwtCount; /// /// A message handler object which exposes the callback to be invoked /// /**/ _Handler_method _M_handler; /// /// A message processing object which exposes the callback to be invoked /// /**/ _Handler_method _M_processor; /// /// A message propagating object which exposes the callback to be invoked /// /**/ _Propagator_method _M_propagator; }; /// /// The ITarget class is the interface for all target blocks. Target blocks /// consume messages offered to them by ISource blocks. /// /// /// The data type of the payload within the messages accepted by the target block. /// /// /// For more information, see . /// /// /**/ template class ITarget { // // ISource is a friend class because calls to Source->link_target() // and Source->unlink_target() need to call their respective // Target->link_source() and Target->unlink_source() on the block they are // linking/unlinking. Those functions are private here because we don't // want users calling link_source() or unlink_source() directly. link_source/ // unlink_source don't call respective link_target/unlink_target because an // infinite loop would occur. // friend class ISource<_Type>; public: /// /// Destroys the ITarget object. /// /**/ virtual ~ITarget() {} // It is important that calls to propagate do *not* take the same lock on an // internal message structure that is used by Consume and the LWT. Doing so could // result in a deadlock with the Consume call. /// /// When overridden in a derived class, asynchronously passes a message from a source block to /// this target block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// The method throws an invalid_argument exception /// if either the or parameter is NULL. /// /**/ virtual message_status propagate(_Inout_opt_ message<_Type> * _PMessage, _Inout_opt_ ISource<_Type> * _PSource) = 0; /// /// When overridden in a derived class, synchronously passes a message to the target block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// The method throws an invalid_argument exception /// if either the or parameter is NULL. /// Using the send method outside of message initiation and to propagate messages /// within a network is dangerous and can lead to deadlock. /// When send returns, the message has either already been accepted, and transferred into /// the target block, or it has been declined by the target. /// /**/ virtual message_status send(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) = 0; /// /// When overridden in a derived class, returns true or false depending on whether the /// message block accepts messages offered by a source that is not linked to it. If the overridden method returns /// true, the target cannot postpone an offered message, as consumption of a postponed message /// at a later time requires the source to be identified in its sourse link registry. /// /// /// true if the block can accept message from a source that is not linked to it /// false otherwise. /// /**/ virtual bool supports_anonymous_source() { return false; } /// /// A type alias for . /// /**/ typedef typename _Type type; /// /// The signature of any method used by the block that returns a bool value to determine /// whether an offered message should be accepted. /// /**/ typedef std::tr1::function filter_method; protected: /// /// When overridden in a derived class, links a specified source block to this ITarget block. /// /// /// The ISource block being linked to this ITarget block. /// /// /// This function should not be called directly on an ITarget block. Blocks should be connected together /// using the link_target method on ISource blocks, which will invoke the link_source method /// on the corresponding target. /// /**/ virtual void link_source(_Inout_ ISource<_Type> * _PSource) = 0; /// /// When overridden in a derived class, unlinks a specified source block from this ITarget block. /// /// /// The ISource block being unlinked from this ITarget block. /// /// /// This function should not be called directly on an ITarget block. Blocks should be disconnected /// using the unlink_target or unlink_targets methods on ISource blocks, which will invoke /// the unlink_source method on the corresponding target. /// /**/ virtual void unlink_source(_Inout_ ISource<_Type> * _PSource) = 0; /// /// When overridden in a derived class, unlinks all source blocks from this ITarget block. /// /**/ virtual void unlink_sources() = 0; }; /// /// The ISource class is the interface for all source blocks. Source blocks /// propagate messages to ITarget blocks. /// /// /// The data type of the payload within the messages produced by the source block. /// /// /// For more information, see . /// /// /**/ template class ISource { public: /// /// Destroys the ISource object. /// /**/ virtual ~ISource() {} /// /// When overridden in a derived class, links a target block to this ISource block. /// /// /// A pointer to the target block being linked to this ISource block. /// /**/ virtual void link_target(_Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, unlinks a target block from this ISource block, /// if found to be previously linked. /// /// /// A pointer to the target block being unlinked from this ISource block. /// /**/ virtual void unlink_target(_Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, unlinks all target blocks from this /// ISource block. /// /**/ virtual void unlink_targets() = 0; /// /// When overridden in a derived class, accepts a message that was offered by this ISource block, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the target block that is calling the accept method. /// /// /// A pointer to the message that the caller now has ownership of. /// /// /// The accept method is called by a target while a message is being offered by this ISource block. /// The message pointer returned may be different from the one passed into the propagate method /// of the ITarget block, if this source decides to make a copy of the message. /// /**/ virtual message<_Type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, reserves a message previously offered by this ISource block. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the target block that is calling the reserve method. /// /// /// true if the message was successfully reserved, false otherwise. Reservations can fail /// for many reasons, including: the message was already reserved or accepted by another target, the source could /// deny reservations, and so forth. /// /// /// After you call reserve, if it succeeds, you must call either consume or release /// in order to take or give up possession of the message, respectively. /// /**/ virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, consumes a message previously offered by this ISource block /// and successfully reserved by the target, transferring ownership to the caller. /// /// /// The runtime_object_identity of the reserved message object. /// /// /// A pointer to the target block that is calling the consume method. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// The consume method is similar to accept, but must always be preceded by a call to reserve that /// returned true. /// /**/ virtual message<_Type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, releases a previous successful message reservation. /// /// /// The runtime_object_identity of the reserved message object. /// /// /// A pointer to the target block that is calling the release method. /// /**/ virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being linked to this source /// during the link_target method. /// /**/ virtual void acquire_ref(_Inout_ ITarget<_Type> * _PTarget) = 0; /// /// When overridden in a derived class, releases a reference count on this ISource block. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being unlinked from this source. /// The source block is allowed to release any resources reserved for the target block. /// /**/ virtual void release_ref(_Inout_ ITarget<_Type> * _PTarget) = 0; /// /// A type alias for . /// /**/ typedef typename _Type source_type; protected: /// /// Links this source to a target. /// /// /// A pointer to the target. /// /// /// This function definition is required because ISource blocks the need to call /// Target->link_source(), which is a private memeber of ITarget. ISource is /// declared as a friend class, so this is an way for derived classes of ISource /// to properly link/unlink their targets during link_target(), unlink_target() and /// unlink_targets() /// /**/ void _Invoke_link_source(ITarget<_Type> * _PLinkFrom) { _PLinkFrom->link_source(this); } /// /// Unlinks this source from a target. /// /// /// A pointer to the target. /// /// /// This function definition is required because ISource blocks need to call /// Target->unlink_source(), which is a private memeber of ITarget. ISource is /// declared as a friend class, so this is an way for derived classes of ISource /// to properly link/unlink their targets during link_target(), unlink_target() and /// unlink_targets() /// /**/ void _Invoke_unlink_source(ITarget<_Type> * _PUnlinkFrom) { _PUnlinkFrom->unlink_source(this); } }; //************************************************************************** // Target Block: //************************************************************************** /// /// The target_block class is an abstract base class that provides basic link management /// functionality and error checking for target only blocks. /// /// /// The link registry to be used for holding the source links. /// /// /// The processor type for message processing. /// /// /**/ template> class target_block : public ITarget { public: /// /// The type of the payload for the incoming messages to this target_block object. /// /**/ typedef typename _SourceLinkRegistry::type::source_type _Source_type; /// /// The type of the source_link_manager this target_block object. /// /**/ typedef source_link_manager<_SourceLinkRegistry> _SourceLinkManager; /// /// The type of the iterator for the source_link_manager for this target_block object. /// /**/ typedef typename _SourceLinkManager::iterator source_iterator; /// /// Constructs a target_block object. /// /**/ target_block() : _M_pFilter(NULL), _M_fDeclineMessages(false) { _Trace_agents(AGENTS_EVENT_CREATE, ::Concurrency::details::_Trace_agents_get_id(this), ::Concurrency::details::_Trace_agents_get_id(&_M_messageProcessor)); } /// /// Destroys the target_block object. /// /**/ virtual ~target_block() { // All sources should have been unlinked _CONCRT_ASSERT(_M_connectedSources.count() == 0); delete _M_pFilter; _Trace_agents(AGENTS_EVENT_DESTROY, ::Concurrency::details::_Trace_agents_get_id(this)); } /// /// Asynchronously passes a message from a source block to this target block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// The method throws an invalid_argument exception /// if either the or parameter is NULL. /// /**/ virtual message_status propagate(_Inout_opt_ message<_Source_type> * _PMessage, _Inout_opt_ ISource<_Source_type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by consume and the LWT. Doing so could // result in a deadlock. if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } if (_M_fDeclineMessages) { return declined; } if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload)) { return declined; } return propagate_message(_PMessage, _PSource); } /// /// Synchronously passes a message from a source block to this target block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// The method throws an invalid_argument exception /// if either the or parameter is NULL. /// Using the send method outside of message initiation and to propagate messages /// within a network is dangerous and can lead to deadlock. /// When send returns, the message has either already been accepted, and transferred into /// the target block, or it has been declined by the target. /// /**/ virtual message_status send(_Inout_ message<_Source_type> * _PMessage, _Inout_ ISource<_Source_type> * _PSource) { if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } if (_M_fDeclineMessages) { return declined; } if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload)) { return declined; } return send_message(_PMessage, _PSource); } protected: /// /// When overridden in a derived class, this method asynchronously passes a message from an ISource /// block to this target_block object. It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Source_type> * _PMessage, _Inout_ ISource<_Source_type> * _PSource) = 0; /// /// When overridden in a derived class, this method synchronously passes a message from an ISource /// block to this target_block object. It is invoked by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// By default, this block returns declined unless overridden by a derived class. /// /**/ virtual message_status send_message(_Inout_ message<_Source_type> *, _Inout_ ISource<_Source_type> *) { // By default we do not allow send() return declined; } /// /// Links a specified source block to this target_block object. /// /// /// A pointer to the ISource block that is to be linked. /// /// /// This function should not be called directly on a target_block object. Blocks should be connected together /// using the link_target method on ISource blocks, which will invoke the link_source method /// on the corresponding target. /// /**/ virtual void link_source(_Inout_ ISource<_Source_type> * _PSource) { _M_connectedSources.add(_PSource); _Trace_agents(AGENTS_EVENT_LINK, ::Concurrency::details::_Trace_agents_get_id(_PSource), ::Concurrency::details::_Trace_agents_get_id(this)); } /// /// Unlinks a specified source block from this target_block object. /// /// /// A pointer to the ISource block that is to be unlinked. /// /// This function should not be called directly on n target_block object. Blocks should be disconnected /// using the unlink_target or unlink_targets methods on ISource blocks, which will invoke /// the unlink_source method on the corresponding target. /**/ virtual void unlink_source(_Inout_ ISource<_Source_type> * _PSource) { _Trace_agents(AGENTS_EVENT_UNLINK, ::Concurrency::details::_Trace_agents_get_id(_PSource), ::Concurrency::details::_Trace_agents_get_id(this)); _M_connectedSources.remove(_PSource); } /// /// Unlinks all source blocks from this target_block object. /// /**/ virtual void unlink_sources() { for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter) { ISource<_Source_type> * _PSource = *_Iter; _PSource->unlink_target(this); } } /// /// When overridden in a derived class, processes a message that was accepted by this target_block object. /// /// /// A pointer to the message that is to be handled. /// /**/ virtual void process_message(message<_Source_type> *) { } // // Utility routines // /// /// Registers a filter method that will be invoked on /// every message received. /// /// /// The filter method. /// /**/ void register_filter(filter_method const& _Filter) { if (_Filter != NULL) { _M_pFilter = new filter_method(_Filter); } } /// /// Indicates to the block that new messages should be declined. /// /// /// This method is called by the destructor to ensure that new messages are declined while destruction is in progress. /// /**/ void decline_incoming_messages() { _M_fDeclineMessages = true; } /// /// Initializes the base object. Specifically, the message_processor object needs /// to be initialized. /// /// /// The scheduler to be used for scheduling tasks. /// /// /// The schedule group to be used for scheduling tasks. /// /// /// /**/ void initialize_target(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL) { // Register a callback with the processor _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup, // Processing and Propagating function used by ordered_message_processors [this](message<_Source_type> * _PMessage) { // Handle message by calling process_message to maintain CRT100 compatibility this->process_message(_PMessage); }); // Register this target block as the owner of the connected sources _M_connectedSources.register_target_block(this); } /// /// Enables batched processing for this block. /// /**/ void enable_batched_processing() { _M_messageProcessor.initialize_batched_processing( // Processing function used by CRT110 [this](message<_Source_type> * _PMessage) { // Handle message through new process_input_message to use CRT110 batch processing this->process_input_messages(_PMessage); }, nullptr); } /// /// Asynchronously sends a message for processing. /// /// /// A pointer to the message being sent. /// /**/ void async_send(_Inout_opt_ message<_Source_type> * _PMessage) { _M_messageProcessor.async_send(_PMessage); } /// /// Synchronously send a message for processing. /// /// /// A pointer to the message being sent. /// /**/ void sync_send(_Inout_opt_ message<_Source_type> * _PMessage) { _M_messageProcessor.sync_send(_PMessage); } /// /// Waits for all asynchronous propagations to complete. /// /// /// This method is used by message block destructors to ensure all asynchronous operations /// have had time to finish before destroying the block. /// /**/ void wait_for_async_sends() { // Decline new messages to ensure that messages are not dropped during the wait decline_incoming_messages(); _M_messageProcessor.wait(); } /// /// Unlinks all sources after waiting for outstanding asynchronous send operations to complete. /// /// /// All target blocks should call this routine to remove the sources in their destructor. /// /**/ void remove_sources() { wait_for_async_sends(); unlink_sources(); } /// /// Processes messages that are received as inputs. /// /**/ virtual void process_input_messages(_Inout_ message<_Source_type> * _PMessage) { throw invalid_operation("To use batched processing, you must override process_input_messages in the message block."); } /// /// The container for all the sources connected to this block. /// /**/ _SourceLinkManager _M_connectedSources; /// /// The filter function which determines whether offered messages should be accepted. /// /**/ filter_method * _M_pFilter; /// /// A bool that is set to indicate that all messages should be declined /// in preparation for deleting the block /// /**/ bool _M_fDeclineMessages; /// /// The message_processor for this target_block. /// /**/ _MessageProcessorType _M_messageProcessor; }; //************************************************************************** // Source Block: //************************************************************************** /// /// The source_block class is an abstract base class for source-only blocks. The class /// provides basic link management functionality as well as common error checks. /// /// /// Link registry to be used for holding the target links. /// /// /// Processor type for message processing. /// /// /// Message blocks should derive from this block to take advantage of link management and /// synchronization provided by this class. /// /// /**/ template> class source_block : public ISource { public: /// /// The payload type of messages handled by this source_block. /// /**/ typedef typename _TargetLinkRegistry::type::type _Target_type; /// /// The iterator to walk the connected targets. /// /**/ typedef typename _TargetLinkRegistry::iterator target_iterator; /// /// Constructs a source_block object. /// /**/ source_block() : _M_pReservedFor(NULL), _M_reservedId(-1), _M_referenceCount(0) { _Trace_agents(AGENTS_EVENT_CREATE, ::Concurrency::details::_Trace_agents_get_id(this), ::Concurrency::details::_Trace_agents_get_id(&_M_messageProcessor)); } /// /// Destroys the source_block object. /// /**/ virtual ~source_block() { // All targets should have been unlinked _CONCRT_ASSERT(_M_connectedTargets.count() == 0); _Trace_agents(AGENTS_EVENT_DESTROY, ::Concurrency::details::_Trace_agents_get_id(this)); } /// /// Links a target block to this source_block object. /// /// /// A pointer to an ITarget block to link to this source_block object. /// /// /// The method throws an invalid_argument exception if the /// parameter is NULL. /// /**/ virtual void link_target(_Inout_ ITarget<_Target_type> * _PTarget) { _R_lock _Lock(_M_internalLock); if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } _M_connectedTargets.add(_PTarget); _Invoke_link_source(_PTarget); link_target_notification(_PTarget); } /// /// Unlinks a target block from this source_block object. /// /// /// A pointer to an ITarget block to unlink from this source_block object. /// /// /// The method throws an invalid_argument exception if the /// parameter is NULL. /// /**/ virtual void unlink_target(_Inout_ ITarget<_Target_type> * _PTarget) { _R_lock _Lock(_M_internalLock); if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (_M_connectedTargets.remove(_PTarget)) { // We were able to remove the target from our list. // Inform the target to unlink from us _Invoke_unlink_source(_PTarget); } } /// /// Unlinks all target blocks from this source_block object. /// /**/ virtual void unlink_targets() { _R_lock _Lock(_M_internalLock); for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Target_type> * _PTarget = *_Iter; _CONCRT_ASSERT(_PTarget != NULL); unlink_target(_PTarget); } // All the targets should be unlinked. _CONCRT_ASSERT(_M_connectedTargets.count() == 0); } /// /// Accepts a message that was offered by this source_block object, transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the target block that is calling the accept method. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// The method throws an invalid_argument exception if the /// parameter is NULL. /// /// The accept method is called by a target while a message is being offered by this ISource block. /// The message pointer returned may be different from the one passed into the propagate method /// of the ITarget block, if this source decides to make a copy of the message. /// /// /**/ virtual message<_Target_type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } // Assert if the target is not connected _CONCRT_ASSERT(_M_connectedTargets.contains(_PTarget)); return accept_message(_MsgId); } /// /// Reserves a message previously offered by this source_block object. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the target block that is calling the reserve method. /// /// /// true if the message was successfully reserved, false otherwise. Reservations can fail /// for many reasons, including: the message was already reserved or accepted by another target, the source could /// deny reservations, and so forth. /// /// /// The method throws an invalid_argument exception if the /// parameter is NULL. /// /// After you call reserve, if it succeeds, you must call either consume or release /// in order to take or give up possession of the message, respectively. /// /// /**/ virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget) { _R_lock _Lock(_M_internalLock); if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if ( _M_pReservedFor != NULL) { // Someone else is holding the reservation return false; } if (!reserve_message(_MsgId)) { // Failed to reserve the msg ID return false; } // Save the reserving target and the msg ID _M_pReservedFor = _PTarget; _M_reservedId = _MsgId; return true; } /// /// Consumes a message previously offered by this source_block object and successfully reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the reserved message object. /// /// /// A pointer to the target block that is calling the consume method. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// /// The method throws an invalid_argument exception if the /// parameter is NULL. /// /// /// The method throws a bad_target exception if the parameter /// does not represent the target that called reserve. /// /// /// The consume method is similar to accept, but must always be preceded by a call to reserve that /// returned true. /// /// /**/ virtual message<_Target_type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget) { _R_lock _Lock(_M_internalLock); if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (_M_pReservedFor == NULL || _PTarget != _M_pReservedFor) { throw bad_target(); } message<_Target_type> * _Msg = consume_message(_MsgId); if (_Msg != NULL) { // Clear the reservation // _M_pReservedId is intentionally not reset so that it can assist in debugging _M_pReservedFor = NULL; // Reservation is assumed to block propagation. Notify that propagation can now be resumed resume_propagation(); } return _Msg; } /// /// Releases a previous successful message reservation. /// /// /// The runtime_object_identity of the reserved message object. /// /// /// A pointer to the target block that is calling the release method. /// /// /// /// The method throws an invalid_argument exception if the /// parameter is NULL. /// /// /// The method throws a bad_target exception if the parameter /// does not represent the target that called reserve. /// /// /**/ virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget) { _R_lock _Lock(_M_internalLock); if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (_PTarget != _M_pReservedFor) { throw bad_target(); } release_message(_MsgId); // Clear the reservation // _M_pReservedId is intentionally not reset so that it can assist in debugging _M_pReservedFor = NULL; // Reservation is assumed to block propagation. Notify that propagation can now be resumed resume_propagation(); } /// /// Acquires a reference count on this source_block object, to prevent deletion. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being linked to this source /// during the link_target method. /// /**/ virtual void acquire_ref(_Inout_ ITarget<_Target_type> *) { _InterlockedIncrement(&_M_referenceCount); } /// /// Releases a reference count on this source_block object. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being unlinked from this source. /// The source block is allowed to release any resources reserved for the target block. /// /**/ virtual void release_ref(_Inout_ ITarget<_Target_type> * _PTarget) { if (_PTarget != NULL) { _R_lock _Lock(_M_internalLock); // We assume that each target would keep a single reference on its source, so // we call unlink target notification on every release. Otherwise, we would be // required to keep a reference count per target. // Note: unlink_target_notification can check the value of this _PTarget pointer, but // must not dereference it, as it may have already been deleted. unlink_target_notification(_PTarget); } _InterlockedDecrement(&_M_referenceCount); // It is *unsafe* to touch the "this" pointer after decrementing the reference count } protected: // // Protected methods that a derived class can override to customize // the functionality // /// /// A callback that notifies that a new target has been linked to this source_block object. /// /// /// The ITarget block that was linked. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Target_type> *) { // By default, we restart propagation if there is no pending resrvation if (_M_pReservedFor == NULL) { propagate_to_any_targets(NULL); } } /// /// A callback that notifies that a target has been unlinked from this source_block object. /// /// /// The ITarget block that was unlinked. /// /**/ virtual void unlink_target_notification(_Inout_ ITarget<_Target_type> * _PTarget) { // At this point, the target has already been disconnected from the // source. It is safe to check the value of this pointer, but not // safe to dereference it, as it may have already been deleted. // If the target being unlinked is the one holding the reservation, // release the reservation if (_M_pReservedFor == _PTarget) { release(_M_reservedId, _PTarget); } } /// /// When overridden in a derived class, accepts an offered message by the source. /// Message blocks should override this method to validate the and /// return a message. /// /// /// The runtime object identity of the message object. /// /// /// A pointer to the message that the caller now has ownership of. /// /// /// To transfer ownership, the original message pointer should be returned. To maintain /// ownership, a copy of message payload needs to be made and returned. /// /**/ virtual message<_Target_type> * accept_message(runtime_object_identity _MsgId) = 0; /// /// When overridden in a derived class, reserves a message previously offered by this /// source_block object. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) = 0; /// /// When overridden in a derived class, consumes a message that was previously reserved. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Target_type> * consume_message(runtime_object_identity _MsgId) = 0; /// /// When overridden in a derived class, releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) = 0; /// /// When overridden in a derived class, resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() = 0; /// /// Process input messages. This is only useful for propagator blocks, which derive from source_block /// /**/ virtual void process_input_messages(_Inout_ message<_Target_type> * _PMessage) { // source_blocks do not need to process anything } /// /// Propagate messages to targets. /// /**/ virtual void propagate_output_messages() { throw invalid_operation("To use batched processing, you must override propagate_output_messages in the message block."); } /// /// When overridden in a derived class, propagates the given message to any or all of the linked targets. /// This is the main propagation routine for message blocks. /// /// /// A pointer to the message that is to be propagated. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message<_Target_type> * _PMessage) { throw invalid_operation("To use ordered message processing, you must override propagate_to_any_targets in the message block."); } // // Utility routines // /// /// Initializes the message_propagator within this source_block. /// /// /// The scheduler to be used for scheduling tasks. /// /// /// The schedule group to be used for scheduling tasks. /// /// /// /**/ void initialize_source(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL) { // Register a callback _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup, [this](message<_Target_type> * _PMessage) { this->_Handle_message(_PMessage); }); } /// /// Enables batched processing for this block. /// /**/ void enable_batched_processing() { // Register callbacks for CRT110 batched processing _M_messageProcessor.initialize_batched_processing( // Processing function used by CRT110 [this](message<_Target_type> * _PMessage) { // Handle message through new process_input_message to use CRT110 batch processing this->process_input_messages(_PMessage); }, [this](void) { this->_Propagate_message(); }); } /// /// Synchronously queues up messages and starts a propagation task, if this has not been done /// already. /// /// /// A pointer to a message object to synchronously send. /// /**/ virtual void sync_send(_Inout_opt_ message<_Target_type> * _Msg) { // Caller shall not be holding any locks when calling this routine _M_messageProcessor.sync_send(_Msg); } /// /// Asynchronously queues up messages and starts a propagation task, if this has not been done /// already /// /// /// A pointer to a message object to asynchronously send. /// /**/ virtual void async_send(_Inout_opt_ message<_Target_type> * _Msg) { _M_messageProcessor.async_send(_Msg); } /// /// Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used /// in destructors of message blocks to make sure that all asynchronous propagations have time to finish /// before destroying the block. /// /**/ void wait_for_outstanding_async_sends() { _M_messageProcessor.wait(); } /// /// Removes all target links for this source block. This should be called from the destructor. /// /**/ void remove_targets() { // Wait for outstanding propagation to complete. wait_for_outstanding_async_sends(); unlink_targets(); _Wait_on_ref(); } // // Protected members // /// /// Connected target that is holding a reservation /// /**/ ITarget<_Target_type> * _M_pReservedFor; /// /// Reserved message ID /// /**/ runtime_object_identity _M_reservedId; /// /// Connected targets /// /**/ _TargetLinkRegistry _M_connectedTargets; /// /// Processor used for asynchronous message handling /// /**/ _MessageProcessorType _M_messageProcessor; private: /// Private methods // Message handler callback for the propagator. Invokes propagate_to_any_targets // which derived classes should implement. /**/ void _Handle_message(message<_Target_type> * _PMessage) { // Hold a lock to synchronize with unlink targets _R_lock _Lock(_M_internalLock); propagate_to_any_targets(_PMessage); } // Message handler callback for the processor. Invokes process_input_messages // which derived classes should implement. /**/ void _Process_message(message<_Target_type> * _PMessage) { // Don't need a lock to process the message process_input_messages(_PMessage); } // Message handler callback for the propagator. Invokes propagate_output_messages // which derived classes should implement. /**/ void _Propagate_message() { // Hold a lock to synchronize with unlink targets _R_lock _Lock(_M_internalLock); propagate_output_messages(); } // Wait for the reference on this block to drop to zero /**/ void _Wait_on_ref(long _RefCount = 0) { ::Concurrency::details::_SpinWaitBackoffNone spinWait; while(_M_referenceCount != _RefCount) { spinWait._SpinOnce(); } } // Private Data members /// /// Internal lock used for the following synchronization: /// 1. Synchronize between link and unlink target /// 2. Synchronize between propagate_to_any_targets and unlink_target /// 3. Synchronize between reserve and consume/release /// /**/ ::Concurrency::details::_ReentrantPPLLock _M_internalLock; volatile long _M_referenceCount; }; //************************************************************************** // Propagator (source and target) Block: //************************************************************************** /// /// The propagator_block class is an abstract base class for message blocks that are both a source and target. /// It combines the functionality of both the source_block and target_block classes. /// /// /// The link registry to be used for holding the target links. /// /// /// The link registry to be used for holding the source links. /// /// /// The processor type for message processing. /// /// /// To avoid multiple inheritance, the propagator_block class inherits from the source_block class and ITarget /// abstract class. Most of the functionality in the target_block class is replicated here. /// /// /// /**/ template> class propagator_block : public source_block<_TargetLinkRegistry, _MessageProcessorType>, public ITarget { public: /// /// The type of the payload for the incoming message to this propagator_block. /// /**/ typedef typename _SourceLinkRegistry::type::source_type _Source_type; /// /// The type of the source_link_manager this propagator_block. /// /**/ typedef source_link_manager<_SourceLinkRegistry> _SourceLinkManager; /// /// The type of the iterator for the source_link_manager for this propagator_block. /// /**/ typedef typename _SourceLinkManager::iterator source_iterator; /// /// Constructs a propagator_block object. /// /**/ propagator_block() : _M_pFilter(NULL), _M_fDeclineMessages(false) { } /// /// Destroys a propagator_block object. /// /**/ virtual ~propagator_block() { remove_network_links(); delete _M_pFilter; } /// /// Asynchronously passes a message from a source block to this target block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// The propagate method is invoked on a target block by a linked source block. It queues up an /// asynchronous task to handle the message, if one is not already queued or executing. /// The method throws an invalid_argument exception /// if either the or parameter is NULL. /// /**/ virtual message_status propagate(_Inout_opt_ message<_Source_type> * _PMessage, _Inout_opt_ ISource<_Source_type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by consume and the LWT. Doing so could // result in a deadlock. if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } if (_M_fDeclineMessages) { return declined; } if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload)) { return declined; } return propagate_message(_PMessage, _PSource); } /// /// Synchronously initiates a message to this block. Called by an ISource block. /// When this function completes, the message will already have propagated into the block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// This method throws an invalid_argument exception if either /// the or parameter is NULL. /// /**/ virtual message_status send(_Inout_ message<_Source_type> * _PMessage, _Inout_ ISource<_Source_type> * _PSource) { if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } if (_M_fDeclineMessages) { return declined; } if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload)) { return declined; } return send_message(_PMessage, _PSource); } protected: /// /// When overridden in a derived class, this method asynchronously passes a message from an ISource /// block to this propagator_block object. It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Source_type> * _PMessage, _Inout_ ISource<_Source_type> * _PSource) = 0; /// /// When overridden in a derived class, this method synchronously passes a message from an ISource /// block to this propagator_block object. It is invoked by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// By default, this block returns declined unless overridden by a derived class. /// /**/ virtual message_status send_message(_Inout_ message<_Source_type> *, _Inout_ ISource<_Source_type> *) { // By default we do not allow send() return declined; } /// /// Links a specified source block to this propagator_block object. /// /// /// A pointer to the ISource block that is to be linked. /// /**/ virtual void link_source(_Inout_ ISource<_Source_type> * _PSource) { _M_connectedSources.add(_PSource); _Trace_agents(AGENTS_EVENT_LINK, ::Concurrency::details::_Trace_agents_get_id(_PSource), ::Concurrency::details::_Trace_agents_get_id(this)); } /// /// Unlinks a specified source block from this propagator_block object. /// /// /// A pointer to the ISource block that is to be unlinked. /// /**/ virtual void unlink_source(_Inout_ ISource<_Source_type> * _PSource) { _Trace_agents(AGENTS_EVENT_UNLINK, ::Concurrency::details::_Trace_agents_get_id(_PSource), ::Concurrency::details::_Trace_agents_get_id(this)); _M_connectedSources.remove(_PSource); } /// /// Unlinks all source blocks from this propagator_block object. /// /**/ virtual void unlink_sources() { for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter) { ISource<_Source_type> * _PSource = *_Iter; _PSource->unlink_target(this); } } // // Utility routines // /// /// Process input messages. This is only useful for propagator blocks, which derive from source_block /// /**/ virtual void process_input_messages(_Inout_ message<_Target_type> * _PMessage) { throw invalid_operation("To use batched processing, you must override process_input_messages in the message block."); } /// /// Initializes the base object. Specifically, the message_processor object needs /// to be initialized. /// /// /// The scheduler to be used for scheduling tasks. /// /// /// The schedule group to be used for scheduling tasks. /// /// /// /**/ void initialize_source_and_target(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL) { initialize_source(_PScheduler, _PScheduleGroup); // Register this propagator block as the owner of the connected sources _M_connectedSources.register_target_block(this); } /// /// Registers a filter method that will be invoked on every received message. /// /// /// The filter method. /// /**/ void register_filter(filter_method const& _Filter) { if (_Filter != NULL) { _M_pFilter = new filter_method(_Filter); } } /// /// Indicates to the block that new messages should be declined. /// /// /// This method is called by the destructor to ensure that new messages are declined while destruction is in progress. /// /**/ void decline_incoming_messages() { _M_fDeclineMessages = true; } /// /// Removes all the source and target network links from this propagator_block object. /// /**/ void remove_network_links() { // Decline messages while the links are being removed decline_incoming_messages(); // Remove all the target links. This waits for // all outstanding async propagation operations. remove_targets(); // unlink all sources. The above steps guarantee that // they can be removed safely. unlink_sources(); } /// /// The container for all the sources connected to this block. /// /**/ _SourceLinkManager _M_connectedSources; /// /// The filter function which determines whether offered messages should be accepted. /// /**/ filter_method * _M_pFilter; /// /// A bool that is set to indicate that all messages should be declined /// in preparation for deleting the block /// /**/ volatile bool _M_fDeclineMessages; }; //************************************************************************** // Unbounded Buffers: //************************************************************************** /// /// An unbounded_buffer messaging block is a multi-target, multi-source, ordered /// propagator_block capable of storing an unbounded number of messages. /// /// /// The payload type of the messages stored and propagated by the buffer. /// /// /// For more information, see . /// /// /// /**/ template class unbounded_buffer : public propagator_block>, multi_link_registry>> { public: /// /// Constructs an unbounded_buffer messaging block. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ unbounded_buffer() : _M_fForceRepropagation(false) { initialize_source_and_target(); enable_batched_processing(); } /// /// Constructs an unbounded_buffer messaging block. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ unbounded_buffer(filter_method const& _Filter) : _M_fForceRepropagation(false) { initialize_source_and_target(); enable_batched_processing(); register_filter(_Filter); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs an unbounded_buffer messaging block. /// /// /// The Scheduler object within which the propagation task for the unbounded_buffer object is scheduled. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ unbounded_buffer(Scheduler& _PScheduler) : _M_fForceRepropagation(false) { initialize_source_and_target(&_PScheduler); enable_batched_processing(); } /// /// Constructs an unbounded_buffer messaging block. /// /// /// The Scheduler object within which the propagation task for the unbounded_buffer messaging block is scheduled. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ unbounded_buffer(Scheduler& _PScheduler, filter_method const& _Filter) : _M_fForceRepropagation(false) { initialize_source_and_target(&_PScheduler); enable_batched_processing(); register_filter(_Filter); } /// /// Constructs an unbounded_buffer messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the unbounded_buffer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ unbounded_buffer(ScheduleGroup& _PScheduleGroup) : _M_fForceRepropagation(false) { initialize_source_and_target(NULL, &_PScheduleGroup); enable_batched_processing(); } /// /// Constructs an unbounded_buffer messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the unbounded_buffer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ unbounded_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) : _M_fForceRepropagation(false) { initialize_source_and_target(NULL, &_PScheduleGroup); enable_batched_processing(); register_filter(_Filter); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the unbounded_buffer messaging block. /// /**/ ~unbounded_buffer() { // Remove all links remove_network_links(); // Clean up any messages left in this message block _Delete_stored_messages(); } /// /// Adds an item to the unbounded_buffer messaging block. /// /// /// The item to add. /// /// /// true if the item was accepted, false otherwise. /// /**/ bool enqueue(_Type const& _Item) { return Concurrency::send<_Type>(this, _Item); } /// /// Removes an item from the unbounded_buffer messaging block. /// /// /// The payload of the message removed from the unbounded_buffer. /// /**/ _Type dequeue() { return receive<_Type>(this); } protected: // // propagator_block protected function implementations // /// /// Asynchronously passes a message from an ISource block to this unbounded_buffer messaging block. /// It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by consume and the LWT. Doing so could // result in a deadlock. message_status _Result = accepted; // Accept the message being propagated _PMessage = _PSource->accept(_PMessage->msg_id(), this); if (_PMessage != NULL) { async_send(_PMessage); } else { _Result = missed; } return _Result; } /// /// Synchronously passes a message from an ISource block to this unbounded_buffer messaging block. /// It is invoked by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status send_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { _PMessage = _PSource->accept(_PMessage->msg_id(), this); if (_PMessage != NULL) { sync_send(_PMessage); } else { return missed; } return accepted; } /// /// Overrides the supports_anonymous_source method to indicate that this block can /// accept messages offered to it by a source that is not linked. /// /// /// true because the block does not postpone offered messages. /// /**/ virtual bool supports_anonymous_source() { return true; } /// /// Accepts a message that was offered by this unbounded_buffer messaging block, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the message object that the caller now has ownership of. /// /**/ virtual message<_Type> * accept_message(runtime_object_identity _MsgId) { // // Peek at the head message in the message buffer. If the IDs match // dequeue and transfer ownership // message<_Type> * _Msg = NULL; if (_M_messageBuffer._Is_head(_MsgId)) { _Msg = _M_messageBuffer._Dequeue(); } return _Msg; } /// /// Reserves a message previously offered by this unbounded_buffer messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { // Allow reservation if this is the head message return _M_messageBuffer._Is_head(_MsgId); } /// /// Consumes a message previously offered by the unbounded_buffer messaging block and reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Type> * consume_message(runtime_object_identity _MsgId) { // By default, accept the message return accept_message(_MsgId); } /// /// Releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { // The head message is the one reserved. if (!_M_messageBuffer._Is_head(_MsgId)) { throw message_not_found(); } } /// /// Resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() { // If there are any messages in the buffer, propagate them out if (_M_messageBuffer._Count() > 0) { // Set the flag to force a repropagation. This flag is cleared when a propagation happens // The only functions that call this are release, consume, and link_target, all of which // hold the internal lock, so the flag is guaranteed to be read by propagation, which also // holds the same lock. _M_fForceRepropagation = true; // async send a NULL value to initiate the repropagation async_send(NULL); } } /// /// A callback that notifies that a new target has been linked to this unbounded_buffer messaging block. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Type> * _PTarget) { // If the message queue is blocked due to reservation // there is no need to do any message propagation if (_M_pReservedFor != NULL) { return; } message<_Type> * _Msg = _M_messageBuffer._Peek(); if (_Msg != NULL) { // Propagate the head message to the new target message_status _Status = _PTarget->propagate(_Msg, this); if (_Status == accepted) { // The target accepted the message, restart propagation. _Propagate_priority_order(_M_messageBuffer); } // If the status is anything other than accepted, then leave // the message queue blocked. } } /// /// Places the message in this unbounded_buffer messaging block and /// tries to offer it to all of the linked targets. /// virtual void process_input_messages(_Inout_ message<_Type> * _PMessage) { if (_PMessage != NULL) { _M_processedMessages._Enqueue(_PMessage); } } /// /// Places the message in this unbounded_buffer messaging block and /// tries to offer it to all of the linked targets. /// /// /// A pointer to a message object that this unbounded_buffer has taken ownership of. /// /// /// If another message is already ahead of this one in the unbounded_buffer, /// propagation to linked targets will not occur until any earlier messages have been accepted /// or consumed. The first linked target to successfully accept or consume the /// message takes ownership, and no other target can then get the message. /// /**/ virtual void propagate_output_messages() { // Move the messages from the processedMessages queue to the internal storage // to make them ready for propagating out // If there are messages in the message queue, the queue is blocked and a // propagation should not happen unless it has been forced using resume_propagation bool _FIsBlocked = (_M_messageBuffer._Count() > 0); for(;;) { message<_Type> * _PInputMessage = _M_processedMessages._Dequeue(); if(_PInputMessage == NULL) { break; } _M_messageBuffer._Enqueue(_PInputMessage); } if (_M_fForceRepropagation == false && _FIsBlocked == true) { return; } // Reset the repropagation flag because a propagation has started. _M_fForceRepropagation = false; // Attempt to propagate messages to all the targets _Propagate_priority_order(_M_messageBuffer); } private: /// /// Propagates messages in priority order. /// /// /// Reference to a message queue with messages to be propagated /// /**/ void _Propagate_priority_order(::Concurrency::details::_Queue> & _MessageBuffer) { message<_Target_type> * _Msg = _MessageBuffer._Peek(); // If someone has reserved the _Head message, don't propagate anymore if (_M_pReservedFor != NULL) { return; } while (_Msg != NULL) { message_status _Status = declined; // Always start from the first target that linked for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Target_type> * _PTarget = *_Iter; _Status = _PTarget->propagate(_Msg, this); // Ownership of message changed. Do not propagate this // message to any other target. if (_Status == accepted) { break; } // If the target just propagated to reserved this message, stop // propagating it to others if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. Thus, nothing after it in the queue can // be propagated out. Cease propagation. if (_Status != accepted) { break; } // Get the next message _Msg = _MessageBuffer._Peek(); } } /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Input messages for this message block are in the base-class input buffer // All messages in that buffer are guaranteed to have moved to the output // buffer because the destructor first waits for all async sends to finish // before reaching this point // Delete any messages remaining in the output queue for (;;) { message<_Type> * _Msg = _M_messageBuffer._Dequeue(); if (_Msg == NULL) { break; } delete _Msg; } } /// /// Message queue used to store processed messages /// /**/ ::Concurrency::details::_Queue> _M_processedMessages; /// /// Message queue used to store messages /// /**/ ::Concurrency::details::_Queue> _M_messageBuffer; /// /// A bool to signal to the processor to force a repropagation to occur /// /**/ bool _M_fForceRepropagation; private: // // Hide assignment operator and copy constructor // unbounded_buffer const &operator =(unbounded_buffer const&); // no assignment operator unbounded_buffer(unbounded_buffer const &); // no copy constructor }; //************************************************************************** // Overwrite Buffers: //************************************************************************** /// /// An overwrite_buffer messaging block is a multi-target, multi-source, ordered /// propagator_block capable of storing a single message at /// a time. New messages overwrite previously held ones. /// /// /// The payload type of the messages stored and propagated by the buffer. /// /// /// An overwrite_buffer messaging block propagates out copies of its stored message to each of its targets. /// For more information, see . /// /// /// /**/ template class overwrite_buffer : public propagator_block>, multi_link_registry>> { public: /// /// Constructs an overwrite_buffer messaging block. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this overwrite_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ overwrite_buffer() : _M_fIsInitialized(false), _M_pMessage(NULL), _M_pReservedMessage(NULL) { initialize_source_and_target(); } /// /// Constructs an overwrite_buffer messaging block. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this overwrite_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ overwrite_buffer(filter_method const& _Filter) : _M_fIsInitialized(false), _M_pMessage(NULL), _M_pReservedMessage(NULL) { initialize_source_and_target(); register_filter(_Filter); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs an overwrite_buffer messaging block. /// /// /// The Scheduler object within which the propagation task for the overwrite_buffer messaging block is scheduled. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this overwrite_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ overwrite_buffer(Scheduler& _PScheduler) : _M_fIsInitialized(false), _M_pMessage(NULL), _M_pReservedMessage(NULL) { initialize_source_and_target(&_PScheduler); } /// /// Constructs an overwrite_buffer messaging block. /// /// /// The Scheduler object within which the propagation task for the overwrite_buffer messaging block is scheduled. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this overwrite_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ overwrite_buffer(Scheduler& _PScheduler, filter_method const& _Filter) : _M_fIsInitialized(false), _M_pMessage(NULL), _M_pReservedMessage(NULL) { initialize_source_and_target(&_PScheduler); register_filter(_Filter); } /// /// Constructs an overwrite_buffer messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the overwrite_buffer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this overwrite_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ overwrite_buffer(ScheduleGroup& _PScheduleGroup) : _M_fIsInitialized(false), _M_pMessage(NULL), _M_pReservedMessage(NULL) { initialize_source_and_target(NULL, &_PScheduleGroup); } /// /// Constructs an overwrite_buffer messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the overwrite_buffer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this overwrite_buffer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ overwrite_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) : _M_fIsInitialized(false), _M_pMessage(NULL), _M_pReservedMessage(NULL) { initialize_source_and_target(NULL, &_PScheduleGroup); register_filter(_Filter); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the overwrite_buffer messaging block. /// /**/ ~overwrite_buffer() { // Remove all links that are targets of this overwrite_buffer remove_network_links(); // Clean up any messages left in this message block _Delete_stored_messages(); } /// /// Checks whether this overwrite_buffer messaging block has a value yet. /// /// /// true if the block has received a value, false otherwise. /// /**/ bool has_value() const { return _M_fIsInitialized != 0; } /// /// Gets a reference to the current payload of the message being stored in the overwrite_buffer messaging block. /// /// /// The payload of the currently stored message. /// /// /// The value stored in the overwrite_buffer could change immediately after this method returns. This method will /// wait until a message arrives if no message is currently stored in the overwrite_buffer. /// /**/ _Type value() { return receive<_Type>(this); } protected: // // propagator_block protected function implementation // /// /// Asynchronously passes a message from an ISource block to this overwrite_buffer messaging block. /// It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by Consume and the LWT. Doing so could // result in a deadlock with the Consume call. message_status _Result = accepted; _PMessage = _PSource->accept(_PMessage->msg_id(), this); // // If message was accepted, set the member variables for // this block and start the asynchronous propagation task // if (_PMessage != NULL) { // Add a reference for the async_send holding the message _PMessage->add_ref(); async_send(_PMessage); } else { _Result = missed; } return _Result; } /// /// Synchronously passes a message from an ISource block to this overwrite_buffer messaging block. /// It is invoked by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status send_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { _PMessage = _PSource->accept(_PMessage->msg_id(), this); // // If message was accepted, set the member variables for // this block and start the asynchronous propagation task // if (_PMessage != NULL) { // Add a reference for the sync_send holding the message _PMessage->add_ref(); sync_send(_PMessage); } else { return missed; } return accepted; } /// /// Overrides the supports_anonymous_source method to indicate that this block can /// accept messages offered to it by a source that is not linked. /// /// /// true because the block does not postpone offered messages. /// /**/ virtual bool supports_anonymous_source() { return true; } /// /// Accepts a message that was offered by this overwrite_buffer messaging block, /// returning a copy of the message to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// The overwrite_buffer messaging block returns copies of the message /// to its targets, rather than transferring ownership of the currently /// held message. /// /**/ virtual message<_Type> * accept_message(runtime_object_identity _MsgId) { // // If the internal message has not yet been initialized yet, return NULL // if (_M_pMessage == NULL) { return NULL; } // // Instead of returning the internal message, we return a copy of the // message stored. // // Because we are returning a copy, the accept routine for an overwritebuffer // does not need to grab the internalLock // message<_Type> * _Msg = NULL; if (_M_pMessage->msg_id() == _MsgId) { _Msg = new message<_Type>(_M_pMessage->payload); } return _Msg; } /// /// Reserves a message previously offered by this overwrite_buffer messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { // Ensure that this message currently exists in the overwrite buffer if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return false; } // Can only reserve one message, any other blocks trying to reserve // will return false _CONCRT_ASSERT(_M_pReservedMessage == NULL); // Save this message away _M_pReservedMessage = _M_pMessage; // Add a reference for this message to prevent deletion _M_pReservedMessage->add_ref(); return true; } /// /// Consumes a message previously offered by the overwrite_buffer messaging block and reserved by the target, /// returning a copy of the message to the caller. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Type> * consume_message(runtime_object_identity _MsgId) { // Leave and return NULL if this msgId doesn't match the reserved message // Otherwise this is a pull of a later overwritten message, and messages // could them appear out of order. if (_M_pReservedMessage != NULL && _M_pReservedMessage->msg_id() != _MsgId) { return NULL; } // This redundant assert is specifically to make the /analyze switch happy, which cannot recognize the same assertion above in if stmnt. _CONCRT_ASSERT( _M_pReservedMessage != NULL ); _Type _Payload = _M_pReservedMessage->payload; // Take the reserved message message<_Type> * _Result = new message<_Type>(_Payload); if (_M_pReservedMessage->remove_ref() == 0) { delete _M_pReservedMessage; } _M_pReservedMessage = NULL; return _Result; } /// /// Releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { _CONCRT_ASSERT(_M_fIsInitialized); _CONCRT_ASSERT(_M_pReservedMessage != NULL); if (_MsgId != _M_pReservedMessage->msg_id()) { throw message_not_found(); } if (_M_pReservedMessage->remove_ref() == 0) { delete _M_pReservedMessage; } _M_pReservedMessage = NULL; } /// /// Resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() { // On reservation we do not stop propagation. So there // is nothing to be done to resume propagation. } /// /// A callback that notifies that a new target has been linked to this overwrite_buffer messaging block. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Type> * _PTarget) { // If there is a message available already, propagate it if (_M_pMessage != NULL) { _PTarget->propagate(_M_pMessage, this); } } /// /// Places the message in this overwrite_buffer messaging block and /// offers it to all of the linked targets. /// /// /// A pointer to a message object that this overwrite_buffer has taken ownership of. /// /// /// This method overwrites the current message in the overwrite_buffer with the newly /// accepted message . /// /**/ virtual void propagate_to_any_targets(_Inout_ message<_Type> * _PMessage) { // Move the message from the queuedMessages Buffer to the internal storage // Add a reference for the overwrite_buffer holding the message _PMessage->add_ref(); if (_M_pMessage != NULL) { if (_M_pMessage->remove_ref() == 0) { delete _M_pMessage; } } _M_pMessage = _PMessage; // Now that message has been received, set this block as initialized _M_fIsInitialized = true; for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { // Overwrite buffers can propagate its message out // to any number of Targets ITarget<_Type> * _PTarget = *_Iter; _PTarget->propagate(_PMessage, this); } if (_PMessage->remove_ref() == 0) { delete _PMessage; } } private: /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Input messages for this message block are in the base-class input buffer // All messages in that buffer are guaranteed to have moved to the output // buffer because the destructor first waits for all async sends to finish // before reaching this point // The messages for an overwrite buffer are deleted when overwritten // through reference counting. This final check is put in place in // case any message still exists in the buffer when the overwrite_buffer // is deleted. The reference count of this message has not yet reached // zero because it hasn't been overwritten yet. It is safe because of // we have finished all propagation. if (_M_pMessage != NULL) { // A block can only have a reserved message after receiving a message // at some point, so it must be within the above if-clause. // Now delete the reserved message if it is non-NULL and different from // the saved internal message if (_M_pReservedMessage != NULL && _M_pReservedMessage != _M_pMessage) { delete _M_pReservedMessage; } delete _M_pMessage; } } // // Private Data Members // // The message being stored message<_Type> * _M_pMessage; // The message being reserved message<_Type> * _M_pReservedMessage; // The marker for whether the overwrite buffer has already been initialized volatile bool _M_fIsInitialized; private: // // Hide assignment operator and copy constructor // overwrite_buffer const &operator =(overwrite_buffer const&); // no assignment operator overwrite_buffer(overwrite_buffer const &); // no copy constructor }; //************************************************************************** // Call: //************************************************************************** /// /// A call messaging block is a multi-source, ordered target_block that /// invokes a specified function when receiving a message. /// /// /// The payload type of the messages propagated to this block. /// /// /// The signature of functions that this block can accept. /// /// /// For more information, see . /// /// /**/ template> class call : public target_block>> { /// /// The function type that this block executes upon receiving a message. /// /**/ typedef _FunctorType _Call_method; public: /// /// Constructs a call messaging block. /// /// /// A function that will be invoked for each accepted message. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature void (_Type const &) /// which is invoked by this call messaging block to process a message. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this call messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ call(_Call_method const& _Func) : _M_pFunc(_Func) { initialize_target(); enable_batched_processing(); } /// /// Constructs a call messaging block. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature void (_Type const &) /// which is invoked by this call messaging block to process a message. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this call messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ call(_Call_method const& _Func, filter_method const& _Filter) : _M_pFunc(_Func) { initialize_target(); enable_batched_processing(); register_filter(_Filter); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a call messaging block. /// /// /// The Scheduler object within which the propagation task for the call messaging block is scheduled. /// /// /// A function that will be invoked for each accepted message. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature void (_Type const &) /// which is invoked by this call messaging block to process a message. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this call messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ call(Scheduler& _PScheduler, _Call_method const& _Func) : _M_pFunc(_Func) { initialize_target(&_PScheduler); enable_batched_processing(); } /// /// Constructs a call messaging block. /// /// /// The Scheduler object within which the propagation task for the call messaging block is scheduled. /// /// /// A function that will be invoked for each accepted message. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature void (_Type const &) /// which is invoked by this call messaging block to process a message. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this call messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ call(Scheduler& _PScheduler, _Call_method const& _Func, filter_method const& _Filter) : _M_pFunc(_Func) { initialize_target(&_PScheduler); enable_batched_processing(); register_filter(_Filter); } /// /// Constructs a call messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the call messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A function that will be invoked for each accepted message. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature void (_Type const &) /// which is invoked by this call messaging block to process a message. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this call messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ call(ScheduleGroup& _PScheduleGroup, _Call_method const& _Func) : _M_pFunc(_Func) { initialize_target(NULL, &_PScheduleGroup); enable_batched_processing(); } /// /// Constructs a call messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the call messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A function that will be invoked for each accepted message. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature void (_Type const &) /// which is invoked by this call messaging block to process a message. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this call messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ call(ScheduleGroup& _PScheduleGroup, _Call_method const& _Func, filter_method const& _Filter) : _M_pFunc(_Func) { initialize_target(NULL, &_PScheduleGroup); enable_batched_processing(); register_filter(_Filter); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the call messaging block. /// /**/ ~call() { remove_sources(); } protected: // // target_block protected function implementations // /// /// Asynchronously passes a message from an ISource block to this call messaging block. It is invoked /// by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by Consume and the LWT. Doing so could // result in a deadlock with the Consume call. message_status _Result = accepted; // // Accept the message being propagated // Note: depending on the source block propagating the message // this may not necessarily be the same message (pMessage) first // passed into the function. // _PMessage = _PSource->accept(_PMessage->msg_id(), this); if (_PMessage != NULL) { async_send(_PMessage); } else { _Result = missed; } return _Result; } /// /// Synchronously passes a message from an ISource block to this call messaging block. It is invoked /// by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status send_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { message_status _Result = accepted; // // Accept the message being propagated // Note: depending on the source block propagating the message // this may not necessarily be the same message (pMessage) first // passed into the function. // _PMessage = _PSource->accept(_PMessage->msg_id(), this); if (_PMessage != NULL) { sync_send(_PMessage); } else { _Result = missed; } return _Result; } /// /// Overrides the supports_anonymous_source method to indicate that this block can /// accept messages offered to it by a source that is not linked. /// /// /// true because the block does not postpone offered messages. /// /**/ virtual bool supports_anonymous_source() { return true; } /// /// Processes a message that was accepted by this call messaging block. /// /// /// A pointer to the message that is to be handled. /// /**/ virtual void process_message(_Inout_ message<_Type> * _PMessage) { // No longer necessary with CRT110 change } /// /// Executes the call function on the input messages. /// /**/ virtual void process_input_messages(_Inout_ message<_Type> * _PMessage) { // Invoke the function provided by the user _CONCRT_ASSERT(_PMessage != NULL); _M_pFunc(_PMessage->payload); delete _PMessage; } private: // // Private Data Members // // The call method called by this block _Call_method _M_pFunc; private: // // Hide assignment operator and copy constructor // call const &operator =(call const&); // no assignment operator call(call const &); // no copy constructor }; //************************************************************************** // Transformer: //************************************************************************** /// /// A transformer messaging block is a single-target, multi-source, ordered /// propagator_block which can accept messages of one type and is /// capable of storing an unbounded number of messages of a different type. /// /// /// The payload type of the messages accepted by the buffer. /// /// /// The payload type of the messages stored and propagated out by the buffer. /// /// /// For more information, see . /// /// /**/ template class transformer : public propagator_block>, multi_link_registry>> { typedef std::tr1::function<_Output(_Input const&)> _Transform_method; public: /// /// Constructs a transformer messaging block. /// /// /// A function that will be invoked for each accepted message. /// /// /// A pointer to a target block to link with the transformer. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature _Output (_Input const &) /// which is invoked by this transformer messaging block to process a message. /// The type is a functor with signature bool (_Input const &) /// which is invoked by this transformer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ transformer(_Transform_method const& _Func, _Inout_opt_ ITarget<_Output> * _PTarget = NULL) : _M_pFunc(_Func) { initialize_source_and_target(); if (_PTarget != NULL) { link_target(_PTarget); } } /// /// Constructs a transformer messaging block. /// /// /// A function that will be invoked for each accepted message. /// /// /// A pointer to a target block to link with the transform. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature _Output (_Input const &) /// which is invoked by this transformer messaging block to process a message. /// The type is a functor with signature bool (_Input const &) /// which is invoked by this transformer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ transformer(_Transform_method const& _Func, _Inout_opt_ ITarget<_Output> * _PTarget, filter_method const& _Filter) : _M_pFunc(_Func) { initialize_source_and_target(); register_filter(_Filter); if (_PTarget != NULL) { link_target(_PTarget); } } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a transformer messaging block. /// /// /// The Scheduler object within which the propagation task for the transformer messaging block is scheduled. /// /// /// A function that will be invoked for each accepted message. /// /// /// A pointer to a target block to link with the transformer. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature _Output (_Input const &) /// which is invoked by this transformer messaging block to process a message. /// The type is a functor with signature bool (_Input const &) /// which is invoked by this transformer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ transformer(Scheduler& _PScheduler, _Transform_method const& _Func, _Inout_opt_ ITarget<_Output> * _PTarget = NULL) : _M_pFunc(_Func) { initialize_source_and_target(&_PScheduler); if (_PTarget != NULL) { link_target(_PTarget); } } /// /// Constructs a transformer messaging block. /// /// /// The Scheduler object within which the propagation task for the transformer messaging block is scheduled. /// /// /// A function that will be invoked for each accepted message. /// /// /// A pointer to a target block to link with the transformer. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature _Output (_Input const &) /// which is invoked by this transformer messaging block to process a message. /// The type is a functor with signature bool (_Input const &) /// which is invoked by this transformer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ transformer(Scheduler& _PScheduler, _Transform_method const& _Func, _Inout_opt_ ITarget<_Output> * _PTarget, filter_method const& _Filter) : _M_pFunc(_Func) { initialize_source_and_target(&_PScheduler); register_filter(_Filter); if (_PTarget != NULL) { link_target(_PTarget); } } /// /// Constructs a transformer messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the transformer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A function that will be invoked for each accepted message. /// /// /// A pointer to a target block to link with the transformer. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature _Output (_Input const &) /// which is invoked by this transformer messaging block to process a message. /// The type is a functor with signature bool (_Input const &) /// which is invoked by this transformer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ transformer(ScheduleGroup& _PScheduleGroup, _Transform_method const& _Func, _Inout_opt_ ITarget<_Output> * _PTarget = NULL) : _M_pFunc(_Func) { initialize_source_and_target(NULL, &_PScheduleGroup); if (_PTarget != NULL) { link_target(_PTarget); } } /// /// Constructs a transformer messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the transformer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A function that will be invoked for each accepted message. /// /// /// A pointer to a target block to link with the transformer. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature _Output (_Input const &) /// which is invoked by this transformer messaging block to process a message. /// The type is a functor with signature bool (_Input const &) /// which is invoked by this transformer messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ transformer(ScheduleGroup& _PScheduleGroup, _Transform_method const& _Func, _Inout_opt_ ITarget<_Output> * _PTarget, filter_method const& _Filter) : _M_pFunc(_Func) { initialize_source_and_target(NULL, &_PScheduleGroup); register_filter(_Filter); if (_PTarget != NULL) { link_target(_PTarget); } } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the transformer messaging block. /// /**/ ~transformer() { // Remove all links remove_network_links(); // Clean up any messages left in this message block _Delete_stored_messages(); } protected: // Propagator_block protected function implementations /// /// Asynchronously passes a message from an ISource block to this transformer messaging block. /// It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Input> * _PMessage, _Inout_ ISource<_Input> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by Consume and the LWT. Doing so could // result in a deadlock with the Consume call. message_status _Result = accepted; // // Accept the message being propagated // Note: depending on the source block propagating the message // this may not necessarily be the same message (pMessage) first // passed into the function. // _PMessage = _PSource->accept(_PMessage->msg_id(), this); if (_PMessage != NULL) { // Enqueue the input message _M_inputMessages.push(_PMessage); async_send(NULL); } else { _Result = missed; } return _Result; } /// /// Synchronously passes a message from an ISource block to this transformer messaging block. /// It is invoked by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status send_message(_Inout_ message<_Input> * _PMessage, _Inout_ ISource<_Input> * _PSource) { _PMessage = _PSource->accept(_PMessage->msg_id(), this); if (_PMessage != NULL) { // Enqueue the input message _M_inputMessages.push(_PMessage); sync_send(NULL); } else { return missed; } return accepted; } /// /// Overrides the supports_anonymous_source method to indicate that this block can /// accept messages offered to it by a source that is not linked. /// /// /// true because the block does not postpone offered messages. /// /**/ virtual bool supports_anonymous_source() { return true; } /// /// Accepts a message that was offered by this transformer messaging block, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the message object that the caller now has ownership of. /// /**/ virtual message<_Output> * accept_message(runtime_object_identity _MsgId) { // // Peek at the head message in the message buffer. If the IDs match // dequeue and transfer ownership // message<_Output> * _Msg = NULL; if (_M_messageBuffer._Is_head(_MsgId)) { _Msg = _M_messageBuffer._Dequeue(); } return _Msg; } /// /// Reserves a message previously offered by this transformer messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { // Allow reservation if this is the head message return _M_messageBuffer._Is_head(_MsgId); } /// /// Consumes a message previously offered by the transformer and reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Output> * consume_message(runtime_object_identity _MsgId) { // By default, accept the message return accept_message(_MsgId); } /// /// Releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { // The head message is the one reserved. if (!_M_messageBuffer._Is_head(_MsgId)) { throw message_not_found(); } } /// /// Resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() { // If there are any messages in the buffer, propagate them out if (_M_messageBuffer._Count() > 0) { // async send a NULL value to initiate the repropagation async_send(NULL); } } /// /// A callback that notifies that a new target has been linked to this transformer messaging block. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Output> *) { // If the message queue is blocked due to reservation // there is no need to do any message propagation if (_M_pReservedFor != NULL) { return; } _Propagate_priority_order(_M_messageBuffer); } /// /// Executes the transformer function on the input messages. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message<_Output> *) { message<_Output> * _Msg = NULL; // Process input message. message<_Input> * _PInputMessage = NULL; _M_inputMessages.try_pop(_PInputMessage); if (_PInputMessage != NULL) { // Invoke the TransformMethod on the data // Let exceptions flow _Output _Out = _M_pFunc(_PInputMessage->payload); // Reuse the input message ID _Msg = new message<_Output>(_Out, _PInputMessage->msg_id()); _M_messageBuffer._Enqueue(_Msg); // Message cleanup delete _PInputMessage; if (!_M_messageBuffer._Is_head(_Msg->msg_id())) { return; } } _Propagate_priority_order(_M_messageBuffer); } private: /// /// Propagates messages in priority order. /// /// /// Reference to a message queue with messages to be propagated /// /**/ void _Propagate_priority_order(::Concurrency::details::_Queue> & _MessageBuffer) { message<_Target_type> * _Msg = _MessageBuffer._Peek(); // If someone has reserved the _Head message, don't propagate anymore if (_M_pReservedFor != NULL) { return; } while (_Msg != NULL) { message_status _Status = declined; // Always start from the first target that linked for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Target_type> * _PTarget = *_Iter; _Status = _PTarget->propagate(_Msg, this); // Ownership of message changed. Do not propagate this // message to any other target. if (_Status == accepted) { break; } // If the target just propagated to reserved this message, stop // propagating it to others if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. Thus, nothing after it in the queue can // be propagated out. Cease propagation. if (_Status != accepted) { break; } // Get the next message _Msg = _MessageBuffer._Peek(); } } /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Delete input messages // Because the transformer uses its own input queue, it's possible there are messages // in this queue and no LWT will be executed to handle them. message<_Input> * _PInputQueueMessage = NULL; while (_M_inputMessages.try_pop(_PInputQueueMessage)) { // Message cleanup delete _PInputQueueMessage; } // Delete any messages remaining in the output queue for (;;) { message<_Output> * _Msg = _M_messageBuffer._Dequeue(); if (_Msg == NULL) { break; } delete _Msg; } } // // Private Data Members // // The transformer method called by this block _Transform_method _M_pFunc; // The queue of input messages for this Transformer block concurrent_queue *> _M_inputMessages; /// /// Message queue used to store outbound messages /// /**/ ::Concurrency::details::_Queue> _M_messageBuffer; private: // // Hide assignment operator and copy constructor // transformer const &operator =(transformer const &); // no assignment operator transformer(transformer const &); // no copy constructor }; //************************************************************************** // Timer: //************************************************************************** /// /// A timer messaging block is a single-target source_block capable of sending /// a message to its target after a specified time period has elapsed /// or at specific intervals. /// /// /// The payload type of the output messages of this block. /// /// /// For more information, see . /// /**/ template class timer : public Concurrency::details::_Timer, public source_block>> { private: /// /// Tracks the state machine of the timer. /// /**/ enum State { /// /// The timer has been initialized, but not yet started. /// /**/ Initialized, /// /// The timer has been started. /// /**/ Started, /// /// The timer has started and been paused. /// /**/ Paused, /// /// The timer has been stopped. /// /**/ Stopped }; public: /// /// Constructs a timer messaging block that will fire a given message after a specified interval. /// /// /// The number of milliseconds that must elapse after the call to start for the specified message /// to be propagated downstream. /// /// /// The value which will be propagated downstream when the timer elapses. /// /// /// The target to which the timer will propagate its message. /// /// /// If true, indicates that the timer will fire periodically every milliseconds. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// /**/ timer(unsigned int _Ms, _Type const& _Value, ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) : _Timer(_Ms, _Repeating) { _Initialize(_Value, _PTarget, _Repeating); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a timer messaging block that will fire a given message after a specified interval. /// /// /// The Scheduler object within which the propagation task for the timer messaging block is scheduled is scheduled. /// /// /// The number of milliseconds that must elapse after the call to start for the specified message /// to be propagated downstream. /// /// /// The value which will be propagated downstream when the timer elapses. /// /// /// The target to which the timer will propagate its message. /// /// /// If true, indicates that the timer will fire periodically every milliseconds. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// /**/ timer(Scheduler& _Scheduler, unsigned int _Ms, _Type const& _Value, _Inout_opt_ ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) : _Timer(_Ms, _Repeating) { _Initialize(_Value, _PTarget, _Repeating, &_Scheduler); } /// /// Constructs a timer messaging block that will fire a given message after a specified interval. /// /// /// The ScheduleGroup object within which the propagation task for the timer messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The number of milliseconds that must elapse after the call to start for the specified message /// to be propagated downstream. /// /// /// The value which will be propagated downstream when the timer elapses. /// /// /// The target to which the timer will propagate its message. /// /// /// If true, indicates that the timer will fire periodically every milliseconds. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// /**/ timer(ScheduleGroup& _ScheduleGroup, unsigned int _Ms, _Type const& _Value, _Inout_opt_ ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) : _Timer(_Ms, _Repeating) { _Initialize(_Value, _PTarget, _Repeating, NULL, &_ScheduleGroup); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys a timer messaging block. /// /**/ ~timer() { // // Make sure there are no more outstanding timer fires. Note that this does not mean that the LWT that was queued is finished, it only // means that no more timers will fire after the return from _Stop. We still *MUST* wait on any outstanding LWTs. // if (_M_state == Started) _Stop(); // Remove all the targets. This will wait for any outstanding LWTs remove_targets(); // // No more asynchronous operations can happen as of this point. // // Clean up any messages left in this message block _Delete_stored_messages(); if (_M_fReferencedScheduler) { ::Concurrency::details::_Scheduler(_M_pScheduler)._Release(); } } /// /// Starts the timer messaging block. The specified number of milliseconds after this is called, the specified value will be propagated /// downstream as a message. /// /**/ void start() { if (_M_state == Initialized || _M_state == Paused) { _M_state = Started; _Start(); } } /// /// Stops the timer messaging block. /// /**/ void stop() { if (_M_state == Started) _Stop(); _M_state = Stopped; } /// /// Stops the timer messaging block. If it is a repeating timer messaging block, it can be restarted with a subsequent /// start() call. For non-repeating timers, this has the same effect as a stop call. /// /**/ void pause() { // // Non repeating timers cannot pause. They go to a final stopped state on pause. // if (!_M_fRepeating) { stop(); } else { // Pause only a started timer. if (_M_state == Started) { _Stop(); _M_state = Paused; } } } protected: /// /// Accepts a message that was offered by this timer messaging block, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the message object that the caller now has ownership of. /// /**/ virtual message<_Type> * accept_message(runtime_object_identity _MsgId) { if (_M_pMessage == NULL || _MsgId != _M_pMessage->msg_id()) { return NULL; } message<_Type> *_PMessage = _M_pMessage; _M_pMessage = NULL; return _PMessage; } /// /// Reserves a message previously offered by this timer messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { // // Semantically, every timer tick is the same value -- it doesn't matter the message ID. Because we can only // have one target as well, we do not need to track anything here. // if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return false; } return true; } /// /// Consumes a message previously offered by the timer and reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Type> * consume_message(runtime_object_identity _MsgId) { return accept_message(_MsgId); } /// /// Releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { throw message_not_found(); } delete _M_pMessage; _M_pMessage = NULL; } /// /// Resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() { // Because reservation doesn't prevent propagation there is // no need to resume on consume/release. } /// /// A callback that notifies that a new target has been linked to this timer messaging block. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Type> * _PTarget) { // If there is a timer message sitting around, it must be propagated to the target now. if (_M_pMessage != NULL) { _PTarget->propagate(_M_pMessage, this); } } /// /// Tries to offer the message produced by the timer block to all of the linked targets. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message<_Type> *) { if (_M_pMessage == NULL) { _M_pMessage = _NewMessage(); for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Type> * _PTarget = *_Iter; _PTarget->propagate(_M_pMessage, this); } } } private: // The timer message we contain message<_Type> *_M_pMessage; // Current state of the timer. State _M_state; // The value to send on elapse of the timer. _Type _M_value; // An indication of whether the timer is repeating. bool _M_fRepeating; // A flag for whether we need to release a reference on the scheduler. bool _M_fReferencedScheduler; // Scheduler used for the timer Scheduler * _M_pScheduler; /// /// Allocates a new message. /// /**/ message<_Type>* _NewMessage() const { return new message<_Type>(_M_value); } /// /// Called when the timer fires. /// /**/ virtual void _Fire() { async_send(NULL); } /// /// Common initialization. /// /// /// The value which will be propagated downstream when the timer elapses. /// /// /// The target to which the timer will propagate its message. /// /// /// If true, indicates that the timer will fire periodically every _Ms milliseconds. /// /**/ void _Initialize(const _Type& _Value, _Inout_ ITarget<_Type> *_PTarget, bool _Repeating, _Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL) { _M_pMessage = NULL; _M_value = _Value; _M_fRepeating = _Repeating; _M_state = Initialized; _M_fReferencedScheduler = false; // // If we are going to utilize the current scheduler for timer firing, we need to capture it now. Otherwise, // the timer threads fired from Windows (what _Fire executes within) will wind up with a default scheduler // attached -- probably not the semantic we want. // if (_PScheduleGroup == NULL && _PScheduler == NULL) { ::Concurrency::details::_Scheduler _sched = ::Concurrency::details::_CurrentScheduler::_Get(); _PScheduler = _sched._GetScheduler(); _sched._Reference(); _M_fReferencedScheduler = true; } _M_pScheduler = _PScheduler; initialize_source(_PScheduler, _PScheduleGroup); if (_PTarget != NULL) { link_target(_PTarget); } } /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Input messages for this message block are in the base-class input buffer // All messages in that buffer are guaranteed to have moved to the output // buffer because the destructor first waits for all async sends to finish // before reaching this point // Delete the message remaining in the output queue if (_M_pMessage != NULL) { delete _M_pMessage; } } private: // // Hide assignment operator and copy constructor // timer const &operator =(timer const &); // no assignment operator timer(timer const &); // no copy constructor }; //************************************************************************** // Single assignment: //************************************************************************** /// /// A single_assignment messaging block is a multi-target, multi-source, ordered /// propagator_block capable of storing a single, write-once /// message. /// /// /// The payload type of the message stored and propagated by the buffer. /// /// /// A single_assignment messaging block propagates out copies of its message to each target. /// For more information, see . /// /// /// /**/ template class single_assignment : public propagator_block>, multi_link_registry>> { public: /// /// Constructs a single_assignment messaging block. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this single_assignment messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ single_assignment() : _M_fIsInitialized(false), _M_pMessage(NULL) { initialize_source_and_target(); } /// /// Constructs a single_assignment messaging block. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this single_assignment messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ single_assignment(filter_method const& _Filter) : _M_fIsInitialized(false), _M_pMessage(NULL) { initialize_source_and_target(); register_filter(_Filter); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a single_assignment messaging block. /// /// /// The Scheduler object within which the propagation task for the single_assignment messaging block is scheduled. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this single_assignment messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ single_assignment(Scheduler& _PScheduler) : _M_fIsInitialized(false), _M_pMessage(NULL) { initialize_source_and_target(&_PScheduler); } /// /// Constructs a single_assignment messaging block. /// /// /// The Scheduler object within which the propagation task for the single_assignment messaging block is scheduled. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this single_assignment messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ single_assignment(Scheduler& _PScheduler, filter_method const& _Filter) : _M_fIsInitialized(false), _M_pMessage(NULL) { initialize_source_and_target(&_PScheduler); register_filter(_Filter); } /// /// Constructs a single_assignment messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the single_assignment messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this single_assignment messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ single_assignment(ScheduleGroup& _PScheduleGroup) : _M_fIsInitialized(false), _M_pMessage(NULL) { initialize_source_and_target(NULL, &_PScheduleGroup); } /// /// Constructs a single_assignment messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the single_assignment messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this single_assignment messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ single_assignment(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) : _M_fIsInitialized(false), _M_pMessage(NULL) { initialize_source_and_target(NULL, &_PScheduleGroup); register_filter(_Filter); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the single_assignment messaging block. /// /**/ ~single_assignment() { // Remove all links remove_network_links(); // Clean up any messages left in this message block _Delete_stored_messages(); } /// /// Checks whether this single_assignment messaging block has been initialized with a value yet. /// /// /// true if the block has received a value, false otherwise. /// /**/ bool has_value() const { return (_M_pMessage != NULL); } /// /// Gets a reference to the current payload of the message being stored in the single_assignment messaging block. /// /// /// The payload of the stored message. /// /// /// This method will wait until a message arrives if no message is currently stored in the single_assignment messaging block. /// /**/ _Type const & value() { if (_M_pMessage == NULL) { receive<_Type>(this); } _CONCRT_ASSERT(_M_pMessage != NULL); return _M_pMessage->payload; } protected: /// /// Asynchronously passes a message from an ISource block to this single_assignment messaging block. /// It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by Consume and the LWT. Doing so could // result in a deadlock with the Consume call. message_status _Result = accepted; // single_assignment messaging block can be initialized only once if (_M_fIsInitialized) { return declined; } { _NR_lock _Lock(_M_propagationLock); if (_M_fIsInitialized) { _Result = declined; } else { _PMessage = _PSource->accept(_PMessage->msg_id(), this); // Set initialized flag only if we have a message if (_PMessage != NULL) { _M_fIsInitialized = true; } else { _Result = missed; } } } // // If message was accepted, set the member variables for // this block and start the asynchronous propagation task // if (_Result == accepted) { async_send(_PMessage); } return _Result; } /// /// Synchronously passes a message from an ISource block to this single_assignment messaging block. /// It is invoked by the send method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status send_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { message_status _Result = accepted; // single_assignment messaging block can be initialized only once if (_M_fIsInitialized) { return declined; } { _NR_lock _Lock(_M_propagationLock); if (_M_fIsInitialized) { _Result = declined; } else { _PMessage = _PSource->accept(_PMessage->msg_id(), this); // Set initialized flag only if we have a message if (_PMessage != NULL) { _M_fIsInitialized = true; } else { _Result = missed; } } } // // If message was accepted, set the member variables for // this block and start the asynchronous propagation task // if (_Result == accepted) { sync_send(_PMessage); } return _Result; } /// /// Accepts a message that was offered by this single_assignment messaging block, /// returning a copy of the message to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// The single_assignment messaging block returns copies of the message /// to its targets, rather than transferring ownership of the currently /// held message. /// /**/ virtual message<_Type> * accept_message(runtime_object_identity _MsgId) { // This check is to prevent spoofing and verify that the propagated message is // the one that is accepted at the end. if (_M_pMessage == NULL || _MsgId != _M_pMessage->msg_id()) { return NULL; } // // Instead of returning the internal message, we return a copy of the // message passed in. // // Because we are returning a copy, the accept routine for a single_assignment // does not need to grab the internal lock. // return (new message<_Type>(_M_pMessage->payload)); } /// /// Reserves a message previously offered by this single_assignment messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { if (_M_pMessage == NULL) { return false; } if (_M_pMessage->msg_id() != _MsgId) { throw message_not_found(); } return true; } /// /// Consumes a message previously offered by the single_assignment and reserved by the target, /// returning a copy of the message to the caller. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Type> * consume_message(runtime_object_identity _MsgId) { _CONCRT_ASSERT(_M_fIsInitialized); return accept_message(_MsgId); } /// /// Releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { _CONCRT_ASSERT(_M_fIsInitialized); if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { throw message_not_found(); } } /// /// Resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() { // Because reservation doesn't stop propagation, we don't // need to do anything on resume after consume/release. } /// /// A callback that notifies that a new target has been linked to this single_assignment messaging block. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Type> * _PTarget) { // If there is a message available already, propagate it. if (_M_pMessage != NULL) { _PTarget->propagate(_M_pMessage, this); } } /// /// Places the message in this single_assignment messaging block and /// offers it to all of the linked targets. /// /// /// A pointer to a message that this single_assignment messaging block has taken ownership of. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message<_Type> * _PMessage) { // Initialized flag should have been set by the propagate function using interlocked operation. _CONCRT_ASSERT(_M_fIsInitialized); // Move the message to the internal storage _CONCRT_ASSERT(_M_pMessage == NULL); _M_pMessage = _PMessage; for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { // Single assignment can propagate its message out // to any number of Targets ITarget<_Type> * _PTarget = *_Iter; _PTarget->propagate(_PMessage, this); } } private: /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Input messages for this message block are in the base-class input buffer // All messages in that buffer are guaranteed to have moved to the output // buffer because the destructor first waits for all async sends to finish // before reaching this point // The messages for a single_assignment are deleted at the end when // single_assignment is deleted. delete _M_pMessage; } // // Private Data Members // // The message being stored message<_Type> * _M_pMessage; // The lock used to protect propagation ::Concurrency::details::_NonReentrantPPLLock _M_propagationLock; // The marker for whether the single_assignment has already been initialized volatile bool _M_fIsInitialized; private: // // Hide assignment operator and copy constructor // single_assignment const & operator=(single_assignment const &); // no assignment operator single_assignment(single_assignment const &); // no copy constructor }; //************************************************************************** // Join (single-type) //************************************************************************** /// /// The type of a join messaging block. /// /**/ enum join_type { /// /// Greedy join messaging blocks immediately accept a message upon propagation. This is more efficient, /// but has the possibility for live-lock, depending on the network configuration. /// /**/ greedy = 0, /// /// Non-greedy join messaging blocks postpone messages and try and consume them after all have arrived. /// These are guaranteed to work, but slower. /// /**/ non_greedy = 1 }; /// /// A join messaging block is a single-target, multi-source, ordered /// propagator_block which combines together messages of type from each /// of its sources. /// /// /// The payload type of the messages joined and propagated by the block. /// /// /// The kind of join block this is, either greedy or non_greedy /// /// /// For more information, see . /// /// /// /// /**/ template class join : public propagator_block>>, multi_link_registry>> { public: typedef typename std::vector<_Type> _OutputType; /// /// Constructs a join messaging block. /// /// /// The number of inputs this join block will be allowed. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this join messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ join(size_t _NumInputs) : _M_messageArray(_NumInputs), _M_savedMessageIdArray(_NumInputs) { _Initialize(_NumInputs); } /// /// Constructs a join messaging block. /// /// /// The number of inputs this join block will be allowed. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this join messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ join(size_t _NumInputs, filter_method const& _Filter) : _M_messageArray(_NumInputs), _M_savedMessageIdArray(_NumInputs) { _Initialize(_NumInputs); register_filter(_Filter); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a join messaging block. /// /// /// The Scheduler object within which the propagation task for the join messaging block is scheduled. /// /// /// The number of inputs this join block will be allowed. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this join messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ join(Scheduler& _PScheduler, size_t _NumInputs) : _M_messageArray(_NumInputs), _M_savedMessageIdArray(_NumInputs) { _Initialize(_NumInputs, &_PScheduler); } /// /// Constructs a join messaging block. /// /// /// The Scheduler object within which the propagation task for the join messaging block is scheduled. /// /// /// The number of inputs this join block will be allowed. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this join messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ join(Scheduler& _PScheduler, size_t _NumInputs, filter_method const& _Filter) : _M_messageArray(_NumInputs), _M_savedMessageIdArray(_NumInputs) { _Initialize(_NumInputs, &_PScheduler); register_filter(_Filter); } /// /// Constructs a join messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the join messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The number of inputs this join block will be allowed. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this join messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ join(ScheduleGroup& _PScheduleGroup, size_t _NumInputs) : _M_messageArray(_NumInputs), _M_savedMessageIdArray(_NumInputs) { _Initialize(_NumInputs, NULL, &_PScheduleGroup); } /// /// Constructs a join messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the join messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The number of inputs this join block will be allowed. /// /// /// A filter function which determines whether offered messages should be accepted. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// The type is a functor with signature bool (_Type const &) /// which is invoked by this join messaging block to determine whether or not it should accept /// an offered message. /// /// /// /**/ join(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, filter_method const& _Filter) : _M_messageArray(_NumInputs), _M_savedMessageIdArray(_NumInputs) { _Initialize(_NumInputs, NULL, &_PScheduleGroup); register_filter(_Filter); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the join block. /// /**/ ~join() { // Remove all links that are targets of this join remove_network_links(); // Clean up any messages left in this message block _Delete_stored_messages(); delete [] _M_savedIdBuffer; } protected: // // propagator_block protected function implementations // /// /// Asynchronously passes a message from an ISource block to this join messaging block. /// It is invoked by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ message_status propagate_message(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) { // It is important that calls to propagate do *not* take the same lock on the // internal structure that is used by Consume and the LWT. Doing so could // result in a deadlock with the Consume call. message_status _Ret_val = accepted; // // Find the slot index of this source // size_t _Slot = 0; bool _Found = false; for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter) { if (*_Iter == _PSource) { _Found = true; break; } _Slot++; } if (!_Found) { // If this source was not found in the array, this is not a connected source // decline the message return declined; } _CONCRT_ASSERT(_Slot < _M_messageArray._M_count); bool fIsGreedy = (_Jtype == greedy); if (fIsGreedy) { // // Greedy type joins immediately accept the message. // { _NR_lock lockHolder(_M_propagationLock); if (_M_messageArray._M_messages[_Slot] != NULL) { _M_savedMessageIdArray._M_savedIds[_Slot] = _PMessage->msg_id(); _Ret_val = postponed; } } if (_Ret_val != postponed) { _M_messageArray._M_messages[_Slot] = _PSource->accept(_PMessage->msg_id(), this); if (_M_messageArray._M_messages[_Slot] != NULL) { if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0) { // If messages have arrived on all links, start a propagation // of the current message async_send(NULL); } } else { _Ret_val = missed; } } } else { // // Non-greedy type joins save the message IDs until they have all arrived // if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[_Slot], _PMessage->msg_id()) == -1) { // Decrement the message remaining count if this thread is switching // the saved ID from -1 to a valid value. if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0) { async_send(NULL); } } // Always return postponed. This message will be consumed // in the LWT _Ret_val = postponed; } return _Ret_val; } /// /// Accepts a message that was offered by this join messaging block, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the message object that the caller now has ownership of. /// /**/ virtual message<_OutputType> * accept_message(runtime_object_identity _MsgId) { // // Peek at the head message in the message buffer. If the IDs match // dequeue and transfer ownership // message<_OutputType> * _Msg = NULL; if (_M_messageBuffer._Is_head(_MsgId)) { _Msg = _M_messageBuffer._Dequeue(); } return _Msg; } /// /// Reserves a message previously offered by this join messaging block. /// /// /// The runtime_object_identity of the offered message object. /// /// /// true if the message was successfully reserved, false otherwise. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { // Allow reservation if this is the head message return _M_messageBuffer._Is_head(_MsgId); } /// /// Consumes a message previously offered by the join messaging block and reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the message object being consumed. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// Similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_OutputType> * consume_message(runtime_object_identity _MsgId) { // By default, accept the message return accept_message(_MsgId); } /// /// Releases a previous message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { // The head message is the one reserved. if (!_M_messageBuffer._Is_head(_MsgId)) { throw message_not_found(); } } /// /// Resumes propagation after a reservation has been released. /// /**/ virtual void resume_propagation() { // If there are any messages in the buffer, propagate them out if (_M_messageBuffer._Count() > 0) { async_send(NULL); } } /// /// A callback that notifies that a new target has been linked to this join messaging block. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget> *) { // If the message queue is blocked due to reservation // there is no need to do any message propagation if (_M_pReservedFor != NULL) { return; } _Propagate_priority_order(_M_messageBuffer); } /// /// Constructs an output message containing an input message from each source when /// they have all propagated a message. Sends this output message out to each of /// its targets. /// /**/ void propagate_to_any_targets(_Inout_opt_ message<_OutputType> *) { message<_OutputType> * _Msg = NULL; // Create a new message from the input sources // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from // a consume or release from the target. In that case we don't want to create a new message. if (_M_messagesRemaining == 0) { // A greedy join can immediately create the message, a non-greedy // join must try and consume all the messages it has postponed _Msg = _Create_new_message(); } if (_Msg == NULL) { // Create message failed. This happens in non_greedy joins when the // reserve/consumption of a postponed message failed. _Propagate_priority_order(_M_messageBuffer); return; } bool fIsGreedy = (_Jtype == greedy); // For a greedy join, reset the number of messages remaining // Check to see if multiple messages have been passed in on any of the links, // and postponed. If so, try and reserve/consume them now if (fIsGreedy) { // Look at the saved IDs and reserve/consume any that have passed in while // this join was waiting to complete _CONCRT_ASSERT(_M_messageArray._M_count == _M_savedMessageIdArray._M_count); for (size_t i = 0; i < _M_messageArray._M_count; i++) { for(;;) { runtime_object_identity _Saved_id; // Grab the current saved ID value. This value could be changing from based on any // calls of source->propagate(this). If the message ID is different than what is snapped // here, that means, the reserve below must fail. This is because reserve is trying // to get the same source lock the propagate(this) call must be holding. { _NR_lock lockHolder(_M_propagationLock); _CONCRT_ASSERT(_M_messageArray._M_messages[i] != NULL); _Saved_id = _M_savedMessageIdArray._M_savedIds[i]; if (_Saved_id == -1) { _M_messageArray._M_messages[i] = NULL; break; } else { _M_savedMessageIdArray._M_savedIds[i] = -1; } } if (_Saved_id != -1) { source_iterator _Iter = _M_connectedSources.begin(); ISource<_Type> * _PSource = _Iter[i]; if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this)) { _M_messageArray._M_messages[i] = _PSource->consume(_Saved_id, this); _InterlockedDecrementSizeT(&_M_messagesRemaining); break; } } } } // If messages have all been received, async_send again, this will start the // LWT up to create a new message if (_M_messagesRemaining == 0) { async_send(NULL); } } // Add the new message to the outbound queue _M_messageBuffer._Enqueue(_Msg); if (!_M_messageBuffer._Is_head(_Msg->msg_id())) { // another message is at the head of the outbound message queue and blocked // simply return return; } _Propagate_priority_order(_M_messageBuffer); } private: // // Private Methods // /// /// Propagate messages in priority order. /// /// /// Reference to a message queue with messages to be propagated /// /**/ void _Propagate_priority_order(::Concurrency::details::_Queue> & _MessageBuffer) { message<_Target_type> * _Msg = _MessageBuffer._Peek(); // If someone has reserved the _Head message, don't propagate anymore if (_M_pReservedFor != NULL) { return; } while (_Msg != NULL) { message_status _Status = declined; // Always start from the first target that linked for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Target_type> * _PTarget = *_Iter; _Status = _PTarget->propagate(_Msg, this); // Ownership of message changed. Do not propagate this // message to any other target. if (_Status == accepted) { break; } // If the target just propagated to reserved this message, stop // propagating it to others if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. Thus, nothing after it in the queue can // be propagated out. Cease propagation. if (_Status != accepted) { break; } // Get the next message _Msg = _MessageBuffer._Peek(); } } /// /// Constructs a new message from the data output. /// /// /// The created message (NULL if creation failed) /// /**/ message> * __cdecl _Create_new_message() { bool fIsNonGreedy = (_Jtype == non_greedy); // If this is a non-greedy join, check each source and try to consume their message if (fIsNonGreedy) { // The iterator _Iter below will ensure that it is safe to touch // non-NULL source pointers. Take a snapshot. std::vector *> _Sources; source_iterator _Iter = _M_connectedSources.begin(); while (*_Iter != NULL) { ISource<_Type> * _PSource = *_Iter; if (_PSource == NULL) { break; } _Sources.push_back(_PSource); ++_Iter; } if (_Sources.size() != _M_messageArray._M_count) { // Some of the sources were unlinked. The join is broken return NULL; } // First, try and reserve all the messages. If a reservation fails, // then release any reservations that had been made. for (size_t i = 0; i < _M_savedMessageIdArray._M_count; i++) { // Snap the current saved ID into a buffer. This value can be changing behind the scenes from // other source->propagate(msg, this) calls, but if so, that just means the reserve below will // fail. _InterlockedIncrementSizeT(&_M_messagesRemaining); _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[i], -1); _CONCRT_ASSERT(_M_savedIdBuffer[i] != -1); if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this)) { // A reservation failed, release all reservations made up until // this block, and wait for another message to arrive on this link for (size_t j = 0; j < i; j++) { _Sources[j]->release(_M_savedIdBuffer[j], this); if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[j], _M_savedIdBuffer[j], -1) == -1) { if (_InterlockedDecrementSizeT(&_M_messagesRemaining) == 0) { async_send(NULL); } } } // Return NULL to indicate that the create failed return NULL; } } // Because everything has been reserved, consume all the messages. // This is guaranteed to return true. for (size_t i = 0; i < _M_messageArray._M_count; i++) { _M_messageArray._M_messages[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this); _M_savedIdBuffer[i] = -1; } } if (!fIsNonGreedy) { // Reinitialize how many messages are being waited for. // This is safe because all messages have been received, thus no new async_sends for // greedy joins can be called. _M_messagesRemaining = _M_messageArray._M_count; } std::vector<_Type> _OutputVector; for (size_t i = 0; i < _M_messageArray._M_count; i++) { _CONCRT_ASSERT(_M_messageArray._M_messages[i] != NULL); _OutputVector.push_back(_M_messageArray._M_messages[i]->payload); delete _M_messageArray._M_messages[i]; if (fIsNonGreedy) { _M_messageArray._M_messages[i] = NULL; } } return (new message>(_OutputVector)); } /// /// Initializes the join messaging block. /// /// /// The number of inputs. /// /// /// The scheduler onto which the task to propagate the join block's message will be scheduled. /// If unspecified, the join messaging block uses the default scheduler. /// /// /// The schedule group into which the task to propagate the join block's message will be scheduled. /// The scheduler used is implied by the schedule group. If unspecified, the join uses a schedule /// group of the scheduler's choosing. /// /**/ void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL) { initialize_source_and_target(_PScheduler, _PScheduleGroup); _M_connectedSources.set_bound(_NumInputs); _M_messagesRemaining = _NumInputs; bool fIsNonGreedy = (_Jtype == non_greedy); if (fIsNonGreedy) { // Non greedy joins need a buffer to snap off saved message IDs to. _M_savedIdBuffer = new runtime_object_identity[_NumInputs]; memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs); } else { _M_savedIdBuffer = NULL; } } /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Input messages for this message block are in the base-class input buffer // All messages in that buffer are guaranteed to have moved to the output // buffer because the destructor first waits for all async sends to finish // before reaching this point // Delete any messages remaining in the output queue for (;;) { message> * _Msg = _M_messageBuffer._Dequeue(); if (_Msg == NULL) { break; } delete _Msg; } } // The current number of messages remaining volatile size_t _M_messagesRemaining; // An array containing the accepted messages of this join. // Wrapped in a struct to enable debugger visualization. struct _MessageArray { size_t _M_count; message<_Type>** _M_messages; _MessageArray(size_t _NumInputs) : _M_count(_NumInputs), _M_messages(new message<_Type>*[_NumInputs]) { memset(_M_messages, 0, sizeof(message<_Type> *) * _NumInputs); } ~_MessageArray() { for (size_t i = 0; i < _M_count; i++) delete _M_messages[i]; delete [] _M_messages; } }; _MessageArray _M_messageArray; // An array containing the msg IDs of messages propagated to the array // For greedy joins, this contains a log of other messages passed to this // join after the first has been accepted // For non-greedy joins, this contains the message ID of any message // passed to it. // Wrapped in a struct to enable debugger visualization. struct _SavedMessageIdArray { size_t _M_count; runtime_object_identity * _M_savedIds; _SavedMessageIdArray(size_t _NumInputs) : _M_count(_NumInputs), _M_savedIds(new runtime_object_identity[_NumInputs]) { memset(_M_savedIds, -1, sizeof(runtime_object_identity) * _NumInputs); } ~_SavedMessageIdArray() { delete [] _M_savedIds; } }; _SavedMessageIdArray _M_savedMessageIdArray; // Buffer for snapping saved IDs in non-greedy joins runtime_object_identity * _M_savedIdBuffer; // A lock for modifying the buffer or the connected blocks ::Concurrency::details::_NonReentrantPPLLock _M_propagationLock; // Queue to hold output messages ::Concurrency::details::_Queue>> _M_messageBuffer; }; //************************************************************************** // Multi-Type Choice and Join helper node: //************************************************************************** /// /// Base class for Helper node used in multi-type join and choice blocks /// Order node is a single-target, single-source ordered propagator block /// The main property of an order node is that it accepts a message of _Type /// and outputs a message of int, with some unique assigned index number. /// /// /// The payload type /// /**/ template class _Order_node_base: public propagator_block>, multi_link_registry>> { public: /// /// Constructs a _Order_node_base within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /**/ _Order_node_base() : _M_index(0), _M_pReceiveMessage(NULL), _M_pSendMessage(NULL) { } /// /// Cleans up any resources that may have been created by the _Order_node. /// /**/ ~_Order_node_base() { // The messages for an _Order_node_base are deleted at the end when // _Order_node_base is deleted. delete _M_pReceiveMessage; delete _M_pSendMessage; } /// /// Checks whether this block has been initialized yet. /// /// /// true, if the block has received a value, false otherwise. /// /**/ bool has_value() const { return (_M_pReceiveMessage != NULL); } /// /// Gets a reference to the current payload of the message being stored. /// /// /// The incoming payload. /// /**/ _Type const & value() { _CONCRT_ASSERT(_M_pReceiveMessage != NULL); return _M_pReceiveMessage->payload; } /// /// Resets the _Order_node_base and prepares it for the next propagation /// /// /// _Reset is called from Populate_destination_tuple through propagate_to_any_targets() /// thus, it always has the internal lock held. This is only used for _Greedy_node and /// _Non_greedy_node. /// /**/ virtual void _Reset() = 0; /// /// Reserves a message previously offered by the source. /// /// /// The runtime object identity of the message. /// /// /// A bool indicating whether the reservation worked or not /// /// /// After 'reserve' is called, either 'consume' or 'release' must be called. /// /**/ virtual bool reserve_message(runtime_object_identity) { // reserve should never be called for this block. _CONCRT_ASSERT(false); return false; } /// /// Consumes a message previously offered by the source and reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime object identity of the message. /// /// /// A pointer to the message that the caller now has ownership of. /// /// /// Similar to 'accept', but is always preceded by a call to 'reserve' /// /**/ virtual message * consume_message(runtime_object_identity) { // consume should never be called for this block. _CONCRT_ASSERT(false); return NULL; } /// /// Releases a previous message reservation. /// /// /// The runtime object identity of the message. /// /**/ virtual void release_message(runtime_object_identity) { // release should never be called for this block. _CONCRT_ASSERT(false); } protected: /// /// Resumes propagation after a reservation has been released /// /**/ virtual void resume_propagation() { // Because there is only a single target, nothing needs // to be done on resume } /// /// Notification that a target was linked to this source. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget *) { if (_M_pSendMessage != NULL) { propagate_to_any_targets(NULL); } } /// /// Create a message that contains an index used to determine the source message /// /**/ void _Create_send_message() { _M_pSendMessage = new message(_M_index); } /// /// Validate constructor arguments and fully connect this _Order_node_base. /// /**/ void _Initialize_order_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL) { if (_Index < 0) { throw std::invalid_argument("_Index"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } _M_index = _Index; initialize_source_and_target(_PScheduler, _PScheduleGroup); // Allow only a single source and ensure that they // cannot be unlinked and relinked. _M_connectedSources.set_bound(1); if (_PTarget != NULL) { link_target(_PTarget); } _PSource->link_target(this); } // // Private Data Members // // The message to be received from the source message<_Type> * _M_pReceiveMessage; // The message to be sent to all targets message * _M_pSendMessage; // The index of the _Order_node_base in the user's construct size_t _M_index; private: // // Hide assignment operator and copy constructor // _Order_node_base const & operator=(_Order_node_base const &); // no assignment operator _Order_node_base(_Order_node_base const &); // no copy constructor }; /// /// Helper class used in multi-type choice blocks /// Ordered node is a single-target, single-source ordered propagator block /// /// /// /// The payload type /// /**/ template class _Reserving_node: public _Order_node_base<_Type> { public: /// /// Constructs a _Reserving_node within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Reserving_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_fIsInitialized(false), _M_savedId(-1), _M_pReservedSource(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget); } /// /// Constructs a _Reserving_node within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Reserving_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_fIsInitialized(false), _M_savedId(-1), _M_pReservedSource(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget); } /// /// Constructs a _Reserving_node within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Reserving_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_fIsInitialized(false), _M_savedId(-1), _M_pReservedSource(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler); } /// /// Constructs a _Reserving_node within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Reserving_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_fIsInitialized(false), _M_savedId(-1), _M_pReservedSource(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler); } /// /// Constructs a _Order_node within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Reserving_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_fIsInitialized(false), _M_savedId(-1), _M_pReservedSource(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup); } /// /// Constructs a _Order_node within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Reserving_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_fIsInitialized(false), _M_savedId(-1), _M_pReservedSource(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup); } /// /// Cleans up any resources that may have been created by the _Reserving_node. /// /**/ ~_Reserving_node() { if (_M_pReservedSource != NULL) { _M_pReservedSource = NULL; _M_connectedSources.release(); } // Remove all links remove_network_links(); } /// /// Resets the _Reserving_node and prepares it for the next propagation /// /// /// This function is not used in a _Reserving_node, which is only used for choice blocks /// /**/ virtual void _Reset() { } protected: // // propagator_block protected function implementation // /// /// Asynchronously passes a message from an ISource block to this ITarget block. It is invoked /// by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// It is important that calls to propagate do *not* take the same lock on the /// internal structure that is used by Consume and the light-weight task. Doing so could /// result in a deadlock with the Consume call. /// /**/ virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) { message_status _Result = postponed; // _Order_node messaging block can be initialized only once, just like single_assignment. if (_M_fIsInitialized) { return declined; } // Reserve a message on the source until this _Order_node gets the feedback from // the single_assignment on whether it has been selected. _M_fIsInitialized = _PSource->reserve(_PMessage->msg_id(), this); // // If message was successfully reserved, set the member variables for // this messaging block and start the asynchronous propagation task. // if (_M_fIsInitialized) { _M_savedId = _PMessage->msg_id(); async_send(NULL); } else { _Result = missed; } return _Result; } /// /// Accept the message by making a copy of the payload. /// /// /// The runtime object identity of the message. /// /// /// A pointer to the message that the caller now has ownership of. /// /**/ virtual message * accept_message(runtime_object_identity _MsgId) { // This check is to prevent spoofing and verify that the propagated message is // the one that is accepted at the end. if (_M_pSendMessage == NULL || _MsgId != _M_pSendMessage->msg_id()) { return NULL; } // If the source has disconnected then we can't allow for accept to succeed. source_iterator _Iter = _M_connectedSources.begin(); ISource<_Type>* _PSource = *_Iter; if (_PSource == NULL) { // source was disconnected. Fail accept. return NULL; } _M_pReceiveMessage = _PSource->consume(_M_savedId, this); _CONCRT_ASSERT(_M_pReceiveMessage != NULL); // // Instead of returning the internal message, we return a copy of the // message passed in. // // Because we are returning a copy, the accept routine for a _Order_node // does not need to grab the internal lock. // return (new message(_M_pSendMessage->payload)); } /// /// Takes the message and propagates it to all the targets of this _Order_node /// /// /// A pointer to a new message. /// /// /// This function packages its _M_index into a message and immediately sends it to the targets. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message *) { if (_M_pSendMessage == NULL) { _Create_send_message(); } for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget * _PTarget = *_Iter; _Propagate_to_target(_PTarget); } } private: /// /// Propagate messages to the given target /// /**/ message_status _Propagate_to_target(ITarget * _PTarget) { message_status _Status = _PTarget->propagate(_M_pSendMessage, this); // If the message got rejected we have to release the hold on the source message. if (_Status != accepted) { if (_M_savedId != -1) { // Release the reservation source_iterator _Iter = _M_connectedSources.begin(); ISource<_Type> * _PSource = *_Iter; if (_PSource != NULL) { _PSource->release(_M_savedId, this); } // If the source was disconnected, then it would // automatically release any reservation. So we // should reset our savedId regardless. _M_savedId = -1; } } return _Status; } // // Private Data Members // // The source where we have reserved a message ISource<_Type> * _M_pReservedSource; // For greedy order-nodes, the message ID of subsequent messages sent to this node // For non-greedy order nodes, the message ID of the message to reserve/consume runtime_object_identity _M_savedId; // The marker that indicates that _Reserving_node has reserved a message volatile bool _M_fIsInitialized; private: // // Hide assignment operator and copy constructor // _Reserving_node const & operator=(_Reserving_node const &); // no assignment operator _Reserving_node(_Reserving_node const &); // no copy constructor }; /// /// Helper class used in multi-type greedy join blocks /// Ordered node is a single-target, single-source ordered propagator block /// /// /// /// The payload type /// /**/ template class _Greedy_node: public _Order_node_base<_Type> { public: /// /// Constructs a _Greedy_node within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_savedId(-1), _M_pGreedyMessage(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget); } /// /// Constructs a _Greedy_node within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_savedId(-1), _M_pGreedyMessage(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget); } /// /// Constructs a _Greedy_node within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_savedId(-1), _M_pGreedyMessage(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler); } /// /// Constructs a _Greedy_node within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_savedId(-1), _M_pGreedyMessage(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler); } /// /// Constructs a _Greedy_node within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_savedId(-1), _M_pGreedyMessage(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup); } /// /// Constructs a _Greedy_node within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_savedId(-1), _M_pGreedyMessage(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup); } /// /// Cleans up any resources that may have been created by the _Greedy_node. /// /**/ ~_Greedy_node() { // Remove all links remove_network_links(); if (_M_pGreedyMessage != _M_pReceiveMessage) { delete _M_pGreedyMessage; } } /// /// Resets the _Greedy_node and prepares it for the next propagation /// /// /// _Reset is called from Populate_destination_tuple through propagate_to_any_targets() /// thus, it always has the internal lock held. /// /**/ void _Reset() { _R_lock _Lock(_M_resetLock); delete _M_pReceiveMessage; _M_pReceiveMessage = NULL; delete _M_pSendMessage; _M_pSendMessage = NULL; // // For greedy type joins, look to see if any other messages have been // passed to this _Greedy_node while the join was waiting for other // messages to arrive. This function is already called with _M_resetLock // held through propagate_to_any_targets(). // for(;;) { // Set the current saved ID as -1. Check to see if something was ready for consumption // (if _Saved_id != -1) and consume it if possible. runtime_object_identity _Saved_id; { _NR_lock lockHolder(_M_propagationLock); _Saved_id = _M_savedId; if (_Saved_id == -1) { _M_pGreedyMessage = NULL; break; } else { _M_savedId = -1; } } if (_Saved_id != -1) { source_iterator _Iter = _M_connectedSources.begin(); ISource<_Type> * _PSource = *_Iter; if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this)) { _M_pGreedyMessage = _PSource->consume(_Saved_id, this); async_send(NULL); break; } } } } protected: // // propagator_block protected function implementation // /// /// Asynchronously passes a message from an ISource block to this ITarget block. It is invoked /// by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// It is important that calls to propagate do *not* take the same lock on the /// internal structure that is used by Consume and the light-weight task. Doing so could /// result in a deadlock with the Consume call. /// /**/ virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) { message_status _Result = postponed; bool _FDone = false; { _NR_lock lockHolder(_M_propagationLock); if (_M_pGreedyMessage != NULL) { _M_savedId = _PMessage->msg_id(); _Result = postponed; _FDone = true; } } if (!_FDone) { _M_pGreedyMessage = _PSource->accept(_PMessage->msg_id(), this); if (_M_pGreedyMessage != NULL) { _Result = accepted; async_send(NULL); } else { _Result = missed; } } return _Result; } /// /// Accept the message by making a copy of the payload. /// /// /// The runtime object identity of the message. /// /// /// A pointer to the message that the caller now has ownership of. /// /**/ virtual message * accept_message(runtime_object_identity _MsgId) { // This check is to prevent spoofing and verify that the propagated message is // the one that is accepted at the end. if (_M_pSendMessage == NULL || _MsgId != _M_pSendMessage->msg_id()) { return NULL; } // // Instead of returning the internal message, we return a copy of the // message passed in. // // Because we are returning a copy, the accept routine for a _Greedy_node // does not need to grab the internal lock. // return (new message(_M_pSendMessage->payload)); } /// /// Takes the message and propagates it to all the targets of this _Greedy_node /// /// /// A pointer to a new message. /// /// /// This function packages its _M_index into a message and immediately sends it to the targets. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message *) { _R_lock _Lock(_M_resetLock); if (_M_pSendMessage == NULL) { // Save the incoming message so that it can be consumed in the accept function _M_pReceiveMessage = _M_pGreedyMessage; _Create_send_message(); } for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget * _PTarget = *_Iter; _PTarget->propagate(_M_pSendMessage, this); } } private: // // Private Data Members // // The message to be saved by a greedy order node message<_Type> * _M_pGreedyMessage; // The lock used to protect propagation ::Concurrency::details::_NonReentrantPPLLock _M_propagationLock; // The lock used to protect modification during a reset ::Concurrency::details::_ReentrantPPLLock _M_resetLock; // For greedy order-nodes, the message ID of subsequent messages sent to this node // For non-greedy order nodes, the message ID of the message to reserve/consume runtime_object_identity _M_savedId; private: // // Hide assignment operator and copy constructor // _Greedy_node const & operator=(_Greedy_node const &); // no assignment operator _Greedy_node(_Greedy_node const &); // no copy constructor }; /// /// Helper class used in multi-type non-greedy join blocks /// Ordered node is a single-target, single-source ordered propagator block /// /// /// /// The payload type /// /**/ template class _Non_greedy_node: public _Order_node_base<_Type> { public: /// /// Constructs a _Non_greedy_node within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Non_greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_savedId(-1), _M_reservedId(-1), _M_pReservedSource(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget); } /// /// Constructs a _Non_greedy_node within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Non_greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_savedId(-1), _M_reservedId(-1), _M_pReservedSource(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget); } /// /// Constructs a _Non_greedy_node within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Non_greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_savedId(-1), _M_reservedId(-1), _M_pReservedSource(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler); } /// /// Constructs a _Non_greedy_node within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Non_greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_savedId(-1), _M_reservedId(-1), _M_pReservedSource(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler); } /// /// Constructs a _Non_greedy_node within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /**/ _Non_greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget = NULL) : _M_savedId(-1), _M_reservedId(-1), _M_pReservedSource(NULL) { _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup); } /// /// Constructs a _Non_greedy_node within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /// /// The source of data passed into the node /// /// /// The node's index, assigned from the outside. /// /// /// The target to which the node will signal about having received its input data /// /// /// A reference to a filter function. /// /**/ _Non_greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget * _PTarget, filter_method const& _Filter) : _M_savedId(-1), _M_reservedId(-1), _M_pReservedSource(NULL) { register_filter(_Filter); _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup); } /// /// Cleans up any resources that may have been created by the _Order_node. /// /**/ ~_Non_greedy_node() { if (_M_pReservedSource != NULL) { _M_pReservedSource = NULL; _M_connectedSources.release(); } // Remove all links remove_network_links(); } /// /// Resets the _Order_node and prepares it for the next propagation /// /// /// _Reset is called from Populate_destination_tuple through propagate_to_any_targets() /// thus, it always has the internal lock held. /// /**/ void _Reset() { _R_lock _Lock(_M_resetLock); delete _M_pReceiveMessage; _M_pReceiveMessage = NULL; delete _M_pSendMessage; _M_pSendMessage = NULL; } /// /// Called for a non_greedy type join block in order to reserve the message /// in this join block /// /// /// A bool indicating whether the reservation worked /// /**/ bool _Reserve_received_message() { bool _Ret_val = false; // Order node has only a single source. // Obtain an iterator to the first source. It will guarantee that the reference // count on the source is maintained source_iterator _Iter = _M_connectedSources.begin(); ISource<_Type> * _PSource = *_Iter; if (_PSource != NULL) { // CAS out the current saved ID, in order to try and reserve it runtime_object_identity _SavedId = _InterlockedExchange((volatile long *) &_M_savedId, -1); _Ret_val = _PSource->reserve(_SavedId, this); // // If this reserved failed, that means we need to wait for another message // to come in on this link. _M_savedID was set to -1 to indicate to the _Order_node // that it needs to async_send when that next message comes through // // If the reserve succeeds, save away the reserved ID. This will be use later in // consume // if (_Ret_val) { _M_reservedId = _SavedId; // Acquire a reference on the source _M_connectedSources.reference(); _M_pReservedSource = _PSource; } } return _Ret_val; } /// /// Called for a non_greedy type join block in order to consume the message /// in this join block that has been reserved /// /**/ void _Consume_received_message() { if (_M_pReservedSource != NULL) { runtime_object_identity _SavedId = _M_reservedId; _M_pReceiveMessage = _M_pReservedSource->consume(_SavedId, this); runtime_object_identity _OldId = NULL; _OldId = _InterlockedExchange((volatile long *) &_M_reservedId, -1); _CONCRT_ASSERT(_OldId == _SavedId); // Release the reference on the source _M_pReservedSource = NULL; _M_connectedSources.release(); } } /// /// Called for a non_greedy type join block release a reservation on this block /// /**/ bool _Release_received_message() { bool retVal = false; if (_M_pReservedSource != NULL) { runtime_object_identity _SavedId = _M_reservedId; // If the _M_savedId is still -1, then swap the succeeded one back _M_pReservedSource->release(_SavedId, this); if (_InterlockedCompareExchange((volatile long *) &_M_savedId, _SavedId, -1) == -1) { retVal = true; } // Release the reference on the source _M_pReservedSource = NULL; _M_connectedSources.release(); } return retVal; } protected: // // propagator_block protected function implementation // /// /// Asynchronously passes a message from an ISource block to this ITarget block. It is invoked /// by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /// /// It is important that calls to propagate do *not* take the same lock on the /// internal structure that is used by Consume and the light-weight task. Doing so could /// result in a deadlock with the Consume call. /// /**/ virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> *) { // Change the message ID. If it was -1, that means an async-send needs to occur if (_InterlockedExchange((volatile long *) &_M_savedId, _PMessage->msg_id()) == -1) { async_send(NULL); } // Always return postponed. This message will be consumed // in the LWT return postponed; } /// /// Accept the message by making a copy of the payload. /// /// /// The runtime object identity of the message. /// /// /// A pointer to the message that the caller now has ownership of. /// /**/ virtual message * accept_message(runtime_object_identity _MsgId) { // This check is to prevent spoofing and verify that the propagated message is // the one that is accepted at the end. if (_M_pSendMessage == NULL || _MsgId != _M_pSendMessage->msg_id()) { return NULL; } // // Instead of returning the internal message, we return a copy of the // message passed in. // // Because we are returning a copy, the accept routine for a _Non_greedy_node // does not need to grab the internal lock. // return (new message(_M_pSendMessage->payload)); } /// /// Takes the message and propagates it to all the targets of this _Order_node /// /// /// A pointer to a new message. /// /// /// This function packages its _M_index into a message and immediately sends it to the targets. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message *) { _R_lock _Lock(_M_resetLock); if (_M_pSendMessage == NULL) { _Create_send_message(); } for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget * _PTarget = *_Iter; _PTarget->propagate(_M_pSendMessage, this); } } private: // // Private Data Members // // The source where we have reserved a message ISource<_Type> * _M_pReservedSource; // The lock used to protect modification during a reset ::Concurrency::details::_ReentrantPPLLock _M_resetLock; // For non-greedy order nodes, the message ID of the message to reserve/consume runtime_object_identity _M_savedId; // For non-greedy order nodes, the reserved ID of the message that was reserved runtime_object_identity _M_reservedId; // The marker that indicates that _Non_greedy_node has reserved a message volatile bool _M_fIsInitialized; private: // // Hide assignment operator and copy constructor // _Non_greedy_node const & operator=(_Non_greedy_node const &); // no assignment operator _Non_greedy_node(_Non_greedy_node const &); // no copy constructor }; //************************************************************************** // Choice: //************************************************************************** /// /// A choice messaging block is a multi-source, single-target block that represents a control-flow /// interaction with a set of sources. The choice block will wait for any one of multiple sources to /// produce a message and will propagate the index of the source that produced the message. /// /// /// A tuple-based type representing the payloads of the input sources. /// /// /// The choice block ensures that only one of the incoming messages is consumed. /// For more information, see . /// /// /// /// /// /**/ template class choice: public ISource { public: /// /// Constructs a choice messaging block. /// /// /// A tuple of sources for the choice. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ explicit choice(_Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(NULL) { _M_pSingleAssignment = new single_assignment(); _Initialize_choices<0>(); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a choice messaging block. /// /// /// The Scheduler object within which the propagation task for the choice messaging block is scheduled. /// /// /// A tuple of sources for the choice. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ choice(Scheduler& _PScheduler, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(&_PScheduler), _M_pScheduleGroup(NULL) { _M_pSingleAssignment = new single_assignment(_PScheduler); _Initialize_choices<0>(); } /// /// Constructs a choice messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the choice messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A tuple of sources for the choice. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ choice(ScheduleGroup& _PScheduleGroup, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(&_PScheduleGroup) { _M_pSingleAssignment = new single_assignment(_PScheduleGroup); _Initialize_choices<0>(); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Constructs a choice messaging block. /// /// /// A choice messaging block to copy from. /// Note that the original object is orphaned, making this a move constructor. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ choice(choice && _Choice) { // Copy scheduler group or scheduler to the new object. _M_pScheduleGroup = _Choice._M_pScheduleGroup; _M_pScheduler = _Choice._M_pScheduler; // Single assignment is heap allocated, so simply copy the pointer. If it already has // a value, it will be preserved. _M_pSingleAssignment = _Choice._M_pSingleAssignment; _Choice._M_pSingleAssignment = NULL; // Invoke copy assignment for tuple to copy pointers to message blocks. _M_sourceTuple = _Choice._M_sourceTuple; // Copy the pointers to order nodes to a new object and zero out in the old object. memcpy(_M_pSourceChoices, _Choice._M_pSourceChoices, sizeof(_M_pSourceChoices)); memset(_Choice._M_pSourceChoices, 0, sizeof(_M_pSourceChoices)); } /// /// Destroys the choice messaging block. /// /**/ ~choice() { delete _M_pSingleAssignment; _Delete_choices<0>(); } /// /// A type alias for . /// /**/ typedef typename _Type type; /// /// Checks whether this choice messaging block has been initialized with a value yet. /// /// /// true if the block has received a value, false otherwise. /// /**/ bool has_value() const { return _M_pSingleAssignment->has_value(); } /// /// Returns an index into the tuple representing the element selected by the /// choice messaging block. /// /// /// The message index. /// /// /// The message payload can be extracted using the get method. /// /**/ size_t index() { return _M_pSingleAssignment->value(); } /// /// Gets the message whose index was selected by the choice messaging block. /// /// /// The type of the message payload. /// /// /// The payload of the message. /// /// /// Because a choice messaging block can take inputs with different payload types, you must specify /// the type of the payload at the point of retrieval. You can determine the type based on the result of /// the index method. /// /**/ template _Payload_type const & value() { return reinterpret_cast<_Reserving_node<_Payload_type> *>(_M_pSourceChoices[_M_pSingleAssignment->value()])->value(); } // // ISource public function implementations // /// /// Links a target block to this choice messaging block. /// /// /// A pointer to an ITarget block to link to this choice messaging block. /// /**/ virtual void link_target(_Inout_ ITarget * _PTarget) { _M_pSingleAssignment->link_target(_PTarget); } /// /// Unlinks a target block from this choice messaging block. /// /// /// A pointer to an ITarget block to unlink from this choice messaging block. /// /**/ virtual void unlink_target(_Inout_ ITarget * _PTarget) { _M_pSingleAssignment->unlink_target(_PTarget); } /// /// Unlinks all targets from this choice messaging block. /// /// /// This method does not need to be called from the destructor because destructor for the internal /// single_assignment block will unlink properly. /// /**/ virtual void unlink_targets() { _M_pSingleAssignment->unlink_targets(); } /// /// Accepts a message that was offered by this choice block, transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the target block that is calling the accept method. /// /// /// A pointer to the message that the caller now has ownership of. /// /**/ virtual message * accept(runtime_object_identity _MsgId, _Inout_ ITarget * _PTarget) { return _M_pSingleAssignment->accept(_MsgId, _PTarget); } /// /// Reserves a message previously offered by this choice messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// A pointer to the target block that is calling the reserve method. /// /// /// true if the message was successfully reserved, false otherwise. Reservations can fail /// for many reasons, including: the message was already reserved or accepted by another target, the source could /// deny reservations, and so forth. /// /// /// After you call reserve, if it succeeds, you must call either consume or release /// in order to take or give up possession of the message, respectively. /// /**/ virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget * _PTarget) { return _M_pSingleAssignment->reserve(_MsgId, _PTarget); } /// /// Consumes a message previously offered by this choice messaging block and successfully reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the reserved message object. /// /// /// A pointer to the target block that is calling the consume method. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// The consume method is similar to accept, but must always be preceded by a call to reserve that /// returned true. /// /**/ virtual message * consume(runtime_object_identity _MsgId, _Inout_ ITarget * _PTarget) { return _M_pSingleAssignment->consume(_MsgId, _PTarget); } /// /// Releases a previous successful message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /// /// A pointer to the target block that is calling the release method. /// /**/ virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget * _PTarget) { _M_pSingleAssignment->release(_MsgId, _PTarget); } /// /// Acquires a reference count on this choice messaging block, to prevent deletion. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being linked to this source /// during the link_target method. /// /**/ virtual void acquire_ref(_Inout_ ITarget * _PTarget) { _M_pSingleAssignment->acquire_ref(_PTarget); } /// /// Releases a reference count on this choice messaging block. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being unlinked from this source. /// The source block is allowed to release any resources reserved for the target block. /// /**/ virtual void release_ref(_Inout_ ITarget * _PTarget) { _M_pSingleAssignment->release_ref(_PTarget); } private: /// /// Constructs and initializes a _Reserving_node for each tuple messaging block passed in. /// /// The highest-number index of the choice's sources /**/ template void _Initialize_choices() { std::tr1::tuple_element<_Index, _Type>::type _Item = std::tr1::get<_Index>(_M_sourceTuple); _Reserving_node::type>::type::source_type> * _Order_node_element = NULL; if (_M_pScheduleGroup != NULL) { _Order_node_element = new _Reserving_node::type>::type::source_type> (*_M_pScheduleGroup, _Item, _Index); } else if (_M_pScheduler != NULL) { _Order_node_element = new _Reserving_node::type>::type::source_type> (*_M_pScheduler, _Item, _Index); } else { _Order_node_element = new _Reserving_node::type>::type::source_type> (_Item, _Index); } _M_pSourceChoices[_Index] = _Order_node_element; _Order_node_element->link_target(_M_pSingleAssignment); _Initialize_choices<_Index + 1>(); } /// /// Provides a sentinel template specialization for _Initialize_choices recursive /// template expansion. /// /**/ template<> void _Initialize_choices::value>() { } /// /// Deletes all _Reserving_node elements that were created in _Initialize_choices. /// /// The highest-number index of the choice's sources /**/ template void _Delete_choices() { delete reinterpret_cast<_Reserving_node::type>::type::source_type> *>(_M_pSourceChoices[_Index]); _M_pSourceChoices[_Index] = NULL; _Delete_choices<_Index + 1>(); } /// /// Provides a sentinel template specialization for _Delete_choices recursive /// template expansion. /// /**/ template<> void _Delete_choices::value>() { } // Array of pointers to _Reserving_node elements representing each source void * _M_pSourceChoices[std::tr1::tuple_size<_Type>::value]; // Single assignment which chooses between source messaging blocks single_assignment * _M_pSingleAssignment; // Tuple of messaging blocks that are sources to this choice _Type _M_sourceTuple; // The scheduler to propagate messages on Scheduler * _M_pScheduler; // The schedule group to propagate messages on ScheduleGroup * _M_pScheduleGroup; private: // // Hide assignment operator // choice const &operator =(choice const &); // no assignment operator choice(choice const &); // no copy constructor }; // Templated factory functions that create a choice, three flavors #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a choice messaging block from an optional Scheduler or ScheduleGroup /// and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The Scheduler object within which the propagation task for the choice messaging block is scheduled. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A choice message block with two or more input sources. /// /// /// /**/ template choice> make_choice(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items) { return choice>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...)); } /// /// Constructs a choice messaging block from an optional Scheduler or ScheduleGroup /// and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The ScheduleGroup object within which the propagation task for the choice messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A choice message block with two or more input sources. /// /// /// /**/ template choice> make_choice(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items) { return choice>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...)); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Constructs a choice messaging block from an optional Scheduler or ScheduleGroup /// and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A choice message block with two or more input sources. /// /// /**/ template choice> make_choice(_Type1 _Item1, _Type2 _Item2, _Types... _Items) { return choice>(std::make_tuple(_Item1, _Item2, _Items...)); } //************************************************************************** // Join: //************************************************************************** // Template specialization used to unwrap the types from within a tuple. /**/ template struct _Unwrap; /// /// Template specialization used to unwrap the types from within a tuple. /// /// /// The types of the elements of the tuple. /// /**/ template struct _Unwrap> { typedef std::tuple::type::source_type...> type; }; /// /// Defines a block allowing sources of distinct types to be joined. /// Join node is a single-target, multi-source ordered propagator block /// /// /// The payload tuple type /// /// /// The kind of join this is, either 'greedy' or 'non-greedy' /// /**/ template class _Join_node: public propagator_block>, multi_link_registry>> { public: /// /// Constructs a join within the default scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /**/ _Join_node() : _M_counter(std::tr1::tuple_size<_Destination_type>::value) { initialize_source_and_target(); } /// /// Constructs a join within the specified scheduler, and places it on any schedule /// group of the scheduler's choosing. /// /// /// A reference to a scheduler instance. /// /**/ _Join_node(Scheduler& _PScheduler) : _M_counter(std::tr1::tuple_size<_Destination_type>::value) { initialize_source_and_target(&_PScheduler); } /// /// Constructs a join within the specified schedule group. The scheduler is implied /// by the schedule group. /// /// /// A reference to a schedule group. /// /**/ _Join_node(ScheduleGroup& _PScheduleGroup) : _M_counter(std::tr1::tuple_size<_Destination_type>::value) { initialize_source_and_target(NULL, &_PScheduleGroup); } /// /// Cleans up any resources that may have been created by the join. /// /**/ ~_Join_node() { // Remove all links remove_network_links(); // Clean up any messages left in this message block _Delete_stored_messages(); } protected: /// /// Asynchronously passes a message from an ISource block to this ITarget block. It is invoked /// by the propagate method, when called by a source block. /// /// /// A pointer to the message object. /// /// /// A pointer to the source block offering the message. /// /// /// A message_status indication of what /// the target decided to do with the message. /// /**/ virtual message_status propagate_message(message * _PMessage, ISource * _PSource) { // This join block is connected to the _Order_node sources, which know not to send // any more messages until join propagates them further. That is why join can // always accept the incoming messages. _PMessage = _PSource->accept(_PMessage->msg_id(), this); // // Source block created an int message only to notify join that the real // payload is available. There is no need to keep this message around. // _CONCRT_ASSERT(_PMessage != NULL); delete _PMessage; long _Ret_val = _InterlockedDecrement(&_M_counter); _CONCRT_ASSERT(_Ret_val >= 0); if (_Ret_val == 0) { // // All source messages are now received so join can propagate them further // async_send(NULL); } return accepted; } /// /// Accepts an offered message by the source, transferring ownership to the caller. /// /// /// The runtime object identity of the message. /// /// /// A pointer to the message that the caller now has ownership of. /// /**/ virtual message<_Destination_type> * accept_message(runtime_object_identity _MsgId) { // // Peek at the head message in the message buffer. If the IDs match // dequeue and transfer ownership // message<_Destination_type> * _Msg = NULL; if (_M_messageBuffer._Is_head(_MsgId)) { _Msg = _M_messageBuffer._Dequeue(); } return _Msg; } /// /// Reserves a message previously offered by the source. /// /// /// The runtime object identity of the message. /// /// /// A bool indicating whether the reservation worked or not. /// /// /// After reserve is called, if it returns true, either consume or release must be called /// to either take or release ownership of the message. /// /**/ virtual bool reserve_message(runtime_object_identity _MsgId) { // Allow reservation if this is the head message return _M_messageBuffer._Is_head(_MsgId); } /// /// Consumes a message previously offered by the source and reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime object identity of the message. /// /// /// A pointer to the message that the caller now has ownership of. /// /// /// consume_message is similar to accept, but is always preceded by a call to reserve. /// /**/ virtual message<_Destination_type> * consume_message(runtime_object_identity _MsgId) { // By default, accept the message return accept_message(_MsgId); } /// /// Releases a previous message reservation. /// /// /// The runtime object identity of the message. /// /**/ virtual void release_message(runtime_object_identity _MsgId) { // The head message is the one reserved. if (!_M_messageBuffer._Is_head(_MsgId)) { throw message_not_found(); } } /// /// Resumes propagation after a reservation has been released /// /**/ virtual void resume_propagation() { // If there are any messages in the buffer, propagate them out if (_M_messageBuffer._Count() > 0) { async_send(NULL); } } /// /// Notification that a target was linked to this source. /// /// /// A pointer to the newly linked target. /// /**/ virtual void link_target_notification(_Inout_ ITarget<_Destination_type> *) { // There is only a single target. _Propagate_priority_order(_M_messageBuffer); } /// /// Takes the message and propagates it to all the targets of this join block. /// /// /// A pointer to a new message. /// /// /// This function packages source payloads into a tuple message and immediately sends it to the targets. /// /**/ virtual void propagate_to_any_targets(_Inout_opt_ message<_Destination_type> *) { message<_Destination_type> * _Msg = NULL; if (_M_counter == 0) { bool fIsNonGreedy = (_Jtype == non_greedy); if (fIsNonGreedy) { if (!_Non_greedy_acquire_messages()) { return; } } if (!fIsNonGreedy) { // Because a greedy join has captured all input, we can reset // the counter to the total number of inputs _InterlockedExchange(&_M_counter, std::tr1::tuple_size<_Destination_type>::value); } _Msg = _Create_send_message(); } if (_Msg != NULL) { _M_messageBuffer._Enqueue(_Msg); if (!_M_messageBuffer._Is_head(_Msg->msg_id())) { // another message is at the head of the outbound message queue and blocked // simply return return; } } _Propagate_priority_order(_M_messageBuffer); } private: /// /// Tries to reserve from all sources. If successful, it will consume all the messages /// /// /// A bool indicating whether the consumption attempt worked. /// /// /// The highest-number index of the join's sources /// /**/ template bool _Try_consume_source_messages(_Destination_type & _Destination_tuple, ISource ** _Sources) { _Non_greedy_node::type>::type::source_type> * _Node = static_cast<_Non_greedy_node::type>::type::source_type> *>(_Sources[_Index]); // Increment the counter once for each reservation _InterlockedIncrement(&_M_counter); if (_Node->_Reserve_received_message()) { bool _Ret_val = _Try_consume_source_messages<_Index + 1>(_Destination_tuple, _Sources); if (_Ret_val) { _Node->_Consume_received_message(); } else { if (_Node->_Release_received_message()) { // If _Release_received_message() restored the ID, decrement the count for that // restoration if (_InterlockedDecrement(&_M_counter) == 0) { async_send(NULL); } } } return _Ret_val; } return false; } /// /// Provides a sentinel template specialization for _Try_consume_source_messages recursive /// template expansion. /// /// /// A bool indicating whether the consumption attempt worked. /// /**/ template<> bool _Try_consume_source_messages::value>(_Destination_type &, ISource **) { return true; } /// /// Tries to acquire all of the messages from the _Non_greedy_nodes. Each node has already /// indicated that it has received a message that it can try to reserve. This function /// starts the reservation and consume process. /// /// /// A bool indicating whether the reserve/consume of all messages succeeded. /// /**/ bool _Non_greedy_acquire_messages() { _Destination_type _Destination_tuple; // Populate the sources buffer ISource * _Sources[std::tr1::tuple_size<_Type>::value]; size_t _Index = 0; // Get an iterator which will keep a reference on the connected sources source_iterator _Iter = _M_connectedSources.begin(); while (*_Iter != NULL) { ISource * _PSource = *_Iter; if (_PSource == NULL) { // One of the sources disconnected break; } if (_Index >= std::tr1::tuple_size<_Type>::value) { // More sources that we expect break; } _Sources[_Index] = _PSource; _Index++; ++_Iter; } // The order nodes should not have unlinked while the join node is // active. if (_Index != std::tr1::tuple_size<_Type>::value) { // On debug build assert to help debugging _CONCRT_ASSERT(_Index == std::tr1::tuple_size<_Type>::value); return false; } bool _IsAcquireSuccessful = _Try_consume_source_messages<0>(_Destination_tuple, _Sources); return _IsAcquireSuccessful; } /// /// Propagate messages in priority order /// /// /// Reference to a message queue with messages to be propagated /// /**/ void _Propagate_priority_order(::Concurrency::details::_Queue> & _MessageBuffer) { message<_Target_type> * _Msg = _MessageBuffer._Peek(); // If someone has reserved the _Head message, don't propagate anymore if (_M_pReservedFor != NULL) { return; } while (_Msg != NULL) { message_status _Status = declined; // Always start from the first target that linked for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Target_type> * _PTarget = *_Iter; _Status = _PTarget->propagate(_Msg, this); // Ownership of message changed. Do not propagate this // message to any other target. if (_Status == accepted) { break; } // If the target just propagated to reserved this message, stop // propagating it to others if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. Thus, nothing after it in the queue can // be propagated out. Cease propagation. if (_Status != accepted) { break; } // Get the next message _Msg = _MessageBuffer._Peek(); } } /// /// Called when all the source messaging blocks have received their messages. The payloads are copied /// into local tuple and then packaged into a message to be propagated: _M_pSendMessage. /// /**/ message<_Destination_type> * _Create_send_message() { _Destination_type _Destination_tuple; // Populate the sources buffer ISource * _Sources[std::tr1::tuple_size<_Type>::value]; size_t _Index = 0; // Get an iterator which will keep a reference on the connected sources source_iterator _Iter = _M_connectedSources.begin(); while (*_Iter != NULL) { ISource * _PSource = *_Iter; if (_PSource == NULL) { // One of the sources disconnected break; } // Avoid buffer overrun if (_Index >= std::tr1::tuple_size<_Type>::value) { // More sources that we expect break; } _Sources[_Index] = *_Iter; _Index++; ++_Iter; } // The order nodes should not have unlinked while the join node is // active. if (_Index != std::tr1::tuple_size<_Type>::value) { // On debug build assert to help debugging _CONCRT_ASSERT(_Index == std::tr1::tuple_size<_Type>::value); return NULL; } _Populate_destination_tuple<0>(_Destination_tuple, _Sources); return new message<_Destination_type>(_Destination_tuple); } /// /// Deletes all messages currently stored in this message block. Should be called /// by the destructor to ensure any messages propagated in are cleaned up. /// /**/ void _Delete_stored_messages() { // Delete any messages remaining in the output queue for (;;) { message<_Destination_type> * _Msg = _M_messageBuffer._Dequeue(); if (_Msg == NULL) { break; } delete _Msg; } } /// /// Copies payloads from all sources to destination tuple. /// /**/ template void _Populate_destination_tuple(_Destination_type & _Destination_tuple, ISource ** _Sources) { _Order_node_base::type>::type::source_type> * _Node = static_cast<_Order_node_base::type>::type::source_type> *>(_Sources[_Index]); std::tr1::get<_Index>(_Destination_tuple) = _Node->value(); _Node->_Reset(); _Populate_destination_tuple<_Index + 1>(_Destination_tuple, _Sources); } /// /// Provides a sentinel template specialization for _Populate_destination_tuple recursive /// template expansion. /// /**/ template<> void _Populate_destination_tuple::value>(_Destination_type &, ISource **) { } // A tuple containing a collection of source messaging blocks _Type _M_sourceTuple; // Counts messages received by sources of this node and is used to trigger propagation to targets // This value starts at the total number of inputs and counts down to zero. When it reaches zero, // a join of the inputs is started. volatile long _M_counter; // Buffer to hold outgoing messages ::Concurrency::details::_Queue> _M_messageBuffer; private: // // Hide assignment operator and copy constructor // _Join_node(const _Join_node & _Join); // no copy constructor _Join_node const &operator =(_Join_node const &); // no assignment operator }; /// /// A multitype_join messaging block is a multi-source, single-target messaging block that /// combines together messages of different types from each of its sources and offers a tuple /// of the combined messages to its targets. /// /// /// The tuple payload type of the messages joined and propagated by the block. /// /// /// The kind of join block this is, either greedy or non_greedy /// /// /// For more information, see . /// /// /// /// /// /// /// /**/ template class multitype_join: public ISource::type> { public: typedef typename _Unwrap<_Type>::type _Destination_type; /// /// Constructs a multitype_join messaging block. /// /// /// A tuple of sources for this multitype_join messaging block. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ explicit multitype_join(_Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(NULL) { _M_pJoinNode = new _Join_node<_Type, _Destination_type, _Jtype>(); _Initialize_joins<0>(); } #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a multitype_join messaging block. /// /// /// The Scheduler object within which the propagation task for the multitype_join messaging block is scheduled. /// /// /// A tuple of sources for this multitype_join messaging block. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ multitype_join(Scheduler& _PScheduler, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(&_PScheduler), _M_pScheduleGroup(NULL) { _M_pJoinNode = new _Join_node<_Type, _Destination_type, _Jtype>(_PScheduler); _Initialize_joins<0>(); } /// /// Constructs a multitype_join messaging block. /// /// /// The ScheduleGroup object within which the propagation task for the multitype_join messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// A tuple of sources for this multitype_join messaging block. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ multitype_join(ScheduleGroup& _PScheduleGroup, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(&_PScheduleGroup) { _M_pJoinNode = new _Join_node<_Type, _Destination_type, _Jtype>(_PScheduleGroup); _Initialize_joins<0>(); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Constructs a multitype_join messaging block. /// /// /// A multitype_join messaging block to copy from. /// Note that the original object is orphaned, making this a move constructor. /// /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// Move construction is not performed under a lock, which means that it is up to the user /// to make sure that there are no light-weight tasks in flight at the time of moving. /// Otherwise, numerous races can occur, leading to exceptions or inconsistent state. /// /// /// /// /**/ multitype_join(multitype_join && _Join) { // Copy scheduler group or scheduler to the new object. _M_pScheduleGroup = _Join._M_pScheduleGroup; _M_pScheduler = _Join._M_pScheduler; // Single assignment is heap allocated, so simply copy the pointer. If it already has // a value, it will be preserved. _M_pJoinNode = _Join._M_pJoinNode; _Join._M_pJoinNode = NULL; // Invoke copy assignment for tuple to copy pointers to message blocks. _M_sourceTuple = _Join._M_sourceTuple; // Copy the pointers to order nodes to a new object and zero out in the old object. memcpy(_M_pSourceJoins, _Join._M_pSourceJoins, sizeof(_M_pSourceJoins)); memset(_Join._M_pSourceJoins, 0, sizeof(_M_pSourceJoins)); } /// /// Destroys the multitype_join messaging block. /// /**/ ~multitype_join() { delete _M_pJoinNode; _Delete_joins<0>(); } /// /// A type alias for . /// /**/ typedef typename _Type type; // // ISource public function implementations // /// /// Links a target block to this multitype_join messaging block. /// /// /// A pointer to an ITarget block to link to this multitype_join messaging block. /// /**/ virtual void link_target(_Inout_ ITarget<_Destination_type> * _PTarget) { _M_pJoinNode->link_target(_PTarget); } /// /// Unlinks a target block from this multitype_join messaging block. /// /// /// A pointer to an ITarget block to unlink from this multitype_join messaging block. /// /**/ virtual void unlink_target(_Inout_ ITarget<_Destination_type> * _PTarget) { _M_pJoinNode->unlink_target(_PTarget); } /// /// Unlinks all targets from this multitype_join messaging block. /// /**/ virtual void unlink_targets() { _M_pJoinNode->unlink_targets(); } /// /// Accepts a message that was offered by this multitype_join block, transferring ownership to the caller. /// /// /// The runtime_object_identity of the offered message object. /// /// /// A pointer to the target block that is calling the accept method. /// /// /// A pointer to the message that the caller now has ownership of. /// /**/ virtual message<_Destination_type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget) { return _M_pJoinNode->accept(_MsgId, _PTarget); } /// /// Reserves a message previously offered by this multitype_join messaging block. /// /// /// The runtime_object_identity of the message object being reserved. /// /// /// A pointer to the target block that is calling the reserve method. /// /// /// true if the message was successfully reserved, false otherwise. Reservations can fail /// for many reasons, including: the message was already reserved or accepted by another target, the source could /// deny reservations, and so forth. /// /// /// After you call reserve, if it succeeds, you must call either consume or release /// in order to take or give up possession of the message, respectively. /// /**/ virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget) { return _M_pJoinNode->reserve(_MsgId, _PTarget); } /// /// Consumes a message previously offered by the multitype_join messaging block and successfully reserved by the target, /// transferring ownership to the caller. /// /// /// The runtime_object_identity of the reserved message object. /// /// /// A pointer to the target block that is calling the consume method. /// /// /// A pointer to the message object that the caller now has ownership of. /// /// /// The consume method is similar to accept, but must always be preceded by a call to reserve that /// returned true. /// /**/ virtual message<_Destination_type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget) { return _M_pJoinNode->consume(_MsgId, _PTarget); } /// /// Releases a previous successful message reservation. /// /// /// The runtime_object_identity of the message object being released. /// /// /// A pointer to the target block that is calling the release method. /// /**/ virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget) { _M_pJoinNode->release(_MsgId, _PTarget); } /// /// Acquires a reference count on this multitype_join messaging block, to prevent deletion. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being linked to this source /// during the link_target method. /// /**/ virtual void acquire_ref(_Inout_ ITarget<_Destination_type> * _PTarget) { _M_pJoinNode->acquire_ref(_PTarget); } /// /// Releases a reference count on this multiple_join messaging block. /// /// /// A pointer to the target block that is calling this method. /// /// /// This method is called by an ITarget object that is being unlinked from this source. /// The source block is allowed to release any resources reserved for the target block. /// /**/ virtual void release_ref(_Inout_ ITarget<_Destination_type> * _PTarget) { _M_pJoinNode->release_ref(_PTarget); } private: /// /// Constructs and initializes a _Order_node for each tuple messaging block passed in. /// /// /// The highest-number index of the multitype_join's sources /// /**/ template void _Initialize_joins() { std::tr1::tuple_element<_Index, _Type>::type _Item = std::tr1::get<_Index>(_M_sourceTuple); _Order_node_base::type>::type::source_type> * _Order_node_element = NULL; bool fIsNonGreedy = (_Jtype == non_greedy); if (fIsNonGreedy) { if (_M_pScheduleGroup != NULL) { _Order_node_element = new _Non_greedy_node::type>::type::source_type> (*_M_pScheduleGroup, _Item, _Index); } else if (_M_pScheduler != NULL) { _Order_node_element = new _Non_greedy_node::type>::type::source_type> (*_M_pScheduler, _Item, _Index); } else { _Order_node_element = new _Non_greedy_node::type>::type::source_type> (_Item, _Index); } } else { if (_M_pScheduleGroup != NULL) { _Order_node_element = new _Greedy_node::type>::type::source_type> (*_M_pScheduleGroup, _Item, _Index); } else if (_M_pScheduler != NULL) { _Order_node_element = new _Greedy_node::type>::type::source_type> (*_M_pScheduler, _Item, _Index); } else { _Order_node_element = new _Greedy_node::type>::type::source_type> (_Item, _Index); } } _M_pSourceJoins[_Index] = _Order_node_element; _Order_node_element->link_target(_M_pJoinNode); _Initialize_joins<_Index + 1>(); } /// /// Provides a sentinel template specialization for _Initialize_joins recursive /// template expansion. /// /**/ template<> void _Initialize_joins::value>() { } /// /// Deletes all _Order_node elements that were created in _Initialize_joins. /// /// /// The highest-number index of the multitype_join's sources /// /**/ template void _Delete_joins() { delete reinterpret_cast<_Order_node_base::type>::type::source_type> *>(_M_pSourceJoins[_Index]); _M_pSourceJoins[_Index] = NULL; _Delete_joins<_Index + 1>(); } /// /// Provides a sentinel template specialization for _Delete_joins recursive /// template expansion. /// /**/ template<> void _Delete_joins::value>() { } // Array of pointers to _Order_node elements representing each source void * _M_pSourceJoins[std::tr1::tuple_size<_Type>::value]; // Join node that collects source messaging block messages _Join_node<_Type, _Destination_type, _Jtype> * _M_pJoinNode; // Tuple of messaging blocks that are sources to this multitype_join _Type _M_sourceTuple; // The scheduler to propagate messages on Scheduler * _M_pScheduler; // The schedule group to propagate messages on ScheduleGroup * _M_pScheduleGroup; private: // // Hide assignment operator // multitype_join const &operator =(multitype_join const &); // no assignment operator multitype_join(multitype_join const &); // no copy constructor }; // Templated factory functions that create a join, three flavors #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a non_greedy multitype_join messaging block from an optional Scheduler /// or ScheduleGroup and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The Scheduler object within which the propagation task for the multitype_join messaging block is scheduled. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A non_greedy multitype_join message block with two or more input sources. /// /// /// /**/ template multitype_join> make_join(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items) { return multitype_join>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...)); } /// /// Constructs a non_greedy multitype_join messaging block from an optional Scheduler /// or ScheduleGroup and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The ScheduleGroup object within which the propagation task for the multitype_join messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A non_greedy multitype_join message block with two or more input sources. /// /// /// /**/ template multitype_join> make_join(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items) { return multitype_join>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...)); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Constructs a non_greedy multitype_join messaging block from an optional Scheduler /// or ScheduleGroup and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A non_greedy multitype_join message block with two or more input sources. /// /// /**/ template multitype_join> make_join(_Type1 _Item1, _Type2 _Item2, _Types... _Items) { return multitype_join>(std::make_tuple(_Item1, _Item2, _Items...)); } // Templated factory functions that create a *greedy* join, three flavors #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs a greedy multitype_join messaging block from an optional Scheduler /// or ScheduleGroup and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The Scheduler object within which the propagation task for the multitype_join messaging block /// is scheduled. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A greedy multitype_join message block with two or more input sources. /// /// /// /**/ template multitype_join, greedy> make_greedy_join(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items) { return multitype_join, greedy>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...)); } /// /// Constructs a greedy multitype_join messaging block from an optional Scheduler /// or ScheduleGroup and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The ScheduleGroup object within which the propagation task for the multitype_join messaging block is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A greedy multitype_join message block with two or more input sources. /// /// /// /**/ template multitype_join, greedy> make_greedy_join(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items) { return multitype_join, greedy>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...)); } #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Constructs a greedy multitype_join messaging block from an optional Scheduler /// or ScheduleGroup and two or more input sources. /// /// /// The message block type of the first source. /// /// /// The message block type of the second source. /// /// /// The message block types of additional sources. /// /// /// The first source. /// /// /// The second source. /// /// /// Additional sources. /// /// /// A greedy multitype_join message block with two or more input sources. /// /// /**/ template multitype_join, greedy> make_greedy_join(_Type1 _Item1, _Type2 _Item2, _Types... _Items) { return multitype_join, greedy>(std::make_tuple(_Item1, _Item2, _Items...)); } //************************************************************************** // Agents: //************************************************************************** /// /// The valid states for an agent. /// /// /// For more information, see . /// /**/ enum agent_status { /// /// The agent has been created but not started. /// /**/ agent_created, /// /// The agent has been started, but not entered its run method. /// /**/ agent_runnable, /// /// The agent has started. /// /**/ agent_started, /// /// The agent finished without being canceled. /// /**/ agent_done, /// /// The agent was canceled. /// /**/ agent_canceled }; /// /// A class intended to be used as a base class for all independent agents. It is used to hide /// state from other agents and interact using message-passing. /// /// /// For more information, see . /// /**/ class agent { public: /// /// Constructs an agent. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// /**/ _CRTIMP2 agent(); #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP /// /// Constructs an agent. /// /// /// The Scheduler object within which the execution task of the agent is scheduled. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// /**/ _CRTIMP2 agent(Scheduler& _PScheduler); /// /// Constructs an agent. /// /// /// The ScheduleGroup object within which the execution task of the agent is scheduled. /// The Scheduler object used is implied by the schedule group. /// /// /// The runtime uses the default scheduler if you do not specify the /// or parameters. /// /// /// /**/ _CRTIMP2 agent(ScheduleGroup& _PGroup); #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */ /// /// Destroys the agent. /// /// /// It is an error to destroy an agent that is not in a terminal state (either agent_done or /// agent_canceled). This can be avoided by waiting for the agent to reach a terminal state /// in the destructor of a class that inherits from the agent class. /// /**/ _CRTIMP2 virtual ~agent(); /// /// An asynchronous source of status information from the agent. /// /// /// Returns a message source that can send messages about the current state of the agent. /// /**/ _CRTIMP2 ISource * status_port(); /// /// A synchronous source of status information from the agent. /// /// /// Returns the current state of the agent. Note that this returned state could change /// immediately after being returned. /// /// /**/ _CRTIMP2 agent_status status(); /// /// Moves an agent from the agent_created state to the agent_runnable state, and schedules it for execution. /// /// /// true if the agent started correctly, false otherwise. An agent that has been canceled cannot be started. /// /// /**/ _CRTIMP2 bool start(); /// /// Moves an agent from either the agent_created or agent_runnable states to the agent_canceled state. /// /// /// true if the agent was canceled, false otherwise. An agent cannot be canceled if it has already started /// running or has already completed. /// /// /**/ _CRTIMP2 bool cancel(); /// /// Waits for an agent to complete its task. /// /// /// A pointer to the agent to wait for. /// /// /// The maximum time for which to wait, in milliseconds. /// /// /// The agent_status of the agent when the wait completes. This can either be agent_canceled /// or agent_done. /// /// /// An agent task is completed when the agent enters the agent_canceled or agent_done states. /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before the agent has completed its task. /// /// /// /// /**/ _CRTIMP2 static agent_status __cdecl wait(_Inout_ agent * _PAgent, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE); /// /// Waits for all of the specified agents to complete their tasks. /// /// /// The number of agent pointers present in the array . /// /// /// An array of pointers to the agents to wait for. /// /// /// A pointer to an array of agent statuses. Each status value will represent the status of the corresponding /// agent when the method returns. /// /// /// The maximum time for which to wait, in milliseconds. /// /// /// An agent task is completed when the agent enters the agent_canceled or agent_done states. /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before the agent has completed its task. /// /// /// /// /**/ _CRTIMP2 static void __cdecl wait_for_all(size_t _Count, _In_reads_(_Count) agent ** _PAgents, _Out_writes_opt_(_Count) agent_status * _PStatus = NULL, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE); /// /// Waits for any one of the specified agents to complete its task. /// /// /// The number of agent pointers present in the array . /// /// /// An array of pointers to the agents to wait for. /// /// /// A reference to a variable where the agent status will be placed. /// /// /// A reference to a variable where the agent index will be placed. /// /// /// The maximum time for which to wait, in milliseconds. /// /// /// An agent task is completed when the agent enters the agent_canceled or agent_done states. /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before the agent has completed its task. /// /// /// /// /**/ _CRTIMP2 static void __cdecl wait_for_one(size_t _Count, _In_reads_(_Count) agent ** _PAgents, agent_status& _Status, size_t& _Index, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE); protected: /// /// Represents the main task of an agent. run should be overridden in a derived class, and specifies what /// the agent should do after it has been started. /// /// /// The agent status is changed to agent_started right before this method is invoked. The method should /// invoke done on the agent with an appropriate status before returning, and may not throw any /// exceptions. /// /**/ virtual void run() = 0; /// /// Moves an agent into the agent_done state, indicating that the agent has completed. /// /// /// true if the agent is moved to the agent_done state, false otherwise. An agent that has /// been canceled cannot be moved to the agent_done state. /// /// /// This method should be called at the end of the run method, when you know the execution of your agent /// has completed. /// /// /**/ _CRTIMP2 bool done(); /// /// Holds the current status of the agent. /// /**/ overwrite_buffer _M_status; private: // A flag to check of whether the agent can be started // This is initialized to TRUE and there is a race between Start() and Cancel() to set it // to FALSE. Once Started or Canceled, further calls to Start() or Cancel() will return false. /**/ volatile long _M_fStartable; // A flag to check of whether the agent can be canceled // This is initailized to TRUE and there is a race between Cancel() and the LWT executing // a task that has been started to set it to FALSE. If Cancel() wins, the task will not be // executed. If the LWT wins, Cancel() will return false. /**/ volatile long _M_fCancelable; // A static wrapper function that calls the Run() method. Used for scheduling of the task /**/ static void __cdecl _Agent_task_wrapper(void * data); Scheduler * _M_pScheduler; ScheduleGroup * _M_pScheduleGroup; // // Hide assignment operator and copy constructor // agent const &operator =(agent const&); // no assignment operator agent(agent const &); // no copy constructor }; //************************************************************************** // Direct Messaging APIs: //************************************************************************** /// /// A general receive implementation, allowing a context to wait for data from /// exactly one source and filter the values that are accepted. If the specified timeout is not /// COOPERATIVE_TIMEOUT_INFINITE, an exception (operation_timed_out) will be thrown if the specified amount /// of time expires before a message is received. Note that zero length timeouts should likely use /// try_receive as opposed to receive with a timeout of zero as it is more efficient and does not /// throw exceptions on timeouts. /// /// /// The payload type /// /// /// A pointer to the source from which data is expected. /// /// /// The maximum time for which the method should for the data, in milliseconds. /// /// /// A pointer to a filter which will indicate whether to accept the data or not. /// /// /// A value from the source, of the payload type. /// /**/ template _Type _Receive_impl(ISource<_Type> * _Src, unsigned int _Timeout, typename ITarget<_Type>::filter_method const* _Filter_proc) { // The Blocking Recipient messaging block class is internal to the receive function class _Blocking_recipient : public ITarget<_Type> { public: // Create an Blocking Recipient _Blocking_recipient(ISource<_Type> * _PSource, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) : _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_fState(_NotInitialized), _M_timeout(_Timeout) { _Connect(_PSource); } // Create an Blocking Recipient _Blocking_recipient(ISource<_Type> * _PSource, filter_method const& _Filter, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) : _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_fState(_NotInitialized), _M_timeout(_Timeout) { if (_Filter != NULL) { _M_pFilter = new filter_method(_Filter); } _Connect(_PSource); } // Cleans up any resources that may have been created by the BlockingRecipient. ~_Blocking_recipient() { _Disconnect(); delete _M_pFilter; delete _M_pMessage; } // Gets the value of the message sent to this BlockingRecipient. Blocks by // spinning until a message has arrived. _Type _Value() { _Wait_for_message(); return _M_pMessage->payload; } // The main propagation function for ITarget blocks. Called by a source // block, generally within an asynchronous task to send messages to its targets. virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource) { // Throw exception if the message being propagated to this block is NULL if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } // Reject if the recipient has already received a message if (_M_fState == _Initialized) { return declined; } // Reject if the message does not meet the filter requirements if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload)) { return declined; } // Accept the message _CONCRT_ASSERT(_PSource != NULL); _M_pMessage = _PSource->accept(_PMessage->msg_id(), this); if (_M_pMessage != NULL) { // Set the initialized flag on this block if (_InterlockedExchange(&_M_fState, _Initialized) == _Blocked) { _M_ev.set(); } return accepted; } return missed; } // Synchronously sends a message to this block. When this function completes the message will // already have propagated into the block. virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource) { if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } // Only the connected source is allowed to send messages // to the blocking recepient. Decline messages without // a source. return declined; } private: // Link a source block virtual void link_source(ISource<_Type> * _PSrc) { _M_pConnectedTo = _PSrc; _PSrc->acquire_ref(this); } // Remove a source messaging block for this BlockingRecipient virtual void unlink_source(ISource<_Type> * _PSource) { if (_InterlockedCompareExchangePointer(reinterpret_cast(&_M_pConnectedTo), (void *)NULL, _PSource) == _PSource) { _PSource->release_ref(this); } } // Remove the source messaging block for this BlockingRecipient virtual void unlink_sources() { ISource<_Type> * _PSource = reinterpret_cast *>(_InterlockedExchangePointer(reinterpret_cast(&_M_pConnectedTo), (void *)NULL)); if (_PSource != NULL) { _PSource->unlink_target(this); _PSource->release_ref(this); } } // Connect the blocking recipient to the source void _Connect(ISource<_Type> * _PSource) { if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } _PSource->link_target(this); } // Cleanup the connection to the blocking recipient's source. There is no need // to do anything about the associated context. void _Disconnect() { unlink_sources(); } // Internal function used to block while waiting for a message to arrive // at this BlockingRecipient void _Wait_for_message() { bool _Timeout = false; // If we haven't received a message yet, cooperatively block. if (_InterlockedCompareExchange(&_M_fState, _Blocked, _NotInitialized) == _NotInitialized) { if (_M_ev.wait(_M_timeout) == COOPERATIVE_WAIT_TIMEOUT) { _Timeout = true; } } // Unlinking from our source guarantees that there are no threads in propagate _Disconnect(); if (_M_fState != _Initialized) { // We had to have timed out if we came out of the wait // without being initialized. _CONCRT_ASSERT(_Timeout); throw operation_timed_out(); } } // States for this block enum { _NotInitialized, _Blocked, _Initialized }; volatile long _M_fState; // The source messaging block connected to this Recipient ISource<_Type> * _M_pConnectedTo; // The message that was received message<_Type> * volatile _M_pMessage; // The timeout. unsigned int _M_timeout; // The event we wait upon event _M_ev; // The filter that is called on this block before accepting a message filter_method * _M_pFilter; }; if (_Filter_proc != NULL) { _Blocking_recipient _Recipient(_Src, *_Filter_proc, _Timeout); return _Recipient._Value(); } else { _Blocking_recipient _Recipient(_Src, _Timeout); return _Recipient._Value(); } } /// /// A general receive implementation, allowing a context to wait for data from /// exactly one source and filter the values that are accepted. /// /// /// The payload type. /// /// /// A pointer or reference to the source from which data is expected. /// /// /// The maximum time for which the method should for the data, in milliseconds. /// /// /// A value from the source, of the payload type. /// /// /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before a message is received. If you want a zero length timeout, you should use the /// try_receive function, as opposed to calling receive with a timeout /// of 0 (zero), as it is more efficient and does not throw exceptions on timeouts. /// For more information, see . /// /// /// /// /**/ template _Type receive(_Inout_ ISource<_Type> * _Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) { return _Receive_impl(_Src, _Timeout, NULL); } /// /// A general receive implementation, allowing a context to wait for data from /// exactly one source and filter the values that are accepted. /// /// /// The payload type. /// /// /// A pointer or reference to the source from which data is expected. /// /// /// A filter function which determines whether messages should be accepted. /// /// /// The maximum time for which the method should for the data, in milliseconds. /// /// /// A value from the source, of the payload type. /// /// /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before a message is received. If you want a zero length timeout, you should use the /// try_receive function, as opposed to calling receive with a timeout /// of 0 (zero), as it is more efficient and does not throw exceptions on timeouts. /// For more information, see . /// /// /// /// /**/ template _Type receive(_Inout_ ISource<_Type> * _Src, typename ITarget<_Type>::filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) { return _Receive_impl(_Src, _Timeout, &_Filter_proc); } /// /// A general receive implementation, allowing a context to wait for data from /// exactly one source and filter the values that are accepted. /// /// /// The payload type. /// /// /// A pointer or reference to the source from which data is expected. /// /// /// The maximum time for which the method should for the data, in milliseconds. /// /// /// A value from the source, of the payload type. /// /// /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before a message is received. If you want a zero length timeout, you should use the /// try_receive function, as opposed to calling receive with a timeout /// of 0 (zero), as it is more efficient and does not throw exceptions on timeouts. /// For more information, see . /// /// /// /// /**/ template _Type receive(ISource<_Type> &_Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) { return _Receive_impl(&_Src, _Timeout, NULL); } /// /// A general receive implementation, allowing a context to wait for data from /// exactly one source and filter the values that are accepted. /// /// /// The payload type. /// /// /// A pointer or reference to the source from which data is expected. /// /// /// A filter function which determines whether messages should be accepted. /// /// /// The maximum time for which the method should for the data, in milliseconds. /// /// /// A value from the source, of the payload type. /// /// /// If the parameter has a value other than the constant COOPERATIVE_TIMEOUT_INFINITE, /// the exception operation_timed_out is thrown if the specified amount /// of time expires before a message is received. If you want a zero length timeout, you should use the /// try_receive function, as opposed to calling receive with a timeout /// of 0 (zero), as it is more efficient and does not throw exceptions on timeouts. /// For more information, see . /// /// /// /// /**/ template _Type receive(ISource<_Type> &_Src, typename ITarget<_Type>::filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) { return _Receive_impl(&_Src, _Timeout, &_Filter_proc); } /// /// Helper function that implements try_receive /// A general try-receive implementation, allowing a context to look for data from /// exactly one source and filter the values that are accepted. If the data is not /// ready, try_receive will return false. /// /// /// The payload type /// /// /// A pointer to the source from which data is expected. /// /// /// A reference to a location where the result will be placed. /// /// /// A pointer to a filter which will indicate whether to accept the data or not. /// /// /// A bool indicating whether a payload was placed in or not. /// /**/ template bool _Try_receive_impl(ISource<_Type> * _Src, _Type & _value, typename ITarget<_Type>::filter_method const * _Filter_proc) { // The Immediate Recipient messaging block class is internal to the receive function class _Immediate_recipient : public ITarget<_Type> { public: // Create an Immediate Recipient _Immediate_recipient(ISource<_Type> * _PSource) : _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_isInitialized(0) { _Connect(_PSource); } // Create an Immediate Recipient _Immediate_recipient(ISource<_Type> * _PSource, filter_method const& _Filter) : _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_isInitialized(0) { if (_Filter != NULL) { _M_pFilter = new filter_method(_Filter); } _Connect(_PSource); } // Cleans up any resources that may have been created by the ImmediateRecipient. ~_Immediate_recipient() { _Disconnect(); delete _M_pFilter; delete _M_pMessage; } // Gets the value of the message sent to this ImmediateRecipient. bool _Value(_Type & _value) { // Unlinking from our source guarantees that there are no threads in propagate _Disconnect(); if (_M_pMessage != NULL) { _value = _M_pMessage->payload; return true; } return false; } // The main propagation function for ITarget blocks. Called by a source // block, generally within an asynchronous task to send messages to its targets. virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource) { message_status _Result = accepted; // Throw exception if the message being propagated to this block is NULL if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } // Reject if the recipient has already received a message if (_M_isInitialized == 1) { return declined; } // Reject if the message does not meet the filter requirements if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload)) { return declined; } // Accept the message _CONCRT_ASSERT(_PSource != NULL); _M_pMessage = _PSource->accept(_PMessage->msg_id(), this); // Set the initialized flag on this block if (_M_pMessage != NULL) { // Fence to ensure that the above update to _M_pMessage is visible _InterlockedExchange(&_M_isInitialized, 1); _Result = accepted; } else { _Result = missed; } return _Result; } // Synchronously sends a message to this block. When this function completes the message will // already have propagated into the block. virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource) { if (_PMessage == NULL) { throw std::invalid_argument("_PMessage"); } if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } // Only the connected source is allowed to send messages // to the blocking recepient. Decline messages without // a source. return declined; } private: // Add a source messaging block virtual void link_source(ISource<_Type> * _PSrc) { _M_pConnectedTo = _PSrc; _PSrc->acquire_ref(this); } // Remove a source messaging block for this BlockingRecipient virtual void unlink_source(ISource<_Type> * _PSource) { if (_InterlockedCompareExchangePointer(reinterpret_cast(&_M_pConnectedTo), (void *)NULL, _PSource) == _PSource) { _PSource->release_ref(this); } } // Remove the source messaging block for this BlockingRecipient virtual void unlink_sources() { ISource<_Type> * _PSource = reinterpret_cast *>(_InterlockedExchangePointer(reinterpret_cast(&_M_pConnectedTo), (void *)NULL)); if (_PSource != NULL) { _PSource->unlink_target(this); _PSource->release_ref(this); } } // Connect to a source block void _Connect(ISource<_Type> * _PSource) { if (_PSource == NULL) { throw std::invalid_argument("_PSource"); } _CONCRT_ASSERT(_M_isInitialized == 0); _PSource->link_target(this); } // // Cleanup the connection to the trigger's source. There is no need // to do anything about the associated context. // void _Disconnect() { unlink_sources(); } // The source messaging block connected to this Recipient ISource<_Type> * _M_pConnectedTo; // The message that was received message<_Type> * volatile _M_pMessage; // A flag for whether or not this block has been initialized with a value volatile long _M_isInitialized; // The filter that is called on this block before accepting a message filter_method * _M_pFilter; }; if (_Filter_proc != NULL) { _Immediate_recipient _Recipient(_Src, *_Filter_proc); return _Recipient._Value(_value); } else { _Immediate_recipient _Recipient(_Src); return _Recipient._Value(_value); } } /// /// A general try-receive implementation, allowing a context to look for data from /// exactly one source and filter the values that are accepted. If the data is not /// ready, the method will return false. /// /// /// The payload type. /// /// /// A pointer or reference to the source from which data is expected. /// /// /// A reference to a location where the result will be placed. /// /// /// A bool value indicating whether or not a payload was placed in . /// /// /// For more information, see . /// /// /// /// /**/ template bool try_receive(_Inout_ ISource<_Type> * _Src, _Type & _value) { return _Try_receive_impl(_Src, _value, NULL); } /// /// A general try-receive implementation, allowing a context to look for data from /// exactly one source and filter the values that are accepted. If the data is not /// ready, the method will return false. /// /// /// The payload type. /// /// /// A pointer or reference to the source from which data is expected. /// /// /// A reference to a location where the result will be placed. /// /// /// A filter function which determines whether messages should be accepted. /// /// /// A bool value indicating whether or not a payload was placed in . /// /// /// For more information, see . /// /// /// /// /**/ template bool try_receive(_Inout_ ISource<_Type> * _Src, _Type & _value, typename ITarget<_Type>::filter_method const& _Filter_proc) { return _Try_receive_impl(_Src, _value, &_Filter_proc); } /// /// A general try-receive implementation, allowing a context to look for data from /// exactly one source and filter the values that are accepted. If the data is not /// ready, the method will return false. /// /// /// The payload type /// /// /// A pointer or reference to the source from which data is expected. /// /// /// A reference to a location where the result will be placed. /// /// /// A bool value indicating whether or not a payload was placed in . /// /// /// For more information, see . /// /// /// /// /**/ template bool try_receive(ISource<_Type> & _Src, _Type & _value) { return _Try_receive_impl(&_Src, _value, NULL); } /// /// A general try-receive implementation, allowing a context to look for data from /// exactly one source and filter the values that are accepted. If the data is not /// ready, the method will return false. /// /// /// The payload type /// /// /// A pointer or reference to the source from which data is expected. /// /// /// A reference to a location where the result will be placed. /// /// /// A filter function which determines whether messages should be accepted. /// /// /// A bool value indicating whether or not a payload was placed in . /// /// /// For more information, see . /// /// /// /// /**/ template bool try_receive(ISource<_Type> & _Src, _Type & _value, typename ITarget<_Type>::filter_method const& _Filter_proc) { return _Try_receive_impl(&_Src, _value, &_Filter_proc); } namespace details { //************************************************************************** // Supporting blocks for send and asend //************************************************************************** // Originator block that pushes messages to a target template class _AnonymousOriginator : public ISource<_Type> { public: typedef single_link_registry> _Target_registry; // Create an Originator _AnonymousOriginator() : _M_pMessage(NULL), _M_pTarget(NULL) { } // Cleans up any resources that may have been created by the Originator. virtual ~_AnonymousOriginator() { delete _M_pMessage; } // Removes a target messaging block for this Originator virtual void unlink_target(ITarget<_Type> * _PTarget) { throw invalid_operation("unlink_target is not supported on _AnonymousOriginator"); } // Removes the target messaging block from this Originator virtual void unlink_targets() { throw invalid_operation("unlink_targets is not supported on _AnonymousOriginator"); } // Accept on this Originator is called by a target to take ownership of a // propagated message virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget != _M_pTarget) { return NULL; } if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return NULL; } // The IDs match, actaully transfer ownership of the message and // unlink away from the target message<_Type> * _Result = _M_pMessage; // The ownership of this message has changed. Set the internal pointer to NULL // so it won't be deleted in the destructor _M_pMessage = NULL; return _Result; } // Reserve shall not be called by blocks that supports push virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { throw invalid_operation("reserve is not supported on _AnonymousOriginator"); } // Consume shall not be called by blocks that supports push virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { throw invalid_operation("consume is not supported on _AnonymousOriginator"); } // Release needs to be defined for ISource blocks, but Originator doesn't need to // do anything for reservation release because there can only be one target block to read // the data at a later time. virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { throw invalid_operation("release is not supported on _AnonymousOriginator"); } virtual void acquire_ref(_Inout_ ITarget<_Type> *) { throw invalid_operation("acquire_ref is not supported on _AnonymousOriginator"); } virtual void release_ref(_Inout_ ITarget<_Type> *) { throw invalid_operation("release_ref is not supported on _AnonymousOriginator"); } private: friend class _Originator; // Send the given value to the target bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value) { _M_pTarget = _PTarget; _CONCRT_ASSERT(_M_pTarget != NULL); _CONCRT_ASSERT(_M_pTarget->supports_anonymous_source()); // Create the message message_status _Status = declined; message<_Type> * _Msg = new message<_Type>(_Value); _CONCRT_ASSERT(_M_pMessage == NULL); _M_pMessage = _Msg; // Send the message _Status = _M_pTarget->send(_M_pMessage, this); // If the message is declined, the destructor will // delete the message // status should not be postponed. _CONCRT_ASSERT(_Status != postponed); return (_Status == accepted); } bool _internal_asend(ITarget<_Type> * _PTarget, _Type const & _Value) { _M_pTarget = _PTarget; _CONCRT_ASSERT(_M_pTarget != NULL); _CONCRT_ASSERT(_M_pTarget->supports_anonymous_source()); // Create the message message_status _Status = declined; message<_Type> * _Msg = new message<_Type>(_Value); _CONCRT_ASSERT(_M_pMessage == NULL); _M_pMessage = _Msg; // Send the message _Status = _M_pTarget->propagate(_M_pMessage, this); // If the message is declined, the destructor will // delete the message // status should not be postponed. if (_Status == postponed) { throw invalid_operation("Messages offered by _AnonymousOriginator shall not be postponed"); } return (_Status == accepted); } // Add a target messaging block for this Originator virtual void link_target(ITarget<_Type> * _PTarget) { throw invalid_operation("link_target is not supported on _AnonymousOriginator"); } // The message that will be propagated by the Originator message<_Type> * _M_pMessage; // The single target for this block ITarget<_Type> * _M_pTarget; }; // The Originator messaging block class is internal to the send function. template class _SyncOriginator : public ISource<_Type> { public: typedef single_link_registry> _Target_registry; // Create an Originator _SyncOriginator() : _M_pMessage(NULL), _M_fStatus(postponed), _M_referenceCount(0) { } // Cleans up any resources that may have been created by the Originator. virtual ~_SyncOriginator() { unlink_targets(); _Wait_on_ref(); delete _M_pMessage; } // Removes a target messaging block for this Originator virtual void unlink_target(ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } { // Hold the lock to ensure that the target doesn't unlink while // propagation is in progress. _R_lock _Lock(_M_internalLock); if (_M_connectedTargets.remove(_PTarget)) { _Invoke_unlink_source(_PTarget); // Indicate that the send is complete _Done(declined); } } } // Removes the target messaging block from this Originator virtual void unlink_targets() { // Hold the lock to ensure that the target doesn't unlink while // propagation is in progress. _R_lock _Lock(_M_internalLock); for (_Target_registry::iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Type> * _PTarget = *_Iter; if (_M_connectedTargets.remove(_PTarget)) { _Invoke_unlink_source(_PTarget); } } // All targets should be unlinked _CONCRT_ASSERT(_M_connectedTargets.count() == 0); // Indicate that the send is complete _Done(declined); } // Accept on this Originator is called by a target to take ownership of a // propagated message virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { return NULL; } if (!_M_connectedTargets.contains(_PTarget)) { return NULL; } if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return NULL; } // The IDs match, actaully transfer ownership of the message and // unlink away from the target message<_Type> * _Result = _M_pMessage; // The ownership of this message has changed. Set the internal pointer to NULL // so it won't be deleted in the destructor _M_pMessage = NULL; // The message has been accepted/consumed, propagate indication that it has succeeded _Done(accepted); return _Result; } // Reserve needs to be defined for ISource blocks, but Originator doesn't need to // do anything for reservation because there can only be one target block to read // the data at a later time. virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (!_M_connectedTargets.contains(_PTarget)) { return false; } if (_M_pMessage->msg_id() != _MsgId) { return false; } return true; } // Consume is called by a target messaging block to take ownership of a // previously reserved message. virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (!_M_connectedTargets.contains(_PTarget)) { throw bad_target(); } return accept(_MsgId, _PTarget); } // Release needs to be defined for ISource blocks, but Originator doesn't need to // do anything for reservation release because there can only be one target block to read // the data at a later time. virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (!_M_connectedTargets.contains(_PTarget)) { throw bad_target(); } if ((_M_pMessage == NULL) || (_M_pMessage->msg_id() != _MsgId)) { throw message_not_found(); } // If the previously reserved message is released, then propagate // declined to indicate that the message was not accepted. _Done(declined); } virtual void acquire_ref(_Inout_ ITarget<_Type> *) { _InterlockedIncrement(&_M_referenceCount); } virtual void release_ref(_Inout_ ITarget<_Type> *) { _InterlockedDecrement(&_M_referenceCount); } private: friend class _Originator; // Send the given value to the target bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value) { // _send should only be called once. if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } message_status _Status = declined; message<_Type> * _Msg = new message<_Type>(_Value); { // Hold the lock to ensure that the target doesn't unlink while // propagation is in progress. _R_lock _Lock(_M_internalLock); // link to the target, create a message and send it link_target(_PTarget); _CONCRT_ASSERT(_M_pMessage == NULL); _M_pMessage = _Msg; // Send the message synchronously to the target _Status = _PTarget->send(_M_pMessage, this); } if (_Status == postponed) { // If the target postponed the message, wait for it to // be accepted/declined. _Wait_for_completion(); // Procure the final status _Status = _M_fStatus; } // status should not be postponed. _CONCRT_ASSERT(_Status != postponed); return (_Status == accepted); } // Add a target messaging block for this Originator virtual void link_target(ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } _M_connectedTargets.add(_PTarget); _Invoke_link_source(_PTarget); // There should be no pending messages to propagate at this time. _CONCRT_ASSERT(_M_pMessage == NULL); } // Wait for the status to reach one of the terminal // states (!= postponed) void _Wait_for_completion() { // Wait for the event to be signalled _M_ev.wait(COOPERATIVE_TIMEOUT_INFINITE); _CONCRT_ASSERT(_M_fStatus != postponed); } void _Wait_on_ref() { ::Concurrency::details::_SpinWaitBackoffNone spinWait; while(_M_referenceCount != 0) { spinWait._SpinOnce(); } } // Indicate that the send operation has completed void _Done(message_status _Status) { // postponed is not a done state _CONCRT_ASSERT(_Status != postponed); _M_fStatus = _Status; _M_ev.set(); } // The message that will be propagated by the Originator message<_Type> * _M_pMessage; // Event to indicate completion event _M_ev; // Final status of the send volatile message_status _M_fStatus; // A lock for modifying the buffer or the connected blocks ::Concurrency::details::_ReentrantPPLLock _M_internalLock; // Connected targets _Target_registry _M_connectedTargets; volatile long _M_referenceCount; }; // The Originator messaging block class is internal to the send function. template class _AsyncOriginator : public ISource<_Type> { public: typedef single_link_registry> _Target_registry; // Cleans up any resources that may have been created by the AsyncOriginator. virtual ~_AsyncOriginator() { unlink_targets(); delete _M_pMessage; } // Removes a target messaging block for this AsyncOriginator virtual void unlink_target(ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } bool _Unlinked = false; { // Hold the lock to ensure that the target doesn't unlink while // propagation is in progress. _R_lock _Lock(_M_internalLock); if (_M_connectedTargets.remove(_PTarget)) { _Invoke_unlink_source(_PTarget); _Unlinked = true; } } // Release the lock before decrementing the refcount. Otherwise, the // lock release could corrupt memory. if (_Unlinked) { _Release_ref(); } } // Removes the target messaging block from this AsyncOriginator virtual void unlink_targets() { bool _Unlinked = false; { // Hold the lock to ensure that the target doesn't unlink while // propagation is in progress. _R_lock _Lock(_M_internalLock); for (_Target_registry::iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter) { ITarget<_Type> * _PTarget = *_Iter; if (_M_connectedTargets.remove(_PTarget)) { _Invoke_unlink_source(_PTarget); _Unlinked = true; } } // All targets should be unlinked _CONCRT_ASSERT(_M_connectedTargets.count() == 0); } // Release the lock before decrementing the refcount. Otherwise, the // lock release could corrupt memory. if (_Unlinked) { _Release_ref(); } } // Accept on this AsyncOriginator is called by a target to take ownership of a // propagated message. This can only be called from propagate. virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { return NULL; } if (!_M_connectedTargets.contains(_PTarget)) { return NULL; } if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return NULL; } // // If the IDs match, actaully transfer ownership of the message. // message<_Type> * _Result = _M_pMessage; _M_pMessage = NULL; return _Result; } // Reserve needs to be defined for ISource blocks, but AsyncOriginator doesn't need to // do anything for reservation because there can only be one target block to read // the data at a later time. virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (!_M_connectedTargets.contains(_PTarget)) { return false; } if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return false; } return true; } // Consume is called by a target messaging block to take ownership of a // previously reserved message. virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (!_M_connectedTargets.contains(_PTarget)) { throw bad_target(); } if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId) { return NULL; } // The ownership of this message has changed. Set the internal pointer to NULL // so it won't be deleted in the destructor message<_Type> * _Result = _M_pMessage; _M_pMessage = NULL; // We are done. Unlink from the target. DO NOT TOUCH "this" pointer after unlink unlink_target(_PTarget); return _Result; } // Release needs to be defined for ISource blocks, but AsyncOriginator doesn't need to // do anything for reservation release because there can only be one target block to read // the data at a later time. virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } if (!_M_connectedTargets.contains(_PTarget)) { throw bad_target(); } if ((_M_pMessage == NULL) || (_M_pMessage->msg_id() != _MsgId)) { throw message_not_found(); } // We can be connected to only 1 target. Unlink from the target. // DO NOT TOUCH "this" pointer after unlink unlink_target(_PTarget); } virtual void acquire_ref(_Inout_ ITarget<_Type> *) { _Acquire_ref(); } virtual void release_ref(_Inout_ ITarget<_Type> *) { _Release_ref(); } private: friend class _Originator; // Create an AsyncOriginator (constructor is private to ensure that // it is allocated on the heap). _AsyncOriginator() : _M_pMessage(NULL), _M_refcount(0) { } // Send the given value to the target bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value) { // Keep a refcount so that this object doesn't get deleted if // the target decides to unlink before we release our lock _Acquire_ref(); message_status _Status = declined; message<_Type> * _Msg = new message<_Type>(_Value); { // Hold the lock to ensure that the target doesn't unlink while // propagation is in progress. _R_lock _Lock(_M_internalLock); // link to the target, create a message and send it link_target(_PTarget); _CONCRT_ASSERT(_M_pMessage == NULL); _M_pMessage = _Msg; _Status = _PTarget->propagate(_M_pMessage, this); } // If the status is anything other than postponed, unlink away // from the target and delete the AsyncOriginator. if (_Status != postponed) { unlink_target(_PTarget); } // Release the reference acquired above _Release_ref(); return (_Status == accepted); } // Add a target messaging block for this AsyncOriginator virtual void link_target(ITarget<_Type> * _PTarget) { if (_PTarget == NULL) { throw std::invalid_argument("_PTarget"); } // Acquire a reference that will be released by unlink_target _Acquire_ref(); _M_connectedTargets.add(_PTarget); _Invoke_link_source(_PTarget); // There should be no pending messages to propagate at this time. _CONCRT_ASSERT(_M_pMessage == NULL); } // Acquire a reference on the async originator object void _Acquire_ref() { _InterlockedIncrement(&_M_refcount); } // Release the reference on the async originator object. The object // will be deleted when the reference count goes to 0. void _Release_ref() { _CONCRT_ASSERT(_M_refcount > 0); if (_InterlockedDecrement(&_M_refcount) == 0) { delete this; } } // The message that will be propagated by the AsyncOriginator message<_Type> * _M_pMessage; // Reference count to manage object lifetime volatile long _M_refcount; // The internal lock for this block for its message ::Concurrency::details::_ReentrantPPLLock _M_internalLock; // connected targets _Target_registry _M_connectedTargets; }; // static class that exposes methods to initiate messages into // a dataflow network class _Originator { public: // Synchronous initiation of messages template static bool _send(ITarget<_Type> * _Trg, const _Type& _Data) { if (_Trg != NULL && _Trg->supports_anonymous_source()) { // _send will block until the message is accepted/rejected. // Note that this invokes the send method on the target which // would synchronously process the message. _AnonymousOriginator<_Type> _Send_block; return _Send_block._internal_send(_Trg, _Data); } else { // Create a blocking originator on the stack. _send will block until the // message is accepted/rejected. _SyncOriginator<_Type> _Orig; return _Orig._internal_send(_Trg, _Data); } } // Asynchronous initiation of messages template static bool _asend(ITarget<_Type> * _Trg, const _Type& _Data) { // If the block can participate in posting messages without requiring a call back, use that // method of initiating the message rather for efficiency purposes. if (_Trg != NULL && _Trg->supports_anonymous_source()) { _AnonymousOriginator<_Type> _Asend_block; return _Asend_block._internal_asend(_Trg, _Data); } else { // Needs to be allocated on the heap _AsyncOriginator<_Type> * _AsyncOrig = new _AsyncOriginator<_Type>; return _AsyncOrig->_internal_send(_Trg, _Data); } } }; } // namespace details /// /// A synchronous send operation, which waits until the target either accepts or declines the message. /// /// /// The payload type. /// /// /// A pointer or reference to the target to which data is sent. /// /// /// A reference to the data to be sent. /// /// /// true if the message was accepted, false otherwise. /// /// /// For more information, see . /// /// /// /// /**/ template bool send(_Inout_ ITarget<_Type> * _Trg, const _Type& _Data) { return details::_Originator::_send(_Trg, _Data); } /// /// A synchronous send operation, which waits until the target either accepts or declines the message. /// /// /// The payload type. /// /// /// A pointer or reference to the target to which data is sent. /// /// /// A reference to the data to be sent. /// /// /// true if the message was accepted, false otherwise. /// /// /// For more information, see . /// /// /// /// /**/ template bool send(ITarget<_Type> &_Trg, const _Type &_Data) { return send(&_Trg, _Data); } /// /// An asynchronous send operation, which schedules a task to propagate the data to the target block. /// /// /// The type of the data to be sent. /// /// /// A pointer or reference to the target to which data is sent. /// /// /// A reference to the data to be sent. /// /// /// true if the message was accepted before the method returned, false otherwise. /// /// /// For more information, see . /// /// /// /// /**/ template bool asend(_Inout_ ITarget<_Type> * _Trg, const _Type& _Data) { return details::_Originator::_asend(_Trg, _Data); } /// /// An asynchronous send operation, which schedules a task to propagate the value to the target block. /// /// /// The type of the data to be sent. /// /// /// A pointer or reference to the target to which data is sent. /// /// /// A reference to the data to be sent. /// /// /// true if the message was accepted, false otherwise. /// /// /// For more information, see . /// /// /// /// /**/ template bool asend(ITarget<_Type> &_Trg, const _Type &_Data) { return asend(&_Trg, _Data); } /// /// Associates the given name to the message block or agent in the ETW trace. /// /// /// The type of the object. This is typically a message block or an agent. /// /// /// A pointer to the message block or agent that is being named in the trace. /// /// /// The name for the given object. /// template void Trace_agents_register_name(_Inout_ _Type * _PObject, _In_z_ const wchar_t * _Name) { _Trace_agents(AGENTS_EVENT_NAME, ::Concurrency::details::_Trace_agents_get_id(_PObject), _Name); } } // namespace Concurrency namespace concurrency = Concurrency; #pragma warning(pop) #pragma pack(pop)