Summit external ingestion

Reduce DB lock contention to improve
performance

Change-Id: I8adb2b653ef0932890e882787957599121227f80
This commit is contained in:
smarcet 2019-08-31 03:39:20 -03:00
parent 1be8ec1b8e
commit 1ff312481e
8 changed files with 145 additions and 89 deletions

View File

@ -59,6 +59,7 @@ final class ExternalScheduleFeedIngestionCommand extends Command {
public function handle()
{
$this->info("starting summits external ingestion");
$start = time();
$this->service->ingestAllSummits();
$end = time();

View File

@ -50,4 +50,11 @@ interface ISummitEventRepository extends IBaseRepository
* @param int $event_id
*/
public function cleanupScheduleAndFavoritesForEvent($event_id);
/**
 * Looks up the published events of the given summit whose external id is not
 * contained in the supplied list of external ids.
 *
 * @param Summit $summit summit the events must belong to
 * @param array $external_ids external ids to exclude from the result
 * @return mixed iterable collection of the matching events
 */
public function getPublishedEventsBySummitNotInExternalIds(Summit $summit, array $external_ids);
}

View File

@ -11,7 +11,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
**/
use Illuminate\Support\Facades\Log;
use libs\utils\ITransactionService;
use models\main\IMemberRepository;
@ -126,31 +125,25 @@ final class ResourceServerContext implements IResourceServerContext
public function getCurrentUser(): ?Member
{
return $this->tx_service->transaction(function() {
Log::debug("ResourceServerContext::getCurrentUser");
$member = null;
// legacy test, for new IDP version this value came on null
$id = $this->getCurrentUserExternalId();
if(!is_null($id)){
Log::debug(sprintf("ResourceServerContext::getCurrentUser: getCurrentUserExternalId is %s", $id));
$member = $this->member_repository->getById(intval($id));
if(!is_null($member)) return $member;
}
// is null
if(is_null($member)){
Log::debug("ResourceServerContext::getCurrentUser: getCurrentUserExternalId is null");
// try to get by external id
$id = $this->getCurrentUserId();
if(is_null($id)) {
Log::debug("ResourceServerContext::getCurrentUser: getCurrentUserId is null");
return null;
}
Log::debug(sprintf("ResourceServerContext::getCurrentUser: getCurrentUserId is %s", $id));
$member = $this->member_repository->getByExternalId(intval($id));
}
if(is_null($member)){
Log::debug("ResourceServerContext::getCurrentUser: member is null");
// we assume this is the new IDP version and the claims already exist on the context
$user_external_id = $this->getAuthContextVar('user_id');
$user_first_name = $this->getAuthContextVar('user_first_name');

View File

@ -489,8 +489,8 @@ SQL;
public function getByFullName(string $fullname): ?PresentationSpeaker
{
$speakerFullNameParts = explode(" ", $fullname);
$speakerFirstName = trim(trim(array_pop($speakerFullNameParts)));
$speakerLastName = trim(implode(" ", $speakerFullNameParts));
$speakerLastName = trim(trim(array_pop($speakerFullNameParts)));
$speakerFirstName = trim(implode(" ", $speakerFullNameParts));
return $this->getEntityManager()
->createQueryBuilder()

View File

@ -14,6 +14,7 @@
use App\Models\Foundation\Main\IGroup;
use Doctrine\ORM\Tools\Pagination\Paginator;
use models\summit\ISummitEventRepository;
use models\summit\Summit;
use models\summit\SummitEvent;
use App\Repositories\SilverStripeDoctrineRepository;
use utils\DoctrineCaseFilterMapping;
@ -351,4 +352,23 @@ final class DoctrineSummitEventRepository
$data
);
}
/**
 * Returns the published events of a summit that carry an external id which is
 * not part of the given list.
 *
 * The external ingestion service deletes every event returned here, so events
 * without an external id (null or empty string) are explicitly left out: they
 * did not come from an external feed and must not be treated as stale.
 *
 * @param Summit $summit summit the events must belong to
 * @param array $external_ids external ids that must be excluded from the result
 * @return SummitEvent[] result of the query
 */
public function getPublishedEventsBySummitNotInExternalIds(Summit $summit, array $external_ids)
{
    // NOTE(review): with an empty $external_ids the outcome depends on how the
    // DBAL expands an empty list parameter for "not in (...)" - confirm that the
    // resulting behaviour (delete nothing vs. delete everything) is the intended one.
    $query = $this->getEntityManager()->createQueryBuilder()
        ->select("e")
        ->from(\models\summit\SummitEvent::class, "e")
        ->join('e.summit', 's', Join::WITH, " s.id = :summit_id")
        ->where('e.published = 1')
        // only externally ingested events are candidates
        ->andWhere('e.external_id is not null')
        ->andWhere("e.external_id <> ''")
        ->andWhere('e.external_id not in (:external_ids)')
        ->setParameter('summit_id', $summit->getId())
        ->setParameter('external_ids', $external_ids);

    return $query->getQuery()->getResult();
}
}

View File

@ -175,7 +175,7 @@ final class DoctrineSummitRepository
->andWhere("e.api_feed_url <> ''")
->andWhere("e.api_feed_key is not null")
->andWhere("e.api_feed_key <>''")
->orderBy('e.begin_date', 'DESC')
->orderBy('e.id', 'DESC')
->getQuery()
->getResult();
}

View File

@ -105,29 +105,25 @@ final class ScheduleIngestionService
*/
public function ingestAllSummits(): void
{
foreach ($this->summit_repository->getWithExternalFeed() as $summit) {
$processedExternalIds = $this->tx_service->transaction(function () use($summit) {
$summits = $this->tx_service->transaction(function () {
return $this->summit_repository->getWithExternalFeed();
});
foreach ($summits as $summit) {
$processedExternalIds = $this->ingestSummit($summit);
$this->tx_service->transaction(function () use ($summit, $processedExternalIds) {
foreach ($this->event_repository->getPublishedEventsBySummitNotInExternalIds($summit, $processedExternalIds) as $presentation) {
try {
return $this->ingestSummit($summit);
$this->event_repository->delete($presentation);
} catch (Exception $ex) {
Log::error(sprintf("error external feed for summit id %s", $summit->getId()));
Log::error($ex);
}
});
$this->tx_service->transaction(function () use($summit, $processedExternalIds) {
foreach ($summit->getPublishedPresentations() as $presentation) {
try {
if ($presentation instanceof Presentation && !empty($presentation->getExternalId()) && !in_array($presentation->getExternalId(), $processedExternalIds))
$this->event_repository->delete($presentation);
}
catch (Exception $ex) {
Log::error($ex);
}
}
});
}
}
});
}
}
/**
@ -138,20 +134,22 @@ final class ScheduleIngestionService
public function ingestSummit(Summit $summit): array
{
return $this->tx_service->transaction(function () use ($summit) {
$processedExternalIds = [];
$processedExternalIds = [];
try {
Log::debug(sprintf("ingesting summit %s", $summit->getId()));
$feed = $this->feed_factory->build($summit);
if (is_null($feed))
throw new \InvalidArgumentException("invalid feed");
try {
$start = time();
$summit_id = $summit->getId();
Log::debug(sprintf("ScheduleIngestionService::ingestSummit:: ingesting summit %s", $summit->getId()));
$feed = $this->feed_factory->build($summit);
if (is_null($feed))
throw new \InvalidArgumentException("invalid feed");
$this->tx_service->transaction(function () use ($summit_id) {
$summit = $this->summit_repository->getById($summit_id);
$mainVenues = $summit->getMainVenues();
if (count($mainVenues) == 0)
throw new ValidationException(sprintf("summit %s does not has a main venue set!", $summit->getId()));
// get first as default
$mainVenue = $mainVenues[0];
if (is_null($summit->getBeginDate()) || is_null($summit->getEndDate()))
throw new ValidationException(sprintf("summit %s does not has set begin date/end date", $summit->getId()));
@ -159,9 +157,6 @@ final class ScheduleIngestionService
if (is_null($summit->getTimeZone()))
throw new ValidationException(sprintf("summit %s does not has set a valid time zone", $summit->getId()));
$events = $feed->getEvents();
$speakers = $feed->getSpeakers();
// get presentation type from summit
$presentationType = $summit->getEventTypeByType(IPresentationType::Presentation);
if (is_null($presentationType)) {
@ -174,39 +169,40 @@ final class ScheduleIngestionService
$presentationType->setMinModerators(0);
$summit->addEventType($presentationType);
}
});
$trackStorage = [];
$locationStorage = [];
$affiliationStorage = [];
$events = $feed->getEvents();
$speakers = $feed->getSpeakers();
foreach ($events as $event) {
foreach ($events as $event) {
try {
try {
// track
$external_id = $this->tx_service->transaction(function () use ($summit_id, $event, $speakers) {
Log::debug(sprintf("processing event %s - %s for summit %s", $event['external_id'], $event['title'], $summit_id));
// get first as default
$summit = $this->summit_repository->getById($summit_id);
if (is_null($summit) || !$summit instanceof Summit) return null;
$mainVenues = $summit->getMainVenues();
$mainVenue = $mainVenues[0];
$presentationType = $summit->getEventTypeByType(IPresentationType::Presentation);
$track = $summit->getPresentationCategoryByTitle($event['track']);
if (is_null($track) && isset($trackStorage[$event['track']]))
$track = $trackStorage[$event['track']];
if (is_null($track)) {
$track = new PresentationCategory();
$track->setTitle($event['track']);
$summit->addPresentationCategory($track);
$trackStorage[$event['track']] = $track;
}
// location
$location = null;
if (isset($event['location'])) {
$location = $summit->getLocationByName($event['location']);
if (is_null($location) && isset($locationStorage[$event['location']]))
$location = $locationStorage[$event['location']];
if (is_null($location)) {
$location = new SummitVenueRoom();
$location->setName($event['location']);
$mainVenue->addRoom($location);
$locationStorage[$event['location']] = $location;
}
}
@ -219,10 +215,10 @@ final class ScheduleIngestionService
$speakerFirstName = trim(implode(" ", $speakerFullNameParts));
$foundSpeaker = isset($speakers[$speakerFullName]) ? $speakers[$speakerFullName] : null;
if(is_null($foundSpeaker)){
if (is_null($foundSpeaker)) {
// partial match
$result_array = preg_grep("/{$speakerFullName}/i",array_keys($speakers));
if(count($result_array) > 0){
$result_array = preg_grep("/{$speakerFullName}/i", array_keys($speakers));
if (count($result_array) > 0) {
$foundSpeaker = $speakers[array_values($result_array)[0]];
}
}
@ -245,8 +241,6 @@ final class ScheduleIngestionService
// check affiliations
if (!empty($companyName)) {
$affiliation = $member->getAffiliationByOrgName($companyName);
if (is_null($affiliation) && isset($affiliationStorage[sprintf("%s_%s", $member->getId(), $companyName)]))
$affiliation = $affiliationStorage[sprintf("%s_%s", $member->getId(), $companyName)];
if (is_null($affiliation)) {
$affiliation = new Affiliation();
@ -259,7 +253,6 @@ final class ScheduleIngestionService
$affiliation->setOrganization($org);
$affiliation->setIsCurrent(true);
$member->addAffiliation($affiliation);
$affiliationStorage[sprintf("%s_%s", $member->getId(), $companyName)] = $affiliation;
}
}
@ -321,18 +314,26 @@ final class ScheduleIngestionService
if (!$presentation->isPublished())
$presentation->publish();
$processedExternalIds[] = $event['external_id'];
} catch (Exception $ex) {
Log::warning(sprintf("error external feed for summit id %s", $summit->getId()));
Log::warning($ex);
}
}
} catch (Exception $ex) {
Log::warning(sprintf("error external feed for summit id %s", $summit->getId()));
Log::warning($ex);
}
return $event['external_id'];
});
if (!is_null($external_id))
$processedExternalIds[] = $external_id;
} catch (Exception $ex) {
Log::warning(sprintf("error external feed for summit id %s", $summit->getId()));
Log::warning($ex);
}
}
$end = time();
$delta = $end - $start;
log::debug(sprintf("ScheduleIngestionService::ingestSummit execution call %s seconds - summit %s", $delta, $summit->getId()));
} catch (Exception $ex) {
Log::warning(sprintf("error external feed for summit id %s", $summit->getId()));
Log::warning($ex);
}
return $processedExternalIds;
return $processedExternalIds;
});
}
}

View File

@ -15,7 +15,8 @@ use Illuminate\Support\Facades\Log;
use libs\utils\ITransactionService;
use Closure;
use LaravelDoctrine\ORM\Facades\Registry;
use Doctrine\DBAL\Exception\RetryableException;
use Exception;
/**
* Class DoctrineTransactionService
* @package services\utils
@ -27,6 +28,8 @@ final class DoctrineTransactionService implements ITransactionService
*/
private $manager_name;
const MaxRetries = 3;
/**
* DoctrineTransactionService constructor.
* @param string $manager_name
@ -36,7 +39,6 @@ final class DoctrineTransactionService implements ITransactionService
$this->manager_name = $manager_name;
}
/**
* Execute a Closure within a transaction.
*
@ -47,25 +49,57 @@ final class DoctrineTransactionService implements ITransactionService
*/
public function transaction(Closure $callback)
{
$em = Registry::getManager($this->manager_name);
$con = $em->getConnection();
$retry = 0;
$done = false;
$result = null;
if (!$em->isOpen()) {
Log::warning("entity manager closed!, trying to re open...");
$em = $em->create($con->getConnection(), $em->getConfiguration());
$con = $em->getConnection();
while (!$done and $retry < self::MaxRetries) {
try {
$em = Registry::getManager($this->manager_name);
$con = $em->getConnection();
/**
* Some database systems close the connection after a period of time, in MySQL this is system variable
* `wait_timeout`. Given the daemon is meant to run indefinitely we need to make sure we have an open
* connection before working any job. Otherwise we would see `MySQL has gone away` type errors.
*/
if ($con->ping() === false) {
Log::warning("DoctrineTransactionService::transaction: conn is closed... reconecting");
$con->close();
$con->connect();
}
if (!$em->isOpen()) {
Log::warning("DoctrineTransactionService::transaction: entity manager is closed!, trying to re open...");
$em = Registry::resetManager($this->manager_name);
// new entity manager
$con = $em->getConnection();
}
$con->beginTransaction(); // suspend auto-commit
$result = $callback($this);
$em->flush();
$con->commit();
$done = true;
} catch (RetryableException $ex) {
Log::warning("retrying ...");
Registry::resetManager($this->manager_name);
$con->rollBack();
Log::warning($ex);
$retry++;
if ($retry === self::MaxRetries) {
throw $ex;
}
} catch (Exception $ex) {
Log::warning("rolling back transaction");
$em->close();
$con->rollBack();
Log::error($ex);
throw $ex;
}
}
try {
$con->beginTransaction(); // suspend auto-commit
$result = $callback($this);
$em->flush();
$con->commit();
} catch (\Exception $e) {
$con->rollBack();
Log::error($e);
throw $e;
}
return $result;
}
}
}