Clean up formatting and help linter calm down

This commit is contained in:
Agatha Rose 2021-06-05 00:38:36 +03:00
parent dd78364f2d
commit a904587b32
No known key found for this signature in database
GPG key ID: 2DB18BA2E0A80BC3
4 changed files with 92 additions and 81 deletions

View file

@ -8,12 +8,13 @@ from bs4 import BeautifulSoup
from random import randint
import re, multiprocessing, sqlite3, shutil, os, html
def make_sentence(output, cfg):
class nlt_fixed(markovify.NewlineText): #modified version of NewlineText that never rejects sentences
def test_sentence_input(self, sentence):
return True #all sentences are valid <3
shutil.copyfile("toots.db", "toots-copy.db") #create a copy of the database because reply.py will be using the main one
def make_sentence(output, cfg):
class nlt_fixed(markovify.NewlineText): # modified version of NewlineText that never rejects sentences
def test_sentence_input(self, sentence):
return True # all sentences are valid <3
shutil.copyfile("toots.db", "toots-copy.db") # create a copy of the database because reply.py will be using the main one
db = sqlite3.connect("toots-copy.db")
db.text_factory = str
c = db.cursor()
@ -35,8 +36,6 @@ def make_sentence(output, cfg):
db.close()
os.remove("toots-copy.db")
toots_str = None
if cfg['limit_length']:
sentence_len = randint(cfg['length_lower_limit'], cfg['length_upper_limit'])
@ -59,22 +58,24 @@ def make_sentence(output, cfg):
output.send(sentence)
def make_toot(cfg):
toot = None
pin, pout = multiprocessing.Pipe(False)
p = multiprocessing.Process(target = make_sentence, args = [pout, cfg])
p = multiprocessing.Process(target=make_sentence, args=[pout, cfg])
p.start()
p.join(5) #wait 5 seconds to get something
if p.is_alive(): #if it's still trying to make a toot after 5 seconds
p.join(5) # wait 5 seconds to get something
if p.is_alive(): # if it's still trying to make a toot after 5 seconds
p.terminate()
p.join()
else:
toot = pin.recv()
if toot == None:
if toot is None:
toot = "Toot generation failed! Contact Lynne (lynnesbian@fedi.lynnesbian.space) for assistance."
return toot
def extract_toot(toot):
toot = html.unescape(toot) # convert HTML escape codes to text
soup = BeautifulSoup(toot, "html.parser")
@ -87,7 +88,7 @@ def extract_toot(toot):
for ht in soup.select("a.hashtag"): # convert hashtags from links to text
ht.unwrap()
for link in soup.select("a"): #ocnvert <a href='https://example.com>example.com</a> to just https://example.com
for link in soup.select("a"): # convert <a href='https://example.com>example.com</a> to just https://example.com
if 'href' in link:
# apparently not all a tags have a href, which is understandable if you're doing normal web stuff, but on a social media platform??
link.replace_with(link["href"])

12
gen.py
View file

@ -8,9 +8,11 @@ import argparse, json, re
import functions
parser = argparse.ArgumentParser(description='Generate and post a toot.')
parser.add_argument('-c', '--cfg', dest='cfg', default='config.json', nargs='?',
parser.add_argument(
'-c', '--cfg', dest='cfg', default='config.json', nargs='?',
help="Specify a custom location for config.json.")
parser.add_argument('-s', '--simulate', dest='simulate', action='store_true',
parser.add_argument(
'-s', '--simulate', dest='simulate', action='store_true',
help="Print the toot without actually posting it. Use this to make sure your bot's actually working.")
args = parser.parse_args()
@ -32,10 +34,10 @@ if __name__ == '__main__':
toot = re.sub(r"[\[\]\(\)\{\}\"“”«»„]", "", toot)
if not args.simulate:
try:
client.status_post(toot, visibility = 'unlisted', spoiler_text = cfg['cw'])
except Exception as err:
client.status_post(toot, visibility='unlisted', spoiler_text=cfg['cw'])
except Exception:
toot = "An error occurred while submitting the generated post. Contact lynnesbian@fedi.lynnesbian.space for assistance."
client.status_post(toot, visibility = 'unlisted', spoiler_text = "Error!")
client.status_post(toot, visibility='unlisted', spoiler_text="Error!")
try:
print(toot)
except UnicodeEncodeError:

53
main.py
View file

@ -5,20 +5,19 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from mastodon import Mastodon, MastodonUnauthorizedError
from os import path
from bs4 import BeautifulSoup
import os, sqlite3, signal, sys, json, re, shutil, argparse
import sqlite3, signal, sys, json, re, argparse
import requests
import functions
parser = argparse.ArgumentParser(description='Log in and download posts.')
parser.add_argument('-c', '--cfg', dest='cfg', default='config.json', nargs='?',
parser.add_argument(
'-c', '--cfg', dest='cfg', default='config.json', nargs='?',
help="Specify a custom location for config.json.")
args = parser.parse_args()
scopes = ["read:statuses", "read:accounts", "read:follows", "write:statuses", "read:notifications", "write:accounts"]
#cfg defaults
# cfg defaults
cfg = {
"site": "https://botsin.space",
@ -48,7 +47,8 @@ if not cfg['site'].startswith("https://") and not cfg['site'].startswith("http:/
if "client" not in cfg:
print("No application info -- registering application with {}".format(cfg['site']))
client_id, client_secret = Mastodon.create_app("mstdn-ebooks",
client_id, client_secret = Mastodon.create_app(
"mstdn-ebooks",
api_base_url=cfg['site'],
scopes=scopes,
website="https://github.com/Lynnesbian/mstdn-ebooks")
@ -60,8 +60,9 @@ if "client" not in cfg:
if "secret" not in cfg:
print("No user credentials -- logging in to {}".format(cfg['site']))
client = Mastodon(client_id = cfg['client']['id'],
client_secret = cfg['client']['secret'],
client = Mastodon(
client_id=cfg['client']['id'],
client_secret=cfg['client']['secret'],
api_base_url=cfg['site'])
print("Open this URL and authenticate to give mstdn-ebooks access to your bot's account: {}".format(client.auth_request_url(scopes=scopes)))
@ -69,14 +70,16 @@ if "secret" not in cfg:
json.dump(cfg, open(args.cfg, "w+"))
def extract_toot(toot):
toot = functions.extract_toot(toot)
toot = toot.replace("@", "@\u200B") #put a zws between @ and username to avoid mentioning
toot = toot.replace("@", "@\u200B") # put a zws between @ and username to avoid mentioning
return(toot)
client = Mastodon(
client_id=cfg['client']['id'],
client_secret = cfg['client']['secret'],
client_secret=cfg['client']['secret'],
access_token=cfg['secret'],
api_base_url=cfg['site'])
@ -89,7 +92,7 @@ except MastodonUnauthorizedError:
following = client.account_following(me.id)
db = sqlite3.connect("toots.db")
db.text_factory=str
db.text_factory = str
c = db.cursor()
c.execute("CREATE TABLE IF NOT EXISTS `toots` (sortid INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT, id VARCHAR NOT NULL, cw INT NOT NULL DEFAULT 0, userid VARCHAR NOT NULL, uri VARCHAR NOT NULL, content VARCHAR NOT NULL)")
c.execute("CREATE TRIGGER IF NOT EXISTS `dedup` AFTER INSERT ON toots FOR EACH ROW BEGIN DELETE FROM toots WHERE rowid NOT IN (SELECT MIN(sortid) FROM toots GROUP BY uri ); END; ")
@ -115,7 +118,7 @@ if not found:
c.execute("CREATE TABLE `toots_temp` (sortid INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT, id VARCHAR NOT NULL, cw INT NOT NULL DEFAULT 0, userid VARCHAR NOT NULL, uri VARCHAR NOT NULL, content VARCHAR NOT NULL)")
for f in following:
user_toots = c.execute("SELECT * FROM `toots` WHERE userid LIKE ? ORDER BY id", (f.id,)).fetchall()
if user_toots == None:
if user_toots is None:
continue
if columns[-1] == "cw":
@ -131,11 +134,13 @@ if not found:
db.commit()
def handleCtrlC(signal, frame):
print("\nPREMATURE EVACUATION - Saving chunks")
db.commit()
sys.exit(1)
signal.signal(signal.SIGINT, handleCtrlC)
patterns = {
@ -150,7 +155,7 @@ def insert_toot(oii, acc, post, cursor): # extracted to prevent duplication
pid = patterns["pid"].search(oii['object']['id']).group(0)
cursor.execute("REPLACE INTO toots (id, cw, userid, uri, content) VALUES (?, ?, ?, ?, ?)", (
pid,
1 if (oii['object']['summary'] != None and oii['object']['summary'] != "") else 0,
1 if (oii['object']['summary'] is not None and oii['object']['summary'] != "") else 0,
acc.id,
oii['object']['id'],
post
@ -159,16 +164,16 @@ def insert_toot(oii, acc, post, cursor): # extracted to prevent duplication
for f in following:
last_toot = c.execute("SELECT id FROM `toots` WHERE userid LIKE ? ORDER BY sortid DESC LIMIT 1", (f.id,)).fetchone()
if last_toot != None:
if last_toot is not None:
last_toot = last_toot[0]
else:
last_toot = 0
print("Downloading posts for user @{}, starting from {}".format(f.acct, last_toot))
#find the user's activitypub outbox
# find the user's activitypub outbox
print("WebFingering...")
instance = patterns["handle"].search(f.acct)
if instance == None:
if instance is None:
instance = patterns["url"].search(cfg['site']).group(1)
else:
instance = instance.group(1)
@ -182,13 +187,13 @@ for f in following:
r = requests.get("https://{}/.well-known/host-meta".format(instance), timeout=10)
# 2. use webfinger to find user's info page
uri = patterns["uri"].search(r.text).group(1)
uri = uri.format(uri = "{}@{}".format(f.username, instance))
uri = uri.format(uri="{}@{}".format(f.username, instance))
r = requests.get(uri, headers={"Accept": "application/json"}, timeout=10)
j = r.json()
found = False
for link in j['links']:
if link['rel'] == 'self':
#this is a link formatted like "https://instan.ce/users/username", which is what we need
# this is a link formatted like "https://instan.ce/users/username", which is what we need
uri = link['href']
found = True
break
@ -227,7 +232,7 @@ for f in following:
while not done and len(j['orderedItems']) > 0:
for oi in j['orderedItems']:
if oi['type'] != "Create":
continue #this isn't a toot/post/status/whatever, it's a boost or a follow or some other activitypub thing. ignore
continue # this isn't a toot/post/status/whatever, it's a boost or a follow or some other activitypub thing. ignore
# its a toost baby
content = oi['object']['content']
@ -236,8 +241,8 @@ for f in following:
try:
if pleroma:
if c.execute("SELECT COUNT(*) FROM toots WHERE uri LIKE ?", (oi['object']['id'],)).fetchone()[0] > 0:
#we've caught up to the notices we've already downloaded, so we can stop now
#you might be wondering, "lynne, what if the instance ratelimits you after 40 posts, and they've made 60 since main.py was last run? wouldn't the bot miss 20 posts and never be able to see them?" to which i reply, "i know but i don't know how to fix it"
# we've caught up to the notices we've already downloaded, so we can stop now
# you might be wondering, "lynne, what if the instance ratelimits you after 40 posts, and they've made 60 since main.py was last run? wouldn't the bot miss 20 posts and never be able to see them?" to which i reply, "i know but i don't know how to fix it"
done = True
continue
if 'lang' in cfg:
@ -245,13 +250,13 @@ for f in following:
if oi['object']['contentMap'][cfg['lang']]: # filter for language
insert_toot(oi, f, toot, c)
except KeyError:
#JSON doesn't have contentMap, just insert the toot irregardlessly
# JSON doesn't have contentMap, just insert the toot irregardlessly
insert_toot(oi, f, toot, c)
else:
insert_toot(oi, f, toot, c)
pass
except:
pass #ignore any toots that don't successfully go into the DB
pass # ignore any toots that don't successfully go into the DB
# get the next/previous page
try:
@ -285,6 +290,6 @@ for f in following:
print("Done!")
db.commit()
db.execute("VACUUM") #compact db
db.execute("VACUUM") # compact db
db.commit()
db.close()

View file

@ -4,12 +4,12 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import mastodon
import random, re, json, argparse
import re, json, argparse
import functions
from bs4 import BeautifulSoup
parser = argparse.ArgumentParser(description='Reply service. Leave running in the background.')
parser.add_argument('-c', '--cfg', dest='cfg', default='config.json', nargs='?',
parser.add_argument(
'-c', '--cfg', dest='cfg', default='config.json', nargs='?',
help="Specify a custom location for config.json.")
args = parser.parse_args()
@ -22,16 +22,18 @@ client = mastodon.Mastodon(
access_token=cfg['secret'],
api_base_url=cfg['site'])
def extract_toot(toot):
text = functions.extract_toot(toot)
text = re.sub(r"^@[^@]+@[^ ]+\s*", r"", text) #remove the initial mention
text = text.lower() #treat text as lowercase for easier keyword matching (if this bot uses it)
text = re.sub(r"^@[^@]+@[^ ]+\s*", r"", text) # remove the initial mention
text = text.lower() # treat text as lowercase for easier keyword matching (if this bot uses it)
return text
class ReplyListener(mastodon.StreamListener):
def on_notification(self, notification): #listen for notifications
if notification['type'] == 'mention': #if we're mentioned:
acct = "@" + notification['account']['acct'] #get the account's @
def on_notification(self, notification): # listen for notifications
if notification['type'] == 'mention': # if we're mentioned:
acct = "@" + notification['account']['acct'] # get the account's @
post_id = notification['status']['id']
# check if we've already been participating in this thread
@ -44,7 +46,7 @@ class ReplyListener(mastodon.StreamListener):
posts = 0
for post in context['ancestors']:
if post['account']['id'] == me:
pin = post["id"] #Only used if pin is called, but easier to call here
pin = post["id"] # Only used if pin is called, but easier to call here
posts += 1
if posts >= cfg['max_thread_length']:
# stop replying
@ -52,12 +54,12 @@ class ReplyListener(mastodon.StreamListener):
return
mention = extract_toot(notification['status']['content'])
if (mention == "pin") or (mention == "unpin"): #check for keywords
if (mention == "pin") or (mention == "unpin"): # check for keywords
print("Found pin/unpin")
#get a list of people the bot is following
# get a list of people the bot is following
validusers = client.account_following(me)
for user in validusers:
if user["id"] == notification["account"]["id"]: #user is #valid
if user["id"] == notification["account"]["id"]: # user is #valid
print("User is valid")
visibility = notification['status']['visibility']
if visibility == "public":
@ -65,22 +67,23 @@ class ReplyListener(mastodon.StreamListener):
if mention == "pin":
print("pin received, pinning")
client.status_pin(pin)
client.status_post("Toot pinned!", post_id, visibility=visibility, spoiler_text = cfg['cw'])
client.status_post("Toot pinned!", post_id, visibility=visibility, spoiler_text=cfg['cw'])
else:
print("unpin received, unpinning")
client.status_post("Toot unpinned!", post_id, visibility=visibility, spoiler_text = cfg['cw'])
client.status_post("Toot unpinned!", post_id, visibility=visibility, spoiler_text=cfg['cw'])
client.status_unpin(pin)
else:
print("User is not valid")
else:
toot = functions.make_toot(cfg) #generate a toot
toot = acct + " " + toot #prepend the @
print(acct + " says " + mention) #logging
toot = functions.make_toot(cfg) # generate a toot
toot = acct + " " + toot # prepend the @
print(acct + " says " + mention) # logging
visibility = notification['status']['visibility']
if visibility == "public":
visibility = "unlisted"
client.status_post(toot, post_id, visibility=visibility, spoiler_text = cfg['cw']) #send toost
print("replied with " + toot) #logging
client.status_post(toot, post_id, visibility=visibility, spoiler_text=cfg['cw']) # send toost
print("replied with " + toot) # logging
rl = ReplyListener()
client.stream_user(rl) #go!
client.stream_user(rl) # go!