dm->createQueryBuilder('{{bundleName}}:{{className}}') ->field('queue')->equals($queue) ->eagerCursor(true) ->getQuery() ->execute() ->count(); return $count; } /** * @inheritDoc */ public function createRecord(array $data) { /** @var EntityInterface $entity */ $entity = new {{className}}; return $entity->setQueue($data['queue']) ->setPayload($data['payload']) ->setAttempts($data['attempts']) ->setReservedAt($data['reserved_at']) ->setAvailableAt($data['available_at']) ->setCreatedAt($data['created_at']); } /** * @inheritDoc */ public function delete(EntityInterface $entity) { $entity = $this->dm->merge($entity); $this->dm->remove($entity); $this->dm->flush(); } /** * @inheritDoc */ public function saveInBulk(array $entities) { $batchSize = 20; $i = 1; $em = $this->dm; foreach ($entities as $entity) { $em->persist($entity); if (($i % $batchSize) === 0) { $em->flush(); $em->clear(); // Detaches all objects from Doctrine! } ++$i; } $em->flush(); $em->clear(); } /** * @inheritDoc */ public function save(EntityInterface $entity) { $this->dm->persist($entity); $this->dm->flush(); $this->dm->clear(); } /** * @inheritDoc */ public function getNextAvailableJob($queue, $retryAfter) { $currentTime = new \DateTimeImmutable; $expiration = $currentTime->modify((int) ($retryAfter * -1) . ' seconds')->getTimestamp(); $currentTime = $currentTime->getTimestamp(); $builder = $this->dm->createQueryBuilder('{{bundleName}}:{{className}}'); return $builder->findAndUpdate() ->returnNew() ->field('queue')->equals($queue) ->addOr( $builder->expr()->field('reservedAt')->equals(null)->field('availableAt')->lte($currentTime) ) ->addOr($builder->expr()->field('reservedAt')->lte($expiration)) ->sort('id', 'ASC') ->limit(1) ->field('reservedAt')->set($currentTime) ->getQuery() ->execute(); } /** * @inheritDoc */ public function findById($id) { return $this->find($id); } }