← Back to the web2py-plugin list
Demo
| back | : | |||
| output | : |
|
Usage
-
controllers/plugin_friendship.py
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
from plugin_friendship import Friendship
from gluon.tools import Auth
import unittest
import datetime
# Module-level setup executed on every request to this controller.
# The unit-test action gets its own in-memory SQLite database; for any other
# action ``db`` is presumably supplied by the application's models -- TODO confirm.
if request.function == 'test':
    db = DAL('sqlite:memory:')

### setup core objects #########################################################
auth = Auth(db)
friendship = Friendship(db)
friendship.settings.table_edge_name = 'plugin_friendship_edge'
# Extra columns appended to the edge table when it is defined below.
edge_extra_fields = [
    Field('affinity', 'double', default=1),
    Field('created_on', 'datetime', default=request.now),
]
friendship.settings.extra_fields = {'plugin_friendship_edge': edge_extra_fields}

### define tables ##############################################################
auth.define_tables()
table_user = auth.settings.table_user
friendship.define_tables(str(table_user))
table_edge = friendship.settings.table_edge

### populate records ###########################################################
num_users = 5
user_ids = {}
# Map demo user numbers 1..num_users to row ids, inserting any missing user.
for user_no in range(1, num_users + 1):
    email = 'user%s@test.com' % user_no
    existing_user = db(table_user.email == email).select().first()
    user_ids[user_no] = (existing_user and existing_user.id) or table_user.insert(email=email)

# Wipe the edge table once any edge is older than 30 minutes, then restart at
# the index page with a flash message.
stale_cutoff = request.now - datetime.timedelta(minutes=30)
if db(table_edge.created_on < stale_cutoff).count():
    table_edge.truncate()
    session.flash = 'the database has been refreshed'
    redirect(URL('index'))
### demo functions #############################################################
def index():
    """Render the friendship demo page for one of the fixture users.

    ``request.args(0)`` selects the acting user number (default 1). When one
    of the query variables ``add_friend``, ``confirm_friend``,
    ``ignore_friend`` or ``remove_friend`` is present, the matching
    ``friendship`` method is called with the acting user's id and the given
    id, and the browser is redirected back to the page.

    Returns a dict with the user chooser, the list of other users with their
    friendship status, the pending friend requests and a link to the unit
    tests.
    """
    # request.args is user-controlled: a non-numeric or unknown user number
    # sends the visitor back to the default page instead of raising.
    try:
        user_no = int(request.args(0) or 1)
        user_id = user_ids[user_no]
    except (KeyError, ValueError):
        session.flash = 'unknown user'
        redirect(URL('index'))

    for action in ('add_friend', 'confirm_friend', 'ignore_friend', 'remove_friend'):
        if action not in request.vars:
            continue
        flash = action
        try:
            # Query variables arrive as strings; the plugin compares ids as ints.
            target_id = int(request.vars[action])
        except (TypeError, ValueError):
            flash = 'invalid user id'
        else:
            try:
                getattr(friendship, action)(user_id, target_id)
            except ValueError:
                # The plugin signals an invalid transition with ValueError.
                flash = '%s rejected' % action
        session.flash = flash
        # redirect() must stay outside the try blocks above.
        redirect(URL('index', args=user_no))

    user_chooser = []
    for i in range(1, num_users + 1):
        if i == user_no:
            user_chooser.append(SPAN('user%s' % user_no))
        else:
            user_chooser.append(A('user%s' % i, _href=URL('index', args=i)))
    user_chooser = DIV(XML(' '.join([r.xml() for r in user_chooser])),
                       _style='font-weight:bold')

    # Every other fixture user, left-joined with the acting user's edge to them.
    friends = []
    other_ids = set(user_ids.values()) - set([user_id])
    records = db(table_user.id.belongs(other_ids)).select(
        table_user.ALL, table_edge.ALL,
        left=table_edge.on(
            (table_edge.friend == table_user.id) &
            (table_edge.user == user_id)))
    status_options = dict(table_edge.status.requires.options())
    for record in records:
        other = record[table_user]
        edge = record[table_edge]
        status = edge.status
        if status is None:
            # No edge from the acting user to this user yet.
            option = A(T('Add friend'),
                       _href=URL('index', args=user_no, vars={'add_friend': other.id}))
        elif status == friendship.settings.status_requesting:
            option = SPAN(status_options[status], _style='color:blue;')
        else:
            option = SPAN(T('%s mutual friends') % len(edge.mutual_friends), ' ',
                          A(T('Remove friend'),
                            _href=URL('index', args=user_no, vars={'remove_friend': other.id})),
                          _style='color:green;')
        friends.append(DIV(other.email[:5], ': ', option))

    # Pending requests addressed to the acting user, joined with the requester.
    friend_requests = []
    records = friendship.requesting_edges_from_user(user_id).select(
        table_user.ALL,
        left=table_user.on(table_user.id == table_edge.user))
    for record in records:
        friend_requests.append(DIV(
            record.email[:5], ': ',
            A(T('Confirm'), _href=URL('index', args=user_no, vars={'confirm_friend': record.id})), ' ',
            A(T('Not now'), _href=URL('index', args=user_no, vars={'ignore_friend': record.id})),
            _style='color:red;font-size:1.2em;'))

    return dict(current_user=user_chooser,
                friends=friends,
                friend_requests=friend_requests,
                unit_tests=[A('basic test', _href=URL('test'))],
                )
### unit tests #################################################################
class TestFriendship(unittest.TestCase):
    """Exercise the Friendship plugin against the five fixture users."""

    def setUp(self):
        # Every test starts from an empty edge table.
        table_edge.truncate()

    def _befriend(self, requester_no, target_no):
        """Have user ``requester_no`` request and user ``target_no`` confirm."""
        friendship.add_friend(user_ids[requester_no], user_ids[target_no])
        friendship.confirm_friend(user_ids[target_no], user_ids[requester_no])

    def _mutual_counts(self, user_no):
        """Return the mutual-friend list length of each confirmed edge of a user."""
        edges = friendship.friend_edges_from_user(user_ids[user_no]).select()
        return [len(edge.mutual_friends) for edge in edges]

    def test_add_friend(self):
        friendship.add_friend(user_ids[1], user_ids[2])
        self.assertEqual(friendship.requesting_edges_from_user(user_ids[1]).count(), 0)
        self.assertEqual(friendship.requesting_edges_from_user(user_ids[2]).count(), 1)
        requesters = [r.user for r in friendship.requesting_edges_from_user(user_ids[2]).select()]
        self.assertEqual(requesters, [user_ids[1]])

    def test_confirm_friend(self):
        self._befriend(1, 2)
        self.assertEqual(friendship.requesting_edges_from_user(user_ids[1]).count(), 0)
        self.assertEqual(friendship.requesting_edges_from_user(user_ids[2]).count(), 0)

        friends = friendship.friend_edges_from_user(user_ids[1]).select()
        self.assertEqual([r.friend for r in friends], [user_ids[2]])
        self.assertEqual([len(r.mutual_friends) for r in friends], [0])

        friends = friendship.friend_edges_from_user(user_ids[2]).select()
        self.assertEqual([r.friend for r in friends], [user_ids[1]])
        self.assertEqual([len(r.mutual_friends) for r in friends], [0])

    def test_remove_friend(self):
        self._befriend(1, 2)
        friendship.remove_friend(user_ids[1], user_ids[2])
        for user_no in (1, 2):
            friends = friendship.friend_edges_from_user(user_ids[user_no]).select()
            self.assertEqual(len(friends), 0)

    def test_mutual_friends(self):
        # Triangle 1-2-3: every edge has exactly one mutual friend.
        self._befriend(1, 2)
        self._befriend(2, 3)
        self._befriend(3, 1)
        for i in range(1, 4):
            self.assertEqual(self._mutual_counts(i), [1, 1])

        # User 4 joins users 1 and 2 only.
        self._befriend(1, 4)
        self._befriend(2, 4)
        self.assertEqual(self._mutual_counts(1), [2, 1, 1])
        self.assertEqual(self._mutual_counts(2), [2, 1, 1])
        self.assertEqual(self._mutual_counts(4), [1, 1])
        self.assertEqual(self._mutual_counts(3), [1, 1])

        # Removing user 4 again restores the plain triangle.
        friendship.remove_friend(user_ids[1], user_ids[4])
        friendship.remove_friend(user_ids[2], user_ids[4])
        for i in range(1, 4):
            self.assertEqual(self._mutual_counts(i), [1, 1])

        # Users 1-4 all befriend each other.
        self._befriend(1, 4)
        self._befriend(2, 4)
        self._befriend(3, 4)
        for i in range(1, 5):
            self.assertEqual(self._mutual_counts(i), [2, 2, 2])

        # Users 1-5 all befriend each other.
        self._befriend(1, 5)
        self._befriend(2, 5)
        self._befriend(3, 5)
        self._befriend(4, 5)
        for i in range(1, 6):
            self.assertEqual(self._mutual_counts(i), [3, 3, 3, 3])

        # Dropping user 5 returns to the four-user state.
        for other_no in range(1, 5):
            friendship.remove_friend(user_ids[other_no], user_ids[5])
        for i in range(1, 5):
            self.assertEqual(self._mutual_counts(i), [2, 2, 2])

    def test_ignore_friend(self):
        friendship.add_friend(user_ids[1], user_ids[2])
        friendship.ignore_friend(user_ids[2], user_ids[1])
        for user_no in (1, 2):
            self.assertEqual(friendship.requesting_edges_from_user(user_ids[user_no]).count(), 0)
        for user_no in (1, 2):
            self.assertEqual(friendship.friend_edges_from_user(user_ids[user_no]).count(), 0)

    def test_extra_fields(self):
        self._befriend(1, 2)
        friend_edge = friendship.get_friend_edge(user_ids[1], user_ids[2])
        self.assertEqual(friend_edge.affinity, 1.0)
def run_test(TestCase):
    """Run every test of ``TestCase`` and return the runner's text report.

    The report is the verbose (``verbosity=2``) output that
    ``unittest.TextTestRunner`` writes, captured in an in-memory buffer.
    """
    # cStringIO exists only on Python 2; io.StringIO is the Python 3 equivalent.
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO
    stream = StringIO()
    suite = unittest.TestLoader().loadTestsFromTestCase(TestCase)
    unittest.TextTestRunner(stream=stream, verbosity=2).run(suite)
    return stream.getvalue()
def test():
    """Run the TestFriendship suite and return its report with a back link."""
    back_link = A('back', _href=URL('index'))
    report = CODE(run_test(TestFriendship))
    return dict(back=back_link, output=report)
Source code
-
modules/plugin_friendship.py
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
from gluon import *
from gluon.storage import Storage, Messages
class Friendship(object):
    """Friendship graph stored as directed edges in one DAL table.

    Each row of the edge table links ``user`` to ``friend`` with a ``status``
    (requesting or confirmed) and a ``mutual_friends`` list. A confirmed
    friendship is stored as two rows, one per direction.

    Configure ``settings`` (table name, extra fields, ``onconfirm`` callback)
    before calling ``define_tables``.
    """

    def __init__(self, db):
        """Store the DAL connection and set default settings and messages."""
        self.db = db

        settings = self.settings = Storage()
        settings.extra_fields = {}
        settings.onconfirm = None
        settings.table_edge_name = 'friendship_edge'
        settings.table_edge = None
        settings.status_requesting = 'requesting'
        settings.status_confirmed = 'confirmed'

        messages = self.messages = Messages(current.T)
        messages.requesting = 'Requesting'
        messages.confirmed = 'Confirmed'
        messages.label_status = 'Status'
        # Was 'Confirmed', a copy-paste of the status label above.
        messages.label_mutual_friends = 'Mutual friends'

    def define_tables(self, table_user_name, migrate=True, fake_migrate=False):
        """Define the edge table (unless already defined) and remember it.

        ``table_user_name`` is the name of the table that ``user``, ``friend``
        and ``mutual_friends`` reference. Fields listed under the edge table's
        name in ``settings.extra_fields`` are appended to the definition.
        """
        db, settings = self.db, self.settings
        if settings.table_edge_name not in db.tables:
            fields = [
                Field('user', 'reference %s' % table_user_name),
                Field('friend', 'reference %s' % table_user_name),
                Field('status', length=16, default=settings.status_requesting,
                      label=self.messages.label_status),
                Field('mutual_friends', 'list:reference %s' % table_user_name,
                      label=self.messages.label_mutual_friends),
            ]
            fields.extend(settings.extra_fields.get(settings.table_edge_name, []))
            table = db.define_table(settings.table_edge_name, *fields,
                                    migrate=migrate, fake_migrate=fake_migrate)
            table.status.requires = IS_IN_SET(
                [(settings.status_requesting, self.messages.requesting),
                 (settings.status_confirmed, self.messages.confirmed)])
        settings.table_edge = db[settings.table_edge_name]

    def _mutual_friend_ids(self, user_id, friend_id):
        """Return the set of ids that are confirmed friends of both users."""
        table_edge = self.settings.table_edge
        of_user = set(r.friend for r in
                      self.friend_edges_from_user(user_id).select(table_edge.friend))
        of_friend = set(r.friend for r in
                        self.friend_edges_from_user(friend_id).select(table_edge.friend))
        return of_user & of_friend

    def _sync_mutual_edges(self, user_id, friend_id, mutual_friends, add):
        """Add or remove each user on the other's edges to every mutual friend.

        For both orderings of (user, friend) and every mutual friend, the
        edge owner -> mutual friend and its reverse get ``other`` appended to
        (``add`` true) or removed from (``add`` false) ``mutual_friends``.
        """
        table_edge = self.settings.table_edge
        for owner_id, other_id in ((user_id, friend_id), (friend_id, user_id)):
            for mutual_friend in mutual_friends:
                edge = self.friend_edges_from_user(owner_id)(
                    table_edge.friend == mutual_friend
                ).select(table_edge.id, table_edge.mutual_friends).first()
                ids = list(edge.mutual_friends or [])
                if add:
                    if other_id not in ids:
                        ids.append(other_id)
                elif other_id in ids:
                    # Guarded so an already-inconsistent list cannot abort
                    # the removal half-way with list.remove's ValueError.
                    ids.remove(other_id)
                edge.update_record(mutual_friends=ids)
                # Write the same list to the reverse edge.
                self.friend_edges_from_friend(owner_id)(
                    table_edge.user == mutual_friend
                ).update(mutual_friends=ids)

    def add_friend(self, user_id, friend_id, **extra_vars):
        """Insert a friend request from ``user_id`` to ``friend_id``.

        ``extra_vars`` are passed through to the insert as extra field values.

        Raises ValueError if both ids are the same user or an edge from
        ``user_id`` to ``friend_id`` already exists.
        """
        # Coerce like confirm_friend/remove_friend do, so that ids arriving
        # as strings (e.g. from request.vars) cannot bypass the self check.
        user_id, friend_id = int(user_id), int(friend_id)
        table_edge = self.settings.table_edge
        if user_id == friend_id:
            raise ValueError('a user cannot add themselves as a friend')
        if self.db(table_edge.user == user_id)(table_edge.friend == friend_id).count():
            raise ValueError('an edge from user %s to user %s already exists'
                             % (user_id, friend_id))
        table_edge.insert(user=user_id, friend=friend_id, **extra_vars)

    def confirm_friend(self, user_id, friend_id):
        """Confirm the pending request that ``friend_id`` sent to ``user_id``.

        Both directed edges end up confirmed with the same mutual-friend list,
        and the edges to every mutual friend are updated. Calls
        ``settings.onconfirm(user_id, friend_id)`` afterwards when set.

        Raises ValueError if there is no such pending request.
        """
        user_id, friend_id = int(user_id), int(friend_id)
        db, settings, table_edge = self.db, self.settings, self.settings.table_edge
        if not db(table_edge.friend == user_id)(table_edge.user == friend_id)(
                table_edge.status == settings.status_requesting).count():
            raise ValueError('no pending request from user %s to user %s'
                             % (friend_id, user_id))

        # Computed before the status changes, so neither user is in the set.
        mutual_friends = self._mutual_friend_ids(user_id, friend_id)
        self._sync_mutual_edges(user_id, friend_id, mutual_friends, add=True)

        updator = dict(status=settings.status_confirmed,
                       mutual_friends=list(mutual_friends))
        db(table_edge.friend == user_id)(table_edge.user == friend_id).update(**updator)
        # The reverse edge exists only if user_id had also sent a request.
        reverse = db(table_edge.user == user_id)(table_edge.friend == friend_id)
        if reverse.count():
            reverse.update(**updator)
        else:
            table_edge.insert(user=user_id, friend=friend_id, **updator)

        if settings.onconfirm:
            settings.onconfirm(user_id, friend_id)

    def requesting_edges_from_user(self, user_id):
        """Return the Set of pending requests addressed to ``user_id``."""
        table_edge = self.settings.table_edge
        return self.db(table_edge.friend == user_id)(
            table_edge.status == self.settings.status_requesting)

    def friend_edges_from_user(self, user_id):
        """Return the Set of confirmed edges whose ``user`` is ``user_id``."""
        table_edge = self.settings.table_edge
        return self.db(table_edge.user == user_id)(
            table_edge.status == self.settings.status_confirmed)

    def friend_edges_from_friend(self, friend_id):
        """Return the Set of confirmed edges whose ``friend`` is ``friend_id``."""
        table_edge = self.settings.table_edge
        return self.db(table_edge.friend == friend_id)(
            table_edge.status == self.settings.status_confirmed)

    def get_friend_edge(self, user_id, friend_id, *fields, **attributes):
        """Return the confirmed edge from ``user_id`` to ``friend_id`` or None.

        ``fields`` and ``attributes`` are forwarded to ``select``.
        """
        return self.friend_edges_from_user(user_id)(
            self.settings.table_edge.friend == friend_id
        ).select(*fields, **attributes).first()

    def ignore_friend(self, user_id, friend_id):
        """Delete the pending request that ``friend_id`` sent to ``user_id``.

        Raises ValueError if there is no such pending request.
        """
        user_id, friend_id = int(user_id), int(friend_id)
        db, table_edge = self.db, self.settings.table_edge
        if not db(table_edge.friend == user_id)(table_edge.user == friend_id)(
                table_edge.status == self.settings.status_requesting).count():
            raise ValueError('no pending request from user %s to user %s'
                             % (friend_id, user_id))
        db(table_edge.friend == user_id)(table_edge.user == friend_id).delete()

    def remove_friend(self, user_id, friend_id):
        """Delete the confirmed friendship between the two users.

        Both directed edges are deleted, and each user is removed from the
        other's edges to their mutual friends.

        Raises ValueError if ``user_id`` has no confirmed edge to ``friend_id``.
        """
        user_id, friend_id = int(user_id), int(friend_id)
        db, settings, table_edge = self.db, self.settings, self.settings.table_edge
        if not db(table_edge.user == user_id)(table_edge.friend == friend_id)(
                table_edge.status == settings.status_confirmed).count():
            raise ValueError('user %s and user %s are not confirmed friends'
                             % (user_id, friend_id))

        mutual_friends = self._mutual_friend_ids(user_id, friend_id)
        self._sync_mutual_edges(user_id, friend_id, mutual_friends, add=False)

        db(table_edge.user == user_id)(table_edge.friend == friend_id).delete()
        db(table_edge.friend == user_id)(table_edge.user == friend_id).delete()

    def refresh_all_mutual_friends(self):
        """Recompute ``mutual_friends`` for every row of the edge table.

        Be careful when using this method: it runs several queries per edge.
        """
        db, table_edge = self.db, self.settings.table_edge
        for edge in db(table_edge.id > 0).select():
            user_id, friend_id = edge.user, edge.friend
            mutual_friends = list(self._mutual_friend_ids(user_id, friend_id))
            db(table_edge.user == user_id)(table_edge.friend == friend_id
                                           ).update(mutual_friends=mutual_friends)
Static files
There are no static files
