Skip to content

Commit

Permalink
Fail Jobs that don't decode as expected
Browse files Browse the repository at this point in the history
Originally, this decoding failure would enter the `Left` case and only
log the error. The Job would hang, neither `ACK`ed nor `FAIL`ed until
its reservation expired.

Now, as long as it is valid JSON, we reach the `Right`-`Just` branch and
attempt to decode it further there. This way, if it can't decode, that
exception occurs in the right place to `FAIL` the job, as desired.

Closes #88.
  • Loading branch information
pbrisbin committed Apr 9, 2024
1 parent febe9bf commit f2d8d61
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion library/Faktory/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ data Job arg = Job
, jobOptions :: JobOptions
, jobFailure :: Maybe JobFailure
}
deriving stock (Show, Functor)
deriving stock (Show, Functor, Foldable, Traversable)

-- | Perform a Job with the given options
--
Expand Down
10 changes: 7 additions & 3 deletions library/Faktory/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Faktory.Prelude
import Control.Concurrent (killThread)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.Types (parseEither)
import qualified Data.Text as T
import Faktory.Client
import Faktory.Job (Job, JobId, jobArg, jobJid, jobReserveForMicroseconds)
Expand Down Expand Up @@ -89,7 +90,8 @@ processorLoop
processorLoop client settings workerSettings f = do
let
namespace = connectionInfoNamespace $ settingsConnection settings
processAndAck job = do
processAndAck job' = do
job <- decodeJob job'
mResult <- timeout (jobReserveForMicroseconds job) $ f job
case mResult of
Nothing -> settingsLogError settings "Job reservation period expired."
Expand All @@ -112,14 +114,16 @@ processorLoop client settings workerSettings f = do
failJob client job $ T.pack $ show ex
]

decodeJob :: (HasCallStack, FromJSON arg) => Job Value -> IO (Job arg)
decodeJob = either throwString pure . traverse (parseEither parseJSON)

-- | <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#heartbeat>
heartBeat :: Client -> WorkerId -> IO ()
heartBeat client workerId = do
threadDelaySeconds 25
command_ client "BEAT" [encode $ BeatPayload workerId]

fetchJob
:: FromJSON args => Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob :: Client -> Queue -> IO (Either String (Maybe (Job Value)))
fetchJob client queue = commandJSON client "FETCH" [queueArg queue]

ackJob :: HasCallStack => Client -> Job args -> IO ()
Expand Down

0 comments on commit f2d8d61

Please sign in to comment.