--- /dev/null
+# -*- coding: iso-8859-1 -*-
+"""
+ MoinMoin - FusionForge groups backed
+
+ This backend assigns group membership according to FusionForge user
+ access.
+
+ @copyright: 2009 MoinMoin:DmitrijsMilajevs
+ @license: GPL, see COPYING for details
+"""
+
+from MoinMoin.datastruct.backends import LazyGroup, LazyGroupsBackend
+import logging
+import re
+
+def parse_group_name(group_name):
+ m = re.match \
+ ("FF(Site(Admins|Users)|Project_(.*)_(Admins|Writers|Readers))Group$",
+ group_name)
+ if m:
+ if m.group (1)[0:4] == 'Site':
+ return ('Site', '', m.group (2))
+ else:
+ return ('Project', m.group (3), m.group (4))
+ else:
+ return None
+
+class FFLazyGroup(LazyGroup):
+ pass
+
+class FFLazyGroups(LazyGroupsBackend):
+
+ permdict = { "Admins": ("project_admin", "> 0"),
+ "Writers": ("scm", "= 2"),
+ "Readers": ("scm", "= 1") }
+
+ def __init__(self, request, ffsa):
+ super(FFLazyGroups, self).__init__(request)
+
+ self._ffsa = ffsa
+ if request.user.valid:
+ self._username = request.user.name
+ else:
+ self._username = None
+ logging.debug ("FFLazyGroups __init__: username=%s", (self._username,))
+
+ def __contains__(self, group_name):
+ logging.debug \
+ ("FFLazyGroups __contains__: group_name=%s", (group_name,))
+
+ try:
+ (scope, project, permission) = parse_group_name (group_name)
+ except:
+ return False
+
+ if scope == "Site":
+ return True
+ elif scope == "Project":
+ return project in self._ffsa.projects
+ else:
+ return False
+
+ def __iter__(self):
+ return reduce \
+ (lambda a, b: a+b,
+ [ [ 'SiteAdmins', 'SiteUsers' ] ]
+ + map (lambda p:
+ map (lambda r: 'FFProject_%s_%sGroup' % (p, r),
+ [ 'Admins', 'Writers', 'Readers' ]),
+ self._projects))
+
+ def __getitem__(self, group_name):
+ return FFLazyGroup(self.request, group_name, self)
+
+ def _iter_group_members(self, group_name):
+ logging.debug \
+ ("FFLazyGroups _iter_group_members: group_name=%s", (group_name,))
+ try:
+ (scope, project, permission) = parse_group_name (group_name)
+ except:
+ return None
+
+ if scope == "Site":
+ if permission == "User":
+ # ??? iterator on all Forge users
+ raise NotImplemented
+ elif permission == "Admin":
+ return self._ffsa.admins.__iter__ ()
+
+ elif scope == "Project":
+ try:
+ return self._ffsa.get_permission_entries \
+ (*((project,) \
+ + FFLazyGroups.permdict[permission])).__iter__ ()
+ except:
+ return None
+
+ def _group_has_member(self, group_name, member):
+ logging.debug \
+ ("FFLazyGroups _group_has_member: group_name=%s member=%s _user=%s" \
+ % (group_name, member, self._username))
+
+ # For anonymous (non-logged-in) users, member is "" and self._username
+ # is None. For authenticated users, both are assumed to be set to
+ # the user's login. If not, we consider that we have an unexpected
+ # inconsistency, and return False.
+
+ if member == "" or member != self._username:
+ return False
+
+ try:
+ (scope, project, permission) = parse_group_name (group_name)
+ except:
+ logging.debug \
+ ("FFLazyGroups _group_has_member: False (can't parse)")
+ return False
+
+ logging.debug \
+ ("FFLazyGroups _group_has_member: scope=%s project=%s perm=%s" \
+ % (scope, project, permission))
+
+ result = False
+ if scope == "Site":
+ if permission == "Users":
+ result = self._username != None
+ elif permission == "Admins":
+ # ??? ffsa should instead provide is_super_user
+ result = self._username in self._ffsa.admins
+
+ elif scope == "Project":
+ result = self._ffsa.check_permission \
+ (*((project,) \
+ + FFLazyGroups.permdict[permission] \
+ + (self._username,)))
+
+ logging.debug \
+ ("FFLazyGroups _group_has_member: %s" % (result,))
+ return result
@license: GNU GPL, see COPYING for details.
"""
-import urllib
-import re
-import hashlib
import base64
-import subprocess
+import hashlib
+import logging
import psycopg2
+import re
+import string
+import subprocess
+import urllib
+
from MoinMoin import user
from MoinMoin.auth import _PHPsessionParser, BaseAuth
+
+class FusionForgeError(Exception):
+ def __init__(self, msg):
+ Exception.__init__(self, msg)
+ self.msg = msg
+
+ def __str__(self):
+ return "%s\n" % self.msg
+
+
class FusionForgeLink():
def get_config(self, varname, secname='core'):
if secname not in self.cachedconfig:
self.cachedconfig[secname] = {}
if varname not in self.cachedconfig[secname]:
- self.cachedconfig[secname][varname] = subprocess.Popen(["/usr/share/gforge/bin/forge_get_config", varname, secname], stdout = subprocess.PIPE).communicate()[0].rstrip('\n')
+ self.cachedconfig[secname][varname] = \
+ subprocess.Popen(["/usr/share/gforge/bin/forge_get_config",
+ varname, secname],
+ stdout=subprocess.PIPE).communicate()[0].rstrip('\n')
return self.cachedconfig[secname][varname]
def __init__(self, cookies=['session_ser'], autocreate=True):
self.database_user = self.get_config('database_user')
self.database_port = self.get_config('database_port')
self.database_password = self.get_config('database_password')
-
-
- def get_connection(self):
if (self.database_host != ''):
- return psycopg2.connect(database=self.database_name,
- user=self.database_user,
- port=self.database_port,
- password=self.database_password,
- host=self.database_host)
+ self._conn = psycopg2.connect(database=self.database_name,
+ user=self.database_user,
+ port=self.database_port,
+ password=self.database_password,
+ host=self.database_host)
else:
- return psycopg2.connect(database=self.database_name,
- user=self.database_user,
- port=self.database_port,
- password=self.database_password)
+ self._conn = psycopg2.connect(database=self.database_name,
+ user=self.database_user,
+ port=self.database_port,
+ password=self.database_password)
+ logging.debug ("FusionForgeLink: __init__ done")
- def get_projects(self):
- conn = self.get_connection()
- cur = conn.cursor()
- cur.execute("SELECT g.unix_group_name from groups g, group_plugin gp, plugins p where g.group_id = gp.group_id and gp.plugin_id = p.plugin_id and p.plugin_name = 'moinmoin'")
- projects = []
- for record in cur:
- projects.append(record[0])
- conn.close()
- return projects
+ def __del__(self):
+ self._conn.close ()
class FusionForgeSessionAuth(BaseAuth):
""" FusionForge session cookie authentication """
self.fflink = FusionForgeLink()
self.session_key = self.fflink.get_config('session_key')
- def get_super_users(self):
- conn = self.fflink.get_connection()
+ # List super users (Forge admins)
+
+ conn = self.fflink._conn
+ cur = conn.cursor()
+ cur.execute("""SELECT distinct(u.user_name)
+ FROM users u,
+ pfo_user_role pur,
+ pfo_role pr,
+ pfo_role_setting prs
+ WHERE u.user_id = pur.user_id
+ AND pur.role_id = pr.role_id
+ AND pr.role_id = prs.role_id
+ AND prs.section_name='forge_admin'
+ AND prs.perm_val = 1""")
+ self.admins = []
+ if cur.rowcount > 0:
+ self.admins = [r[0] for r in cur]
+ logging.debug ("FusionForgeSessionAuth: admins=%s", (self.admins,))
+
+ # List projects
+
+ cur.execute("""SELECT g.unix_group_name
+ FROM groups g, group_plugin gp, plugins p
+ WHERE g.group_id = gp.group_id
+ AND gp.plugin_id = p.plugin_id
+ AND p.plugin_name = 'moinmoin'""")
+ self.projects = []
+ if cur.rowcount:
+ self.projects = [r[0] for r in cur]
+ logging.debug ("FusionForgeSessionAuth: projects=%s", (self.projects,))
+
+ def get_moinmoin_acl_string(self, project_name):
+ conn = self.fflink._conn
cur = conn.cursor()
- cur.execute("SELECT distinct(u.user_name) from users u, pfo_user_role pur, pfo_role pr, pfo_role_setting prs WHERE u.user_id = pur.user_id AND pur.role_id = pr.role_id AND pr.role_id = prs.role_id AND prs.section_name='forge_admin' AND prs.perm_val >= 1")
- admins = []
- for record in cur:
- admins.append(record[0])
- conn.close()
- return admins
+
+ # Check whether this is a public project
+ # anomymous users and registered users that are not part of the project
+
+ val = cur.execute("""SELECT is_public
+ FROM groups
+ WHERE unix_group_name='%s'""" % project_name)
+ val = cur.fetchone()
+ is_public = val != None and val[0] != 0
+ cur.close ()
+
+ rights = [ 'FFSiteAdminsGroup:read,write,delete,revert,admin' ] \
+ + map (lambda (g, right):
+ 'FFProject_%s_%sGroup:%s' % (project_name, g, right),
+ { 'Admins': 'read,write,delete,revert,admin',
+ 'Writers': 'read,write,delete,revert',
+ 'Readers': 'read' }.iteritems ())
+
+ if is_public:
+ rights.append ('All:read')
+ else:
+ rights.append ('All:')
+
+ logging.debug ("FusionForgeSessionAuth.get_moinmoin_acl_string: %s",
+ (rights,))
+ return string.join (rights)
+
+ def get_permission_entries (self, project_name, section, condition, user_name = None):
+ conn = self.fflink._conn
+ cur = conn.cursor()
+
+ if user_name:
+ ucond = "u.user_name = '%s'" % (user_name)
+ else:
+ ucond = "TRUE"
+ query = """SELECT DISTINCT(u.user_name)
+ FROM users u, pfo_user_role pur, pfo_role pr,
+ pfo_role_setting prs, groups
+ WHERE %s
+ AND u.user_id = pur.user_id
+ AND pur.role_id = pr.role_id
+ AND pr.role_id = prs.role_id
+ AND prs.section_name = '%s'
+ AND groups.unix_group_name = '%s'
+ AND prs.perm_val %s
+ AND pr.home_group_id = groups.group_id""" \
+ % (ucond, section, project_name, condition)
+ logging.debug ("get_perm_entries: " + query)
+ cur.execute(query)
+ result = []
+ if cur.rowcount > 0:
+ result = [u[0] for u in cur]
+ logging.debug (" -> %s " % (result,))
+ return result
+
+ def check_permission (self, project_name, section, condition, user_name):
+ return len(self.get_permission_entries \
+ (project_name, section, condition, user_name)) > 0
def request(self, request, user_obj, **kw):
cookies = kw.get('cookie')
for cookiename in cookies:
if cookiename not in self.cookies:
continue
- cookievalue = urllib.unquote(cookies[cookiename]).decode('iso-8859-1')
+ cookievalue = \
+ urllib.unquote(cookies[cookiename]).decode('iso-8859-1')
m = re.search('(.*)-\*-(.*)', cookievalue)
if m is None:
sdata = base64.b64decode(sserial)
if hashlib.md5(sdata + self.session_key).hexdigest() != shash:
continue
-
+
m = re.search('(.*)-\*-(.*)-\*-(.*)-\*-(.*)', sdata)
if m is None:
continue
(user_id, time, ip, user_agent) = m.group(1, 2, 3, 4)
- conn = self.fflink.get_connection()
+ conn = self.fflink._conn
cur = conn.cursor()
- cur.execute("SELECT user_name, realname FROM users WHERE user_id=%s", [user_id])
+ cur.execute("""SELECT user_name, realname
+ FROM users WHERE user_id=%s""", [user_id])
(loginname, realname) = cur.fetchone()
cur.close()
- conn.close()
# MoinMoin doesn't enforce unicity of realnames
u = user.User(request, name=loginname, auth_username=loginname,
if u and self.autocreate:
u.create_or_update(True)
if u and u.valid:
+ self.auth_user = u
return u, False
+ self.auth_user = None
return None, False
-