[ActivityPub] Implement Failed Queue
This commit is contained in:
parent
751b23f6fe
commit
82f1fc7ca2
|
@ -44,7 +44,7 @@ class QueueHandler
|
||||||
* and the item is placed back in the queue to be re-run.
|
* and the item is placed back in the queue to be re-run.
|
||||||
*
|
*
|
||||||
* @param mixed $object
|
* @param mixed $object
|
||||||
* @return boolean true on success, false on failure
|
* @return bool true on success, false on failure
|
||||||
*/
|
*/
|
||||||
function handle($object) : bool
|
function handle($object) : bool
|
||||||
{
|
{
|
||||||
|
|
|
@ -111,7 +111,7 @@ abstract class QueueManager extends IoManager
|
||||||
*
|
*
|
||||||
* Must be implemented by any queue manager.
|
* Must be implemented by any queue manager.
|
||||||
*
|
*
|
||||||
* @param Notice $object
|
* @param mixed $object
|
||||||
* @param string $queue
|
* @param string $queue
|
||||||
*/
|
*/
|
||||||
abstract function enqueue($object, $queue);
|
abstract function enqueue($object, $queue);
|
||||||
|
|
|
@ -262,6 +262,8 @@ class ActivityPubPlugin extends Plugin
|
||||||
{
|
{
|
||||||
// Notice distribution
|
// Notice distribution
|
||||||
$qm->connect('activitypub', 'ActivityPubQueueHandler');
|
$qm->connect('activitypub', 'ActivityPubQueueHandler');
|
||||||
|
// Failed Notice distribution
|
||||||
|
$qm->connect('activitypub_failed', 'ActivityPubFailedQueueHandler');
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
158
plugins/ActivityPub/lib/activitypubfailedqueuehandler.php
Normal file
158
plugins/ActivityPub/lib/activitypubfailedqueuehandler.php
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
<?php
|
||||||
|
// This file is part of GNU social - https://www.gnu.org/software/social
|
||||||
|
//
|
||||||
|
// GNU social is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// GNU social is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ActivityPub queue handler for notice distribution
|
||||||
|
*
|
||||||
|
* @package GNUsocial
|
||||||
|
* @author Diogo Cordeiro <diogo@fc.up.pt>
|
||||||
|
* @copyright 2020 Free Software Foundation, Inc http://www.fsf.org
|
||||||
|
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
|
||||||
|
*/
|
||||||
|
|
||||||
|
defined('GNUSOCIAL') || die();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @copyright 2020 Free Software Foundation, Inc http://www.fsf.org
|
||||||
|
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
|
||||||
|
*/
|
||||||
|
class ActivityPubFailedQueueHandler extends QueueHandler
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Getter of the queue transport name.
|
||||||
|
*
|
||||||
|
* @return string transport name
|
||||||
|
*/
|
||||||
|
public function transport(): string
|
||||||
|
{
|
||||||
|
return 'activitypub_failed';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notice distribution handler.
|
||||||
|
*
|
||||||
|
* @param array $to_failed [string to, Notice].
|
||||||
|
* @return bool true on success, false otherwise
|
||||||
|
* @throws HTTP_Request2_Exception
|
||||||
|
* @throws InvalidUrlException
|
||||||
|
* @throws ServerException
|
||||||
|
* @author Diogo Cordeiro <diogo@fc.up.pt>
|
||||||
|
*/
|
||||||
|
public function handle($to_failed): bool
|
||||||
|
{
|
||||||
|
[$other, $notice] = $to_failed;
|
||||||
|
if (!($notice instanceof Notice)) {
|
||||||
|
common_log(LOG_ERR, 'Got a bogus notice, not distributing');
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
$profile = $notice->getProfile();
|
||||||
|
|
||||||
|
if (!$profile->isLocal()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($notice->source == 'activity') {
|
||||||
|
common_log(LOG_ERR, "Ignoring distribution of notice:{$notice->id}: activity source");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Handling a Create?
|
||||||
|
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::POST, ActivityVerb::SHARE])) {
|
||||||
|
return $this->handle_create($profile, $notice, $other);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handling a Like?
|
||||||
|
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::FAVORITE])) {
|
||||||
|
return $this->onEndFavorNotice($profile, $notice, $other);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handling a Delete Note?
|
||||||
|
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::DELETE])) {
|
||||||
|
return $this->onStartDeleteOwnNotice($profile, $notice, $other);
|
||||||
|
}
|
||||||
|
} catch (Exception $e) {
|
||||||
|
// Postman already re-enqueues for us
|
||||||
|
common_debug('ActivityPub Failed Queue Handler:'.$e->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function handle_create($profile, $notice, $other)
|
||||||
|
{
|
||||||
|
// Handling an Announce?
|
||||||
|
if ($notice->isRepeat()) {
|
||||||
|
$repeated_notice = Notice::getKV('id', $notice->repeat_of);
|
||||||
|
if ($repeated_notice instanceof Notice) {
|
||||||
|
// That was it
|
||||||
|
$postman = new Activitypub_postman($profile, $other);
|
||||||
|
$postman->announce($notice, $repeated_notice);
|
||||||
|
}
|
||||||
|
|
||||||
|
// either made the announce or found nothing to repeat
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// That was it
|
||||||
|
$postman = new Activitypub_postman($profile, $other);
|
||||||
|
$postman->create_note($notice);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notify remote users when their notices get favourited.
|
||||||
|
*
|
||||||
|
* @param Profile $profile of local user doing the faving
|
||||||
|
* @param Notice $notice_liked Notice being favored
|
||||||
|
* @return bool return value
|
||||||
|
* @throws HTTP_Request2_Exception
|
||||||
|
* @throws InvalidUrlException
|
||||||
|
* @author Diogo Cordeiro <diogo@fc.up.pt>
|
||||||
|
*/
|
||||||
|
public function onEndFavorNotice(Profile $profile, Notice $notice, $other)
|
||||||
|
{
|
||||||
|
$postman = new Activitypub_postman($profile, $other);
|
||||||
|
$postman->like($notice);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notify remote users when their notices get deleted
|
||||||
|
*
|
||||||
|
* @param $user
|
||||||
|
* @param $notice
|
||||||
|
* @return bool hook flag
|
||||||
|
* @throws HTTP_Request2_Exception
|
||||||
|
* @throws InvalidUrlException
|
||||||
|
* @author Diogo Cordeiro <diogo@fc.up.pt>
|
||||||
|
*/
|
||||||
|
public function onStartDeleteOwnNotice($profile, $notice, $other)
|
||||||
|
{
|
||||||
|
// Handle delete locally either because:
|
||||||
|
// 1. There's no undo-share logic yet
|
||||||
|
// 2. The deleting user has privileges to do so (locally)
|
||||||
|
if ($notice->isRepeat() || ($notice->getProfile()->getID() != $profile->getID())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
$postman = new Activitypub_postman($profile, $other);
|
||||||
|
$postman->delete_note($notice);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,14 +19,15 @@
|
||||||
*
|
*
|
||||||
* @package GNUsocial
|
* @package GNUsocial
|
||||||
* @author Bruno Casteleiro <brunoccast@fc.up.pt>
|
* @author Bruno Casteleiro <brunoccast@fc.up.pt>
|
||||||
* @copyright 2019 Free Software Foundation, Inc http://www.fsf.org
|
* @author Diogo Cordeiro <diogo@fc.up.pt>
|
||||||
|
* @copyright 2019-2020 Free Software Foundation, Inc http://www.fsf.org
|
||||||
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
|
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
|
||||||
*/
|
*/
|
||||||
|
|
||||||
defined('GNUSOCIAL') || die();
|
defined('GNUSOCIAL') || die();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @copyright 2019 Free Software Foundation, Inc http://www.fsf.org
|
* @copyright 2019-2020 Free Software Foundation, Inc http://www.fsf.org
|
||||||
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
|
* @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
|
||||||
*/
|
*/
|
||||||
class ActivityPubQueueHandler extends QueueHandler
|
class ActivityPubQueueHandler extends QueueHandler
|
||||||
|
@ -73,19 +74,24 @@ class ActivityPubQueueHandler extends QueueHandler
|
||||||
$notice->getAttentionProfiles()
|
$notice->getAttentionProfiles()
|
||||||
);
|
);
|
||||||
|
|
||||||
// Handling a Create?
|
try {
|
||||||
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::POST, ActivityVerb::SHARE])) {
|
// Handling a Create?
|
||||||
return $this->handle_create($profile, $notice, $other);
|
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::POST, ActivityVerb::SHARE])) {
|
||||||
}
|
return $this->handle_create($profile, $notice, $other);
|
||||||
|
}
|
||||||
|
|
||||||
// Handling a Like?
|
// Handling a Like?
|
||||||
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::FAVORITE])) {
|
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::FAVORITE])) {
|
||||||
return $this->onEndFavorNotice($profile, $notice, $other);
|
return $this->onEndFavorNotice($profile, $notice, $other);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handling a Delete Note?
|
// Handling a Delete Note?
|
||||||
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::DELETE])) {
|
if (ActivityUtils::compareVerbs($notice->verb, [ActivityVerb::DELETE])) {
|
||||||
return $this->onStartDeleteOwnNotice($profile, $notice, $other);
|
return $this->onStartDeleteOwnNotice($profile, $notice, $other);
|
||||||
|
}
|
||||||
|
} catch (Exception $e) {
|
||||||
|
// Postman handles issues with the failed queue
|
||||||
|
common_debug('ActivityPub Queue Handler:'.$e->getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -42,6 +42,7 @@ class Activitypub_postman
|
||||||
private $actor;
|
private $actor;
|
||||||
private $actor_uri;
|
private $actor_uri;
|
||||||
private $to = [];
|
private $to = [];
|
||||||
|
private $failed_to = [];
|
||||||
private $client;
|
private $client;
|
||||||
private $headers;
|
private $headers;
|
||||||
|
|
||||||
|
@ -63,6 +64,21 @@ class Activitypub_postman
|
||||||
$this->client = new HTTPClient();
|
$this->client = new HTTPClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When dear postman dies, resurrect him until he finishes what he couldn't in life
|
||||||
|
*
|
||||||
|
* @throws ServerException
|
||||||
|
* @author Diogo Cordeiro <diogo@fc.up.pt>
|
||||||
|
*/
|
||||||
|
public function __destruct()
|
||||||
|
{
|
||||||
|
$qm = QueueManager::get();
|
||||||
|
foreach($this->failed_to as $to => $activity) {
|
||||||
|
$qm->enqueue([$to, $activity], 'activitypub_failed');
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send something to remote instance
|
* Send something to remote instance
|
||||||
*
|
*
|
||||||
|
@ -215,6 +231,7 @@ class Activitypub_postman
|
||||||
$res_body = json_decode($res->getBody(), true);
|
$res_body = json_decode($res->getBody(), true);
|
||||||
$errors[] = isset($res_body['error']) ?
|
$errors[] = isset($res_body['error']) ?
|
||||||
$res_body['error'] : "An unknown error occurred.";
|
$res_body['error'] : "An unknown error occurred.";
|
||||||
|
$to_failed[$inbox] = $notice;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,6 +267,7 @@ class Activitypub_postman
|
||||||
$res_body = json_decode($res->getBody(), true);
|
$res_body = json_decode($res->getBody(), true);
|
||||||
$errors[] = isset($res_body['error']) ?
|
$errors[] = isset($res_body['error']) ?
|
||||||
$res_body['error'] : "An unknown error occurred.";
|
$res_body['error'] : "An unknown error occurred.";
|
||||||
|
$to_failed[$inbox] = $notice;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,6 +302,7 @@ class Activitypub_postman
|
||||||
$res_body = json_decode($res->getBody(), true);
|
$res_body = json_decode($res->getBody(), true);
|
||||||
$errors[] = isset($res_body['error']) ?
|
$errors[] = isset($res_body['error']) ?
|
||||||
$res_body['error'] : "An unknown error occurred.";
|
$res_body['error'] : "An unknown error occurred.";
|
||||||
|
$to_failed[$inbox] = $notice;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,6 +334,7 @@ class Activitypub_postman
|
||||||
$res_body = json_decode($res->getBody(), true);
|
$res_body = json_decode($res->getBody(), true);
|
||||||
$errors[] = isset($res_body['error']) ?
|
$errors[] = isset($res_body['error']) ?
|
||||||
$res_body['error'] : "An unknown error occurred.";
|
$res_body['error'] : "An unknown error occurred.";
|
||||||
|
$to_failed[$inbox] = $message;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,6 +366,7 @@ class Activitypub_postman
|
||||||
$res_body = json_decode($res->getBody(), true);
|
$res_body = json_decode($res->getBody(), true);
|
||||||
$errors[] = isset($res_body['error']) ?
|
$errors[] = isset($res_body['error']) ?
|
||||||
$res_body['error'] : "An unknown error occurred.";
|
$res_body['error'] : "An unknown error occurred.";
|
||||||
|
$to_failed[$inbox] = $notice;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,6 +395,7 @@ class Activitypub_postman
|
||||||
$res_body = json_decode($res->getBody(), true);
|
$res_body = json_decode($res->getBody(), true);
|
||||||
$errors[] = isset($res_body['error']) ?
|
$errors[] = isset($res_body['error']) ?
|
||||||
$res_body['error'] : "An unknown error occurred.";
|
$res_body['error'] : "An unknown error occurred.";
|
||||||
|
$to_failed[$inbox] = $notice;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!empty($errors)) {
|
if (!empty($errors)) {
|
||||||
|
|
|
@ -31,7 +31,7 @@ class DBQueueManager extends QueueManager
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Saves an object reference into the queue item table.
|
* Saves an object reference into the queue item table.
|
||||||
* @return boolean true on success
|
* @return bool true on success
|
||||||
* @throws ServerException on failure
|
* @throws ServerException on failure
|
||||||
*/
|
*/
|
||||||
public function enqueue($object, $queue)
|
public function enqueue($object, $queue)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user