package app.mealsmadeeasy.api.job;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.transaction.Transactional;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Creates persistent jobs and runs them one at a time on a fixed-delay schedule.
 *
 * <p>Jobs are stored through {@link JobRepository}. Each scheduled tick claims at most one job,
 * dispatches it to the {@link JobHandler} whose job key matches, and then records the outcome:
 * {@code DONE} on success, back to {@code QUEUED} with a delayed {@code runAfter} on failure, or
 * {@code DEAD} once the attempt count reaches the job's maximum.
 */
@Service
public class JobService {

    private static final Logger logger = LoggerFactory.getLogger(JobService.class);

    /** Attempt limit used by {@link #create(String, Object)}. */
    private static final int DEFAULT_MAX_ATTEMPTS = 10;

    /** Upper bound, in seconds, of the exponential part of the retry delay. */
    private static final long MAX_BACKOFF_SECONDS = 300;

    /** Exclusive upper bound, in seconds, of the random jitter added to the retry delay. */
    private static final long JITTER_BOUND_SECONDS = 5;

    /** Maximum number of characters of a stack trace stored as a job's last error. */
    private static final int MAX_ERROR_LENGTH = 8000;

    private final ApplicationContext applicationContext;
    private final JobRepository jobRepository;
    private final ObjectMapper objectMapper;

    /** Handlers keyed by job key; {@code null} until the first claimed job triggers the lookup. */
    @SuppressWarnings("rawtypes")
    private Map<String, JobHandler> jobHandlers;

    public JobService(
            ApplicationContext applicationContext,
            JobRepository jobRepository,
            ObjectMapper objectMapper
    ) {
        this.applicationContext = applicationContext;
        this.jobRepository = jobRepository;
        this.objectMapper = objectMapper;
    }

    /**
     * Queues a job that may run immediately and is attempted at most
     * {@value #DEFAULT_MAX_ATTEMPTS} times.
     *
     * @param jobType the job key used to select the handler
     * @param payload handler input, converted to JSON for storage; may be {@code null}
     * @return the saved job
     */
    public Job create(String jobType, @Nullable Object payload) {
        return this.create(jobType, payload, DEFAULT_MAX_ATTEMPTS, OffsetDateTime.now());
    }

    /**
     * Queues a job.
     *
     * @param jobType     the job key used to select the handler
     * @param payload     handler input, converted to JSON for storage; may be {@code null}
     * @param maxAttempts number of failed attempts after which the job is marked {@code DEAD}
     * @param runAfter    value stored as the job's {@code runAfter}
     * @return the saved job
     */
    public Job create(String jobType, @Nullable Object payload, int maxAttempts, OffsetDateTime runAfter) {
        final var job = new Job();
        job.setCreated(OffsetDateTime.now());
        job.setState(Job.State.QUEUED);
        job.setJobKey(jobType);
        job.setPayload(this.objectMapper.convertValue(payload, JsonNode.class));
        job.setAttempts(0);
        job.setMaxAttempts(maxAttempts);
        job.setRunAfter(runAfter);
        return this.jobRepository.save(job);
    }

    /**
     * Looks up every {@link JobHandler} bean and indexes it by its job key.
     *
     * <p>The map is keyed by job key only; Spring bean names are deliberately not registered, so
     * a job key can never resolve to a handler merely because it equals a bean name. The map is
     * fully built before it is assigned to the field.
     */
    @SuppressWarnings("rawtypes")
    private void initJobHandlers() {
        final Map<String, JobHandler> handlersByBeanName =
                this.applicationContext.getBeansOfType(JobHandler.class);
        final Map<String, JobHandler> handlersByJobKey = new HashMap<>();
        for (final JobHandler jobHandler : handlersByBeanName.values()) {
            final JobHandler previous = handlersByJobKey.put(jobHandler.getJobKey(), jobHandler);
            if (previous != null) {
                // Last registration wins; make the clash visible.
                logger.warn("Multiple job handlers are registered for {}", jobHandler.getJobKey());
            }
        }
        this.jobHandlers = handlersByJobKey;
    }

    /**
     * Claims at most one job from the repository and runs it.
     *
     * <p>Any exception raised while resolving the handler, converting the payload, running the
     * handler, or saving the successful result is treated as a failed attempt: the attempt count
     * is incremented, the stack trace is stored as the last error, and the job is either
     * re-queued with a backoff delay or marked {@code DEAD}. Handler resolution and payload
     * conversion are inside the guarded region on purpose; if they threw out of this
     * transactional method the attempt would never be counted.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Scheduled(fixedDelay = 200)
    @Transactional
    public void runOneJob() {
        // The current thread name is handed to the repository as the claimant
        // (presumably stored as the lock owner; see JobRepository.claimNext).
        final Optional<Job> nextJob = this.jobRepository.claimNext(Thread.currentThread().getName());
        if (nextJob.isEmpty()) {
            return;
        }
        if (this.jobHandlers == null) {
            this.initJobHandlers();
        }
        final Job job = nextJob.get();
        try {
            final JobHandler jobHandler = this.jobHandlers.get(job.getJobKey());
            if (jobHandler == null) {
                throw new IllegalStateException(
                        "There is no registered job handler for " + job.getJobKey()
                );
            }
            final Object payload = this.objectMapper.convertValue(
                    job.getPayload(),
                    jobHandler.getPayloadType()
            );
            jobHandler.handle(job, payload);
            job.setState(Job.State.DONE);
            this.releaseAndSave(job);
        } catch (Exception e) {
            this.recordFailure(job, e);
        }
    }

    /** Records a failed attempt: re-queues the job with a backoff delay, or marks it dead. */
    private void recordFailure(Job job, Exception e) {
        // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
        logger.error("Job {} {} threw an exception: {}", job.getId(), job.getJobKey(), e.getMessage(), e);
        final int attemptCount = job.getAttempts() + 1;
        final boolean isDead = attemptCount >= job.getMaxAttempts();
        final OffsetDateTime now = OffsetDateTime.now();
        job.setState(isDead ? Job.State.DEAD : Job.State.QUEUED);
        job.setAttempts(attemptCount);
        job.setRunAfter(isDead ? now : now.plusSeconds(getBackoffSeconds(attemptCount)));
        job.setLastError(formatException(e));
        this.releaseAndSave(job);
    }

    /** Stamps the modification time, clears the lock fields and saves the job. */
    private void releaseAndSave(Job job) {
        job.setModified(OffsetDateTime.now());
        job.setLockedBy(null);
        job.setLockedAt(null);
        this.jobRepository.save(job);
    }

    /**
     * Computes the retry delay: {@code 2^attemptCount} seconds capped at
     * {@value #MAX_BACKOFF_SECONDS}, plus a random jitter of 0 to 4 seconds.
     */
    private static long getBackoffSeconds(int attemptCount) {
        final long base = (long) Math.min(MAX_BACKOFF_SECONDS, Math.pow(2, attemptCount));
        final long jitter = ThreadLocalRandom.current().nextLong(0, JITTER_BOUND_SECONDS);
        return base + jitter;
    }

    /**
     * Renders the exception's stack trace as a string, truncated to
     * {@value #MAX_ERROR_LENGTH} characters.
     */
    private static String formatException(Exception e) {
        final var sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        final String s = sw.toString();
        return s.length() <= MAX_ERROR_LENGTH ? s : s.substring(0, MAX_ERROR_LENGTH);
    }

}