{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-}

#if MIN_VERSION_monad_control(0,3,0)
{-# LANGUAGE FlexibleContexts #-}
#endif

#if !MIN_VERSION_base(4,3,0)
{-# LANGUAGE RankNTypes #-}
#endif

-- |
-- Module:      Data.Pool
-- Copyright:   (c) 2011 MailRank, Inc.
-- License:     BSD3
-- Maintainer:  Bryan O'Sullivan <bos@serpentine.com>,
--              Bas van Dijk <v.dijk.bas@gmail.com>
-- Stability:   experimental
-- Portability: portable
--
-- A high-performance striped pooling abstraction for managing
-- flexibly-sized collections of resources such as database
-- connections.
--
-- \"Striped\" means that a single 'Pool' consists of several
-- sub-pools, each managed independently.  A single stripe is fine for
-- many applications, and probably what you should choose by default.
-- More stripes will lead to reduced contention in high-performance
-- multicore applications, at a trade-off of causing the maximum
-- number of simultaneous resources in use to grow.
module Data.Pool
    (
      Pool(idleTime, maxResources, numStripes)
    , LocalPool
    , createPool
    , withResource
    , takeResource
    , tryWithResource
    , tryTakeResource
    , destroyResource
    , putResource
    , destroyAllResources
    ) where

import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
import qualified Control.Exception as E
import qualified Data.Vector as V

#if MIN_VERSION_monad_control(0,3,0)
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Control.Monad.Base (liftBase)
#else
import Control.Monad.IO.Control (MonadControlIO, controlIO)
import Control.Monad.IO.Class (liftIO)
#define control controlIO
#define liftBase liftIO
#endif

#if MIN_VERSION_base(4,3,0)
import Control.Exception (mask)
#else
-- Don't do any async exception protection for older GHCs.
mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b
mask f = f id
#endif

-- | A single resource pool entry.
data Entry a = Entry {
      Entry a -> a
entry :: a
    , Entry a -> UTCTime
lastUse :: UTCTime
    -- ^ Time of last return.
    }

-- | A single striped pool.
data LocalPool a = LocalPool {
      LocalPool a -> TVar Int
inUse :: TVar Int
    -- ^ Count of open entries (both idle and in use).
    , LocalPool a -> TVar [Entry a]
entries :: TVar [Entry a]
    -- ^ Idle entries.
    , LocalPool a -> IORef ()
lfin :: IORef ()
    -- ^ empty value used to attach a finalizer to (internal)
    } deriving (Typeable)

data Pool a = Pool {
      Pool a -> IO a
create :: IO a
    -- ^ Action for creating a new entry to add to the pool.
    , Pool a -> a -> IO ()
destroy :: a -> IO ()
    -- ^ Action for destroying an entry that is now done with.
    , Pool a -> Int
numStripes :: Int
    -- ^ The number of stripes (distinct sub-pools) to maintain.
    -- The smallest acceptable value is 1.
    , Pool a -> NominalDiffTime
idleTime :: NominalDiffTime
    -- ^ Amount of time for which an unused resource is kept alive.
    -- The smallest acceptable value is 0.5 seconds.
    --
    -- The elapsed time before closing may be a little longer than
    -- requested, as the reaper thread wakes at 1-second intervals.
    , Pool a -> Int
maxResources :: Int
    -- ^ Maximum number of resources to maintain per stripe.  The
    -- smallest acceptable value is 1.
    --
    -- Requests for resources will block if this limit is reached on a
    -- single stripe, even if other stripes have idle resources
    -- available.
    , Pool a -> Vector (LocalPool a)
localPools :: V.Vector (LocalPool a)
    -- ^ Per-capability resource pools.
    , Pool a -> IORef ()
fin :: IORef ()
    -- ^ empty value used to attach a finalizer to (internal)
    } deriving (Typeable)

instance Show (Pool a) where
    show :: Pool a -> String
show Pool{..} = "Pool {numStripes = " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
numStripes String -> ShowS
forall a. [a] -> [a] -> [a]
++ ", " String -> ShowS
forall a. [a] -> [a] -> [a]
++
                    "idleTime = " String -> ShowS
forall a. [a] -> [a] -> [a]
++ NominalDiffTime -> String
forall a. Show a => a -> String
show NominalDiffTime
idleTime String -> ShowS
forall a. [a] -> [a] -> [a]
++ ", " String -> ShowS
forall a. [a] -> [a] -> [a]
++
                    "maxResources = " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
maxResources String -> ShowS
forall a. [a] -> [a] -> [a]
++ "}"

-- | Create a striped resource pool.
--
-- Although the garbage collector will destroy all idle resources when
-- the pool is garbage collected it's recommended to manually
-- 'destroyAllResources' when you're done with the pool so that the
-- resources are freed up as soon as possible.
createPool
    :: IO a
    -- ^ Action that creates a new resource.
    -> (a -> IO ())
    -- ^ Action that destroys an existing resource.
    -> Int
    -- ^ The number of stripes (distinct sub-pools) to maintain.
    -- The smallest acceptable value is 1.
    -> NominalDiffTime
    -- ^ Amount of time for which an unused resource is kept open.
    -- The smallest acceptable value is 0.5 seconds.
    --
    -- The elapsed time before destroying a resource may be a little
    -- longer than requested, as the reaper thread wakes at 1-second
    -- intervals.
    -> Int
    -- ^ Maximum number of resources to keep open per stripe.  The
    -- smallest acceptable value is 1.
    --
    -- Requests for resources will block if this limit is reached on a
    -- single stripe, even if other stripes have idle resources
    -- available.
     -> IO (Pool a)
createPool :: IO a
-> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a)
createPool create :: IO a
create destroy :: a -> IO ()
destroy numStripes :: Int
numStripes idleTime :: NominalDiffTime
idleTime maxResources :: Int
maxResources = do
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
numStripes Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< 1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    String -> String -> IO ()
forall a. String -> String -> a
modError "pool " (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ "invalid stripe count " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
numStripes
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NominalDiffTime
idleTime NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
< 0.5) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    String -> String -> IO ()
forall a. String -> String -> a
modError "pool " (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ "invalid idle time " String -> ShowS
forall a. [a] -> [a] -> [a]
++ NominalDiffTime -> String
forall a. Show a => a -> String
show NominalDiffTime
idleTime
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
maxResources Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< 1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    String -> String -> IO ()
forall a. String -> String -> a
modError "pool " (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ "invalid maximum resource count " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
maxResources
  Vector (LocalPool a)
localPools <- Int -> IO (LocalPool a) -> IO (Vector (LocalPool a))
forall (m :: * -> *) a. Monad m => Int -> m a -> m (Vector a)
V.replicateM Int
numStripes (IO (LocalPool a) -> IO (Vector (LocalPool a)))
-> IO (LocalPool a) -> IO (Vector (LocalPool a))
forall a b. (a -> b) -> a -> b
$
                (TVar Int -> TVar [Entry a] -> IORef () -> LocalPool a)
-> IO (TVar Int)
-> IO (TVar [Entry a])
-> IO (IORef ())
-> IO (LocalPool a)
forall (m :: * -> *) a1 a2 a3 r.
Monad m =>
(a1 -> a2 -> a3 -> r) -> m a1 -> m a2 -> m a3 -> m r
liftM3 TVar Int -> TVar [Entry a] -> IORef () -> LocalPool a
forall a. TVar Int -> TVar [Entry a] -> IORef () -> LocalPool a
LocalPool (Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO 0) ([Entry a] -> IO (TVar [Entry a])
forall a. a -> IO (TVar a)
newTVarIO []) (() -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ())
  ThreadId
reaperId <- String -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOLabeledWithUnmask "resource-pool: reaper" (((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \unmask :: forall a. IO a -> IO a
unmask ->
                IO () -> IO ()
forall a. IO a -> IO a
unmask (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (a -> IO ()) -> NominalDiffTime -> Vector (LocalPool a) -> IO ()
forall a.
(a -> IO ()) -> NominalDiffTime -> Vector (LocalPool a) -> IO ()
reaper a -> IO ()
destroy NominalDiffTime
idleTime Vector (LocalPool a)
localPools
  IORef ()
fin <- () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
  let p :: Pool a
p = Pool :: forall a.
IO a
-> (a -> IO ())
-> Int
-> NominalDiffTime
-> Int
-> Vector (LocalPool a)
-> IORef ()
-> Pool a
Pool {
            IO a
create :: IO a
create :: IO a
create
          , a -> IO ()
destroy :: a -> IO ()
destroy :: a -> IO ()
destroy
          , Int
numStripes :: Int
numStripes :: Int
numStripes
          , NominalDiffTime
idleTime :: NominalDiffTime
idleTime :: NominalDiffTime
idleTime
          , Int
maxResources :: Int
maxResources :: Int
maxResources
          , Vector (LocalPool a)
localPools :: Vector (LocalPool a)
localPools :: Vector (LocalPool a)
localPools
          , IORef ()
fin :: IORef ()
fin :: IORef ()
fin
          }
  IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
fin (ThreadId -> IO ()
killThread ThreadId
reaperId) IO (Weak (IORef ())) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
    (LocalPool a -> IO (Weak (IORef ())))
-> Vector (LocalPool a) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> Vector a -> m ()
V.mapM_ (\lp :: LocalPool a
lp -> IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef (LocalPool a -> IORef ()
forall a. LocalPool a -> IORef ()
lfin LocalPool a
lp) ((a -> IO ()) -> LocalPool a -> IO ()
forall a. (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool a -> IO ()
destroy LocalPool a
lp)) Vector (LocalPool a)
localPools
  Pool a -> IO (Pool a)
forall (m :: * -> *) a. Monad m => a -> m a
return Pool a
p

-- TODO: Propose 'forkIOLabeledWithUnmask' for the base library.

-- | Sparks off a new thread using 'forkIOWithUnmask' to run the given
-- IO computation, but first labels the thread with the given label
-- (using 'labelThread').
--
-- The implementation makes sure that asynchronous exceptions are
-- masked until the given computation is executed. This ensures the
-- thread will always be labeled which guarantees you can always
-- easily find it in the GHC event log.
--
-- Like 'forkIOWithUnmask', the given computation is given a function
-- to unmask asynchronous exceptions. See the documentation of that
-- function for the motivation of this.
--
-- Returns the 'ThreadId' of the newly created thread.
forkIOLabeledWithUnmask :: String
                        -> ((forall a. IO a -> IO a) -> IO ())
                        -> IO ThreadId
forkIOLabeledWithUnmask :: String -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOLabeledWithUnmask label :: String
label m :: (forall a. IO a -> IO a) -> IO ()
m = IO ThreadId -> IO ThreadId
forall a. IO a -> IO a
mask_ (IO ThreadId -> IO ThreadId) -> IO ThreadId -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask (((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \unmask :: forall a. IO a -> IO a
unmask -> do
                                    ThreadId
tid <- IO ThreadId
myThreadId
                                    ThreadId -> String -> IO ()
labelThread ThreadId
tid String
label
                                    (forall a. IO a -> IO a) -> IO ()
m forall a. IO a -> IO a
unmask

-- | Periodically go through all pools, closing any resources that
-- have been left idle for too long.
reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper :: (a -> IO ()) -> NominalDiffTime -> Vector (LocalPool a) -> IO ()
reaper destroy :: a -> IO ()
destroy idleTime :: NominalDiffTime
idleTime pools :: Vector (LocalPool a)
pools = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Int -> IO ()
threadDelay (1 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 1000000)
  UTCTime
now <- IO UTCTime
getCurrentTime
  let isStale :: Entry a -> Bool
isStale Entry{..} = UTCTime
now UTCTime -> UTCTime -> NominalDiffTime
`diffUTCTime` UTCTime
lastUse NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
> NominalDiffTime
idleTime
  Vector (LocalPool a) -> (LocalPool a -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Vector (LocalPool a)
pools ((LocalPool a -> IO ()) -> IO ())
-> (LocalPool a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \LocalPool{..} -> do
    [a]
resources <- STM [a] -> IO [a]
forall a. STM a -> IO a
atomically (STM [a] -> IO [a]) -> STM [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$ do
      (stale :: [Entry a]
stale,fresh :: [Entry a]
fresh) <- (Entry a -> Bool) -> [Entry a] -> ([Entry a], [Entry a])
forall a. (a -> Bool) -> [a] -> ([a], [a])
partition Entry a -> Bool
forall a. Entry a -> Bool
isStale ([Entry a] -> ([Entry a], [Entry a]))
-> STM [Entry a] -> STM ([Entry a], [Entry a])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar [Entry a] -> STM [Entry a]
forall a. TVar a -> STM a
readTVar TVar [Entry a]
entries
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([Entry a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Entry a]
stale) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
        TVar [Entry a] -> [Entry a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [Entry a]
entries [Entry a]
fresh
        TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
inUse (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract ([Entry a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Entry a]
stale))
      [a] -> STM [a]
forall (m :: * -> *) a. Monad m => a -> m a
return ((Entry a -> a) -> [Entry a] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map Entry a -> a
forall a. Entry a -> a
entry [Entry a]
stale)
    [a] -> (a -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [a]
resources ((a -> IO ()) -> IO ()) -> (a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \resource :: a
resource -> do
      a -> IO ()
destroy a
resource IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`E.catch` \(SomeException
_::SomeException) -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Destroy all idle resources of the given 'LocalPool' and remove them from
-- the pool.
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool destroy :: a -> IO ()
destroy LocalPool{..} = do
  [a]
resources <- STM [a] -> IO [a]
forall a. STM a -> IO a
atomically (STM [a] -> IO [a]) -> STM [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$ do
    [Entry a]
idle <- TVar [Entry a] -> [Entry a] -> STM [Entry a]
forall a. TVar a -> a -> STM a
swapTVar TVar [Entry a]
entries []
    TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
inUse (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract ([Entry a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Entry a]
idle))
    [a] -> STM [a]
forall (m :: * -> *) a. Monad m => a -> m a
return ((Entry a -> a) -> [Entry a] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map Entry a -> a
forall a. Entry a -> a
entry [Entry a]
idle)
  [a] -> (a -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [a]
resources ((a -> IO ()) -> IO ()) -> (a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \resource :: a
resource ->
    a -> IO ()
destroy a
resource IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`E.catch` \(SomeException
_::SomeException) -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Temporarily take a resource from a 'Pool', perform an action with
-- it, and return it to the pool afterwards.
--
-- * If the pool has an idle resource available, it is used
--   immediately.
--
-- * Otherwise, if the maximum number of resources has not yet been
--   reached, a new resource is created and used.
--
-- * If the maximum number of resources has been reached, this
--   function blocks until a resource becomes available.
--
-- If the action throws an exception of any type, the resource is
-- destroyed, and not returned to the pool.
--
-- It probably goes without saying that you should never manually
-- destroy a pooled resource, as doing so will almost certainly cause
-- a subsequent user (who expects the resource to be valid) to throw
-- an exception.
withResource ::
#if MIN_VERSION_monad_control(0,3,0)
    (MonadBaseControl IO m)
#else
    (MonadControlIO m)
#endif
  => Pool a -> (a -> m b) -> m b
{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-}
withResource :: Pool a -> (a -> m b) -> m b
withResource pool :: Pool a
pool act :: a -> m b
act = (RunInBase m IO -> IO (StM m b)) -> m b
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b (StM m a)) -> m a
control ((RunInBase m IO -> IO (StM m b)) -> m b)
-> (RunInBase m IO -> IO (StM m b)) -> m b
forall a b. (a -> b) -> a -> b
$ \runInIO :: RunInBase m IO
runInIO -> ((forall a. IO a -> IO a) -> IO (StM m b)) -> IO (StM m b)
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO (StM m b)) -> IO (StM m b))
-> ((forall a. IO a -> IO a) -> IO (StM m b)) -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ \restore :: forall a. IO a -> IO a
restore -> do
  (resource :: a
resource, local :: LocalPool a
local) <- Pool a -> IO (a, LocalPool a)
forall a. Pool a -> IO (a, LocalPool a)
takeResource Pool a
pool
  StM m b
ret <- IO (StM m b) -> IO (StM m b)
forall a. IO a -> IO a
restore (m b -> IO (StM m b)
RunInBase m IO
runInIO (a -> m b
act a
resource)) IO (StM m b) -> IO () -> IO (StM m b)
forall a b. IO a -> IO b -> IO a
`onException`
            Pool a -> LocalPool a -> a -> IO ()
forall a. Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool a
pool LocalPool a
local a
resource
  LocalPool a -> a -> IO ()
forall a. LocalPool a -> a -> IO ()
putResource LocalPool a
local a
resource
  StM m b -> IO (StM m b)
forall (m :: * -> *) a. Monad m => a -> m a
return StM m b
ret
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE withResource #-}
#endif

-- | Take a resource from the pool, following the same results as
-- 'withResource'. Note that this function should be used with caution, as
-- improper exception handling can lead to leaked resources.
--
-- This function returns both a resource and the @LocalPool@ it came from so
-- that it may either be destroyed (via 'destroyResource') or returned to the
-- pool (via 'putResource').
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool :: Pool a
pool@Pool{..} = do
  local :: LocalPool a
local@LocalPool{..} <- Pool a -> IO (LocalPool a)
forall a. Pool a -> IO (LocalPool a)
getLocalPool Pool a
pool
  a
resource <- IO a -> IO a
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO a -> IO a) -> (STM (IO a) -> IO a) -> STM (IO a) -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (IO a) -> IO a
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO a) -> IO a)
-> (STM (IO a) -> IO (IO a)) -> STM (IO a) -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (IO a) -> IO (IO a)
forall a. STM a -> IO a
atomically (STM (IO a) -> IO a) -> STM (IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ do
    [Entry a]
ents <- TVar [Entry a] -> STM [Entry a]
forall a. TVar a -> STM a
readTVar TVar [Entry a]
entries
    case [Entry a]
ents of
      (Entry{..}:es :: [Entry a]
es) -> TVar [Entry a] -> [Entry a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [Entry a]
entries [Entry a]
es STM () -> STM (IO a) -> STM (IO a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a -> STM (IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
entry)
      [] -> do
        Int
used <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
inUse
        Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
used Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxResources) STM ()
forall a. STM a
retry
        TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
inUse (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
used Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1
        IO a -> STM (IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IO a -> STM (IO a)) -> IO a -> STM (IO a)
forall a b. (a -> b) -> a -> b
$
          IO a
create IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
`onException` STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
inUse (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract 1))
  (a, LocalPool a) -> IO (a, LocalPool a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
resource, LocalPool a
local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
#endif

-- | Similar to 'withResource', but only performs the action if a resource could
-- be taken from the pool /without blocking/. Otherwise, 'tryWithResource'
-- returns immediately with 'Nothing' (ie. the action function is /not/ called).
-- Conversely, if a resource can be borrowed from the pool without blocking, the
-- action is performed and it's result is returned, wrapped in a 'Just'.
tryWithResource :: forall m a b.
#if MIN_VERSION_monad_control(0,3,0)
    (MonadBaseControl IO m)
#else
    (MonadControlIO m)
#endif
  => Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource :: Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource pool :: Pool a
pool act :: a -> m b
act = (RunInBase m IO -> IO (StM m (Maybe b))) -> m (Maybe b)
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b (StM m a)) -> m a
control ((RunInBase m IO -> IO (StM m (Maybe b))) -> m (Maybe b))
-> (RunInBase m IO -> IO (StM m (Maybe b))) -> m (Maybe b)
forall a b. (a -> b) -> a -> b
$ \runInIO :: RunInBase m IO
runInIO -> ((forall a. IO a -> IO a) -> IO (StM m (Maybe b)))
-> IO (StM m (Maybe b))
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO (StM m (Maybe b)))
 -> IO (StM m (Maybe b)))
-> ((forall a. IO a -> IO a) -> IO (StM m (Maybe b)))
-> IO (StM m (Maybe b))
forall a b. (a -> b) -> a -> b
$ \restore :: forall a. IO a -> IO a
restore -> do
  Maybe (a, LocalPool a)
res <- Pool a -> IO (Maybe (a, LocalPool a))
forall a. Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource Pool a
pool
  case Maybe (a, LocalPool a)
res of
    Just (resource :: a
resource, local :: LocalPool a
local) -> do
      StM m (Maybe b)
ret <- IO (StM m (Maybe b)) -> IO (StM m (Maybe b))
forall a. IO a -> IO a
restore (m (Maybe b) -> IO (StM m (Maybe b))
RunInBase m IO
runInIO (b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> m b -> m (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> m b
act a
resource)) IO (StM m (Maybe b)) -> IO () -> IO (StM m (Maybe b))
forall a b. IO a -> IO b -> IO a
`onException`
                Pool a -> LocalPool a -> a -> IO ()
forall a. Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool a
pool LocalPool a
local a
resource
      LocalPool a -> a -> IO ()
forall a. LocalPool a -> a -> IO ()
putResource LocalPool a
local a
resource
      StM m (Maybe b) -> IO (StM m (Maybe b))
forall (m :: * -> *) a. Monad m => a -> m a
return StM m (Maybe b)
ret
    Nothing -> IO (StM m (Maybe b)) -> IO (StM m (Maybe b))
forall a. IO a -> IO a
restore (IO (StM m (Maybe b)) -> IO (StM m (Maybe b)))
-> (m (Maybe b) -> IO (StM m (Maybe b)))
-> m (Maybe b)
-> IO (StM m (Maybe b))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Maybe b) -> IO (StM m (Maybe b))
RunInBase m IO
runInIO (m (Maybe b) -> IO (StM m (Maybe b)))
-> m (Maybe b) -> IO (StM m (Maybe b))
forall a b. (a -> b) -> a -> b
$ Maybe b -> m (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b
forall a. Maybe a
Nothing :: Maybe b)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryWithResource #-}
#endif

-- | A non-blocking version of 'takeResource'. The 'tryTakeResource' function
-- returns immediately, with 'Nothing' if the pool is exhausted, or @'Just' (a,
-- 'LocalPool' a)@ if a resource could be borrowed from the pool successfully.
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource pool :: Pool a
pool@Pool{..} = do
  local :: LocalPool a
local@LocalPool{..} <- Pool a -> IO (LocalPool a)
forall a. Pool a -> IO (LocalPool a)
getLocalPool Pool a
pool
  Maybe a
resource <- IO (Maybe a) -> IO (Maybe a)
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (Maybe a) -> IO (Maybe a))
-> (STM (IO (Maybe a)) -> IO (Maybe a))
-> STM (IO (Maybe a))
-> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (IO (Maybe a)) -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO (Maybe a)) -> IO (Maybe a))
-> (STM (IO (Maybe a)) -> IO (IO (Maybe a)))
-> STM (IO (Maybe a))
-> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (IO (Maybe a)) -> IO (IO (Maybe a))
forall a. STM a -> IO a
atomically (STM (IO (Maybe a)) -> IO (Maybe a))
-> STM (IO (Maybe a)) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ do
    [Entry a]
ents <- TVar [Entry a] -> STM [Entry a]
forall a. TVar a -> STM a
readTVar TVar [Entry a]
entries
    case [Entry a]
ents of
      (Entry{..}:es :: [Entry a]
es) -> TVar [Entry a] -> [Entry a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [Entry a]
entries [Entry a]
es STM () -> STM (IO (Maybe a)) -> STM (IO (Maybe a))
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO (Maybe a) -> STM (IO (Maybe a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> IO (Maybe a)) -> (a -> Maybe a) -> a -> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just (a -> IO (Maybe a)) -> a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ a
entry)
      [] -> do
        Int
used <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
inUse
        if Int
used Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxResources
          then IO (Maybe a) -> STM (IO (Maybe a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing)
          else do
            TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
inUse (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
used Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1
            IO (Maybe a) -> STM (IO (Maybe a))
forall (m :: * -> *) a. Monad m => a -> m a
return (IO (Maybe a) -> STM (IO (Maybe a)))
-> IO (Maybe a) -> STM (IO (Maybe a))
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> IO a -> IO (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
              IO a
create IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
`onException` STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
inUse (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract 1))
  Maybe (a, LocalPool a) -> IO (Maybe (a, LocalPool a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (a, LocalPool a) -> IO (Maybe (a, LocalPool a)))
-> Maybe (a, LocalPool a) -> IO (Maybe (a, LocalPool a))
forall a b. (a -> b) -> a -> b
$ ((a -> LocalPool a -> (a, LocalPool a))
-> LocalPool a -> a -> (a, LocalPool a)
forall a b c. (a -> b -> c) -> b -> a -> c
flip (,) LocalPool a
local) (a -> (a, LocalPool a)) -> Maybe a -> Maybe (a, LocalPool a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe a
resource
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryTakeResource #-}
#endif

-- | Get a (Thread-)'LocalPool'
--
-- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource'
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool Pool{..} = do
  Int
i <- IO Int -> IO Int
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO Int -> IO Int) -> IO Int -> IO Int
forall a b. (a -> b) -> a -> b
$ ((Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
numStripes) (Int -> Int) -> (ThreadId -> Int) -> ThreadId -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> Int
forall a. Hashable a => a -> Int
hash) (ThreadId -> Int) -> IO ThreadId -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO ThreadId
myThreadId
  LocalPool a -> IO (LocalPool a)
forall (m :: * -> *) a. Monad m => a -> m a
return (LocalPool a -> IO (LocalPool a))
-> LocalPool a -> IO (LocalPool a)
forall a b. (a -> b) -> a -> b
$ Vector (LocalPool a)
localPools Vector (LocalPool a) -> Int -> LocalPool a
forall a. Vector a -> Int -> a
V.! Int
i
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE getLocalPool #-}
#endif

-- | Destroy a resource. Note that this will ignore any exceptions in the
-- destroy function.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool{..} LocalPool{..} resource :: a
resource = do
   a -> IO ()
destroy a
resource IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`E.catch` \(SomeException
_::SomeException) -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
   STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
inUse (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract 1))
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE destroyResource #-}
#endif

-- | Return a resource to the given 'LocalPool'.
putResource :: LocalPool a -> a -> IO ()
putResource :: LocalPool a -> a -> IO ()
putResource LocalPool{..} resource :: a
resource = do
    UTCTime
now <- IO UTCTime
getCurrentTime
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar [Entry a] -> ([Entry a] -> [Entry a]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar [Entry a]
entries (a -> UTCTime -> Entry a
forall a. a -> UTCTime -> Entry a
Entry a
resource UTCTime
nowEntry a -> [Entry a] -> [Entry a]
forall a. a -> [a] -> [a]
:)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif

-- | Destroy all resources in all stripes in the pool. Note that this
-- will ignore any exceptions in the destroy function.
--
-- This function is useful when you detect that all resources in the
-- pool are broken. For example after a database has been restarted
-- all connections opened before the restart will be broken. In that
-- case it's better to close those connections so that 'takeResource'
-- won't take a broken connection from the pool but will open a new
-- connection instead.
--
-- Another use-case for this function is that when you know you are
-- done with the pool you can destroy all idle resources immediately
-- instead of waiting on the garbage collector to destroy them, thus
-- freeing up those resources sooner.
destroyAllResources :: Pool a -> IO ()
destroyAllResources :: Pool a -> IO ()
destroyAllResources Pool{..} = Vector (LocalPool a) -> (LocalPool a -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Vector (LocalPool a)
localPools ((LocalPool a -> IO ()) -> IO ())
-> (LocalPool a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ (a -> IO ()) -> LocalPool a -> IO ()
forall a. (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool a -> IO ()
destroy

modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v :: TVar a
v f :: a -> a
f = TVar a -> STM a
forall a. TVar a -> STM a
readTVar TVar a
v STM a -> (a -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a :: a
a -> TVar a -> a -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar a
v (a -> STM ()) -> a -> STM ()
forall a b. (a -> b) -> a -> b
$! a -> a
f a
a

modError :: String -> String -> a
modError :: String -> String -> a
modError func :: String
func msg :: String
msg =
    String -> a
forall a. HasCallStack => String -> a
error (String -> a) -> String -> a
forall a b. (a -> b) -> a -> b
$ "Data.Pool." String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
func String -> ShowS
forall a. [a] -> [a] -> [a]
++ ": " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
msg