Source code for pfrl.replay_buffers.persistent

import os
import warnings

from pfrl.collections.persistent_collections import PersistentRandomAccessQueue

from .episodic import EpisodicReplayBuffer
from .replay_buffer import ReplayBuffer


class PersistentReplayBuffer(ReplayBuffer):
    """Experience replay buffer that is saved to disk storage

    :py:class:`ReplayBuffer` is used to store sampled experience data, but
    the data is kept in DRAM and lost at program termination. This class
    adds persistence to :py:class:`ReplayBuffer`, so that the learning
    process can be restarted from previously saved replay data.

    Args:
        dirname (str): Directory name where the buffer data is saved.
            Note that existing data is also loaded from this directory.
            It cannot be used together with `ancestor`.
        capacity (int): Capacity in terms of number of transitions
        ancestor (str): Path to a pre-generated replay buffer. The
            `ancestor` directory is used to load/save, instead of `dirname`.
        logger: logger object
        distributed (bool): Use a distributed version of the underlying
            persistent queue class. The private package `pfrlmn` is
            required to use this option.
        group: `torch.distributed` group object. Only used when
            `distributed=True` and the pfrlmn package is available

    .. note:: Contrary to the original :py:class:`ReplayBuffer`
        implementation, ``state`` and ``next_state``, as well as ``action``
        and ``next_action``, are pickled and stored as different objects
        even if they point to the same object. This may lead to inefficient
        use of storage space, but it is recommended to buy more storage -
        hardware is sometimes cheaper than software.

    """

    def __init__(
        self,
        dirname,
        capacity,
        *,
        ancestor=None,
        logger=None,
        distributed=False,
        group=None
    ):
        super().__init__(capacity)
        if not distributed:
            self.memory = PersistentRandomAccessQueue(
                dirname, capacity, ancestor=ancestor, logger=logger
            )
        else:
            try:
                # Use the distributed version of PersistentRandomAccessQueue
                import pfrlmn.collections.persistent_collections as mn_coll

                self.memory = mn_coll.PersistentRandomAccessQueue(
                    dirname, capacity, ancestor=ancestor, logger=logger, group=group
                )
            except ImportError:
                # "pfrlmn" package is not publicly available as of pfrl release.
                raise RuntimeError(
                    "`pfrlmn` private package is required "
                    "to enable distributed execution support "
                    "of PersistentReplayBuffer."
                )

    def save(self, _):
        pass

    def load(self, _):
        warnings.warn(
            "{}.load() has been ignored, as it is a persistent replay "
            "buffer".format(self)
        )
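A minimal usage sketch (not part of the pfrl module itself): constructing the buffer, appending one transition through the ``append`` API inherited from :py:class:`ReplayBuffer`, and re-opening it to show that the data survives a restart. The directory name ``"replay_data"`` and the transition values are illustrative assumptions.

    import numpy as np

    from pfrl.replay_buffers.persistent import PersistentReplayBuffer

    buf = PersistentReplayBuffer("replay_data", capacity=10 ** 6)
    buf.append(
        state=np.zeros(4, dtype=np.float32),
        action=0,
        reward=1.0,
        next_state=np.ones(4, dtype=np.float32),
        is_state_terminal=False,
    )
    # Re-creating a buffer with the same dirname reloads the stored
    # transitions, so learning can resume after a restart.
    buf2 = PersistentReplayBuffer("replay_data", capacity=10 ** 6)
    assert len(buf2) == len(buf)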
class PersistentEpisodicReplayBuffer(EpisodicReplayBuffer):
    """Episodic version of :py:class:`PersistentReplayBuffer`

    Args:
        dirname (str): Directory name where the buffer data is saved.
            This cannot be used together with `ancestor`.
        capacity (int): Capacity in terms of number of transitions
        ancestor (str): Path to a pre-generated replay buffer. The
            `ancestor` directory is used to load/save, instead of `dirname`.
        logger: logger object
        distributed (bool): Use a distributed version of the underlying
            persistent queue class. The private package `pfrlmn` is
            required to use this option.
        group: `torch.distributed` group object. Only used when
            `distributed=True` and the pfrlmn package is available

    .. note:: The current implementation is inefficient: in
        :py:class:`EpisodicReplayBuffer`, the episodic memory and the main
        memory share almost all of their data by reference while exposing
        it through different structures, whereas this persistent version
        shares no data between the two and keeps a separate backing file
        structure for each.

    """

    def __init__(
        self,
        dirname,
        capacity,
        *,
        ancestor=None,
        logger=None,
        distributed=False,
        group=None
    ):
        super().__init__(capacity)
        self.memory_dir = os.path.join(dirname, "memory")
        self.episodic_memory_dir = os.path.join(dirname, "episodic_memory")
        if not distributed:
            self.memory = PersistentRandomAccessQueue(
                self.memory_dir, capacity, ancestor=ancestor, logger=logger
            )
            self.episodic_memory = PersistentRandomAccessQueue(
                self.episodic_memory_dir, capacity, ancestor=ancestor, logger=logger
            )
        else:
            try:
                # Use the distributed version of PersistentRandomAccessQueue
                import pfrlmn.collections.persistent_collections as mn_coll

                self.memory = mn_coll.PersistentRandomAccessQueue(
                    self.memory_dir,
                    capacity,
                    ancestor=ancestor,
                    logger=logger,
                    group=group,
                )
                self.episodic_memory = mn_coll.PersistentRandomAccessQueue(
                    self.episodic_memory_dir,
                    capacity,
                    ancestor=ancestor,
                    logger=logger,
                    group=group,
                )
            except ImportError:
                # "pfrlmn" package is not publicly available as of pfrl release.
                raise RuntimeError(
                    "`pfrlmn` private package is required "
                    "to enable distributed execution support "
                    "of PersistentEpisodicReplayBuffer."
                )

    def save(self, _):
        pass

    def load(self, _):
        warnings.warn(
            "PersistentEpisodicReplayBuffer.load() is called but it has "
            "no effect."
        )
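A corresponding sketch for the episodic variant (again illustrative, not part of the module): transitions are appended as usual, the episode boundary is marked with ``stop_current_episode()``, and whole episodes can then be sampled. The two underlying queues are persisted under the ``memory`` and ``episodic_memory`` subdirectories of ``dirname``; the directory name and values below are assumptions.

    import numpy as np

    from pfrl.replay_buffers.persistent import PersistentEpisodicReplayBuffer

    buf = PersistentEpisodicReplayBuffer("episodic_replay_data", capacity=10 ** 5)
    for t in range(3):
        buf.append(
            state=np.full(4, t, dtype=np.float32),
            action=t,
            reward=float(t),
            next_state=np.full(4, t + 1, dtype=np.float32),
            is_state_terminal=(t == 2),
        )
    # Close the episode so it is recorded in the episodic memory.
    buf.stop_current_episode()
    # Sample whole episodes rather than individual transitions.
    episodes = buf.sample_episodes(1)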