#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Interface avec le Wiktionnaire
"""
import os, sys
import re
import time
import httplib
import urllib, urllib2, cookielib
#import simplejson as json
#import xml.dom.minidom as dom
import md5, hashlib
# classe conteneur
class Foo:pass
interface = Foo ()
interface.logged_in = 0
interface.next_request_time = time.time ()
interface.next_check_shutoff_time = time.time ()
## CHOSES DIVERSES
interface.PROJECT = "fr.wiktionary" # on y rajoutera .org après
interface.PREFIX = "/w" # sans grand intérêt
interface.DELAY = 15 # délai minimum (en secondes) entre deux requêtes du bot
interface.SLEEP_POINT_TIME = 0.3
interface.BOT = True
interface.CHECK_SHUTOFF_DELAY = 30
## IDENTIFIANTS
interface.USER_NAME = ""
interface.USER_PASS = None
interface.EMERGENCY_SHUTOFF_PAGE = "Utilisateur:Eikubot/Arrêt"
interface.EDITTOKEN = None
## RÉCUPÉRATION DE MOT DE PASSE
def ask_password ():
try:
import keyring
assert interface.USER_NAME
# si on utilise un autre gestionnaire de mots de passe,
# il faut changer cette ligne (ex : gnome keyring).
wallet = keyring.backend.KDEKWallet ()
interface.USER_PASS = wallet.get_password ('wiktionary.org',interface.USER_NAME)
except:
from getpass import getpass
if (interface.USER_NAME == ''):
interface.USER_NAME = raw_input ("Votre identifiant?\n")
print "OK"
interface.USER_PASS = getpass ('Votre mot de passe?')
return
"""
ouvrir le portefeuille de KDE
et y récupérer le mot de passe
"""
# NE FONCTIONNE PLUS AVEC KDE4 (dcop n’existe plus)
id_opened_kwallet = os.popen ('dcop kded kwalletd open "kdewallet" kdewallet').read ()[:-1]
interface.USER_PASS = os.popen ('dcop kded kwalletd readPassword '
+id_opened_kwallet
+" Passwords "
+ interface.USER_NAME).read ()[:-1]
try: assert interface.USER_PASS, "Impossible de lire le mot de passe."
except:
interface.USER_PASS = raw_input (
"Votre mot de passe? \n"
"(attention aux regards "
"indiscrets car il apparaîtra "
"en clair):\n")
# pour gérer les cookies (honteusement copié-collé depuis la doc python)
cj = cookielib.CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
## FONCTION HYPER IMPORTANTE : ENVOIE UNE REQUÊTE
def demander (Commentaire="", **args):
'''Envoie une requête au serveur défini dans interface.PROJECT et
retourne la réponse au format texte. Paramètres :
Commentaire: un commentaire qui sera affiché à l’appel.
args: tout les arguments envoyés au serveur.
args est de type dictionnaire (voir doc. python).
'''
# adresse de l’API mediawiki
api_url = u"http://%s.org/w/api.php"%interface.PROJECT
# si la dernière requête a moins de <interface.DELAY> s, on attend.
bypass = args.has_key ("bypass_delay")
if bypass:
print "Bypassing delay !"
args.pop ("bypass_delay")
elif interface.next_request_time > time.time ():
print "Waiting: we don’t want to overload the MediaWiki servers (%d s)"%(interface.next_request_time-time.time()),
while interface.next_request_time > time.time ():
sys.stdout.write (".")
sys.stdout.flush ()
time.sleep (interface.SLEEP_POINT_TIME)
# C’est bon, le temps est écoulé
print "OK"
# on enregistre l’heure de lancement de la requête
interface.next_request_time = time.time () + interface.DELAY
if args.has_key ("titles"):
pages=args["titles"]
print "Une requête pour %s est lancée sur la page [[%s]]."%(Commentaire, pages)
else:
print "Une requête pour %s est lancée."%Commentaire
# quelle que soit la requête (les arguments), on veut passer "format=xml"
# parce que sinon, on pourrait demander à l’API du JSON, du YAML, du HTML
# etc., mais comme je n’arrive pas à parser le YAML et le JSON, je préfère
# l’XML.
args.update ({"format": "xml"})
# Le header. Je ne suis pas sûr que ça serve à quoi que ce soit.
headers = {"User-Agent": "EB-net_interface/1.0"}
# URL + requête + headers sont transformés en un objet requête par urllib2
request = urllib2.Request (api_url, urllib.urlencode (args, headers))
# l’objet requête est « ouvert » et lu avec l’opener qu’on a créé avec
# le bocal à cookies. C’est compliqué pour moi mais on finit par s’y
# retrouver.
return opener.open (request).read ()
## LOGGING IN / S’IDENTIFIER
def logging_in (force=False):
if not interface.USER_PASS: ask_password ()
if interface.logged_in and not force: return
reponse = demander ("s’identifier.", action="login", lgname=interface.USER_NAME, lgpassword=interface.USER_PASS)
try:
# on regarde si on est logué
# j’adore les regex, et ne comptez pas sur des commentaires, hinhin.
interface.logged_in = 0
if re.search ('(?s)<login[^>]*?result="(.*?)"', reponse).group (1) == "Success":
interface.token = re.search ('(?s)<login[^>]*?lgtoken="(.*?)"', reponse).group (1)
interface.logged_in = 1
elif re.search ('(?s)<login[^>]*?result="(.*?)"', reponse).group (1) == "NeedToken":
interface.token = re.search ('(?s)<login[^>]*?token="(.*?)"', reponse).group (1)
reponse = demander ("s’identifier (étape 2).",
action='login',
lgname=interface.USER_NAME,
lgpassword=interface.USER_PASS,
lgtoken=interface.token,
bypass_delay=1)
if re.search ('(?s)<login[^>]*?result="(.*?)"', reponse).group (1) == "Success":
interface.logged_in = 1
print reponse
except AttributeError:
# ça veut généralement dire que le login a échoué, alors on s’arrête là
# (pas la peine de s’embêter à chercher un WrongPassword ou un Throttle,
# on sait déjà que c’est loupé)
assert 0, "Échec du login : \n%s"%reponse
print "logué :D !" if interface.logged_in else "Pas logué :("
def verifier_arret_urgence ():
if interface.next_check_shutoff_time > time.time ():
print "Will check shutoff in %d seconds."%(interface.next_check_shutoff_time-time.time())
return False
print "\033[36mverifier_arret_urgence () appelé.\033[0m"
interface.next_check_shutoff_time = time.time () + interface.CHECK_SHUTOFF_DELAY
txt1 = demander ("vérifier si quelqu’un a demandé l’arrêt d’urgence.",
bypass_delay="", action="query", prop="revisions", rvprop="content", titles=interface.EMERGENCY_SHUTOFF_PAGE)
match = re.search ("(?s)<rev(?: .*?>|>)(.*?)</rev>", txt1)
if not match:
assert re.search ("(?s)<rev(?: [^>]*/>|/>)", txt1), "Erreur de regex"
return False
txt = match.group(1)
if re.search ("(?i)Arr(ê|e)t d(’|')urgence", txt): return txt
else: return False
## LES OBJETS PAGE : POUR LIRE ET ÉCRIRE DES PAGES DU WIKI
class Page:
'''
Créer un objet Page ne crée pas la page sur le wiki (ne soyons pas si impatients).
Paramètre :
name : le nom de la page sur le wiki.
Exemple :
ma_page = Page ("Discuter:se la couler douce")
Méthodes intéressantes:
exists () : retourne True ou False selon que la page existe sur le wiki
read () : retourne le contenu de la page si elle existe
write (commentaire, contenu) : remplace le contenu de la page
par celui spécifié, avec <commentaire> comme commentaire
d’édition.
create (commentaire, contenu) : crée la page si elle n’existe pas.
append_section (self, titre, contenu) : ajoute une nouvelle section à la
page, avec comme titre <titre>, comme commentaire, <titre>
aussi, et comme contenu, <contenu>.
'''
def __init__ (self, name):
self.name = name
self.starttimestamp = None
self.basetimestamp = None
self.edittoken = interface.EDITTOKEN
#print self.edittoken
self.existence = None
if self.edittoken == None: self.exists ()
def exists (self):
try: logging_in ()
except AssertionError, msg:
print "Erreur: %s"%msg
if self.existence==None:
return self.test_exists ()
else: return self.existence
def test_exists (self):
txt = demander ( "test_exists ()",
action = "query",
prop = "info|revisions",
rvprop = "timestamp",
intoken = "edit",
titles = self.name)
#exists = not p.getElementsByTagName ("page").pop (0).attributes.has_key ('missing')
self.edittoken = re.search ('<page[^>]* edittoken="(.*?)"', txt).group (1)
#print "edittoken: %s"%self.edittoken
self.existence = not re.search ('<page[^>]* missing=""', txt)
if not self.existence: print txt; return False
timestampsearch = re.search ('<page[^>]* starttimestamp="(.*?)"[^>]* edittoken="(.*?)"', txt)
self.starttimestamp = timestampsearch.group (1)
self.edittoken = timestampsearch.group (2)
interface.EDITTOKEN = self.edittoken
self.basetimestamp = re.search ('<rev[^>]* timestamp="(.*?)"', txt).group (1)
return True
def read (self):
assert self.exists (), "La page n’existe pas."
txt= demander ("read()", action="query", prop="revisions", rvprop="content", titles=self.name)
return re.search ("(?s)<rev(?: .*?>|>)(.*?)</rev>", txt).group(1)
def prepare_mediawiki_edit_request (self, comment, content):
"""
returns a dictionary with the default arguments to the MediaWiki request
"""
std_request = {
"action": "edit",
"title": self.name,
"text": content,
"token": self.edittoken,
"summary": comment,
"basetimestamp": self.basetimestamp,
"starttimestamp": self.starttimestamp,
"md5": hashlib.md5(content).hexdigest ()
}
if interface.BOT: std_request ["bot"] = "1"
return std_request
def write (self, comment, content, minor=None):
self.exists ()
request = self.prepare_mediawiki_edit_request (comment, content)
request["watchlist"] = "watch"
if minor: request ["minor"] = "1"
txt = demander ("write (%s)"%comment[0:50], **request)
assert txt, "ERREUR: la fonction write () n’a rien retourné."
if re.search ('<edit( .*)? result="Success"', txt): ret=True;print "OK"
else: ret=False; print "ÉCHEC de write () : \n\n%s"%txt
return ret, txt
def create (self, comment, content):
#theoretically unnecessary
#if self.exists (): return
request = self.prepare_mediawiki_edit_request (comment, content)
request ["createonly"] = "1"
request ["watchlist"] = "watch"
txt = demander ("create (%s)"%comment[0:50], **request)
if re.search ('<edit( .*)? result="Success"', txt): ret=True;print "OK"
else: ret=False; print "ÉCHEC de create () : \n\n%s"%txt
try: assert txt, "ERREUR: la fonction create () n’a rien retourné."
except AssertionError, msg: ret=False; print msg
return ret, txt
def append_section (self, title, content):
self.exists ()
request = self.prepare_mediawiki_edit_request (title, content)
request ["section"] = "new"
request ["watchlist"] = "watch"
txt = demander ("append_section (%s)"%title, **request)
return self.write_result (txt, "append_section")
def write_result (self, txt, func_name):
if re.search ('<edit( .*)? result="Success"', txt): ret=True;print "OK"
else: ret=False; print "ÉCHEC de %s () : \n\n%s"%(func_name, txt)
try: assert txt, "ERREUR: la fonction %s () n’a rien retourné."%func_name
except AssertionError, msg: ret=False; print msg
return ret, txt
""" # Mes premiers tests
a = Page ("Utilisateur:Eiku/brouillons/test")
if a.exists (): print "a existe"
print "-----Contenu de a:-----\n%s"%a.read ()
a.write ("Encore un test de mon bot",
"Je vais bien finir par arriver à faire une section correcte, b.*el !"
"\n\n:(et à me loguer, accessoirement) --~~~~")
a.write ("(bot) test: remplacement de page (ici, blanchir)", "")
b = Page ("Utilisateur:Eiku/brouillons/créationpage")
if b.exists (): print "b existe"
else: print "b n’existe pas"
b.create ("(bot) test: création de page", "= Création de page =\nTest réussi ?")
"""
if __name__ == "__main__":
try: pass
except KeyboardInterrupt: print "\033[31mLeaving: User typed Ctrl+C.\033[0m"