← Back to the web2py-plugin list
Demo
| current_user | : | |
| message | : | |
| threads | : | |
| unit_tests | : |
Usage
-
controllers/plugin_messaging.py
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
from plugin_messaging import Messaging
from gluon.tools import Auth
import unittest
import datetime
# The unit-test action works on a throwaway in-memory database instead of
# whatever ``db`` was set up before this controller ran.
if request.function == 'test':
    db = DAL('sqlite:memory:')

### core objects ###############################################################
auth = Auth(db)
messaging = Messaging(db)
messaging.settings.table_thread_name = 'plugin_messaging_thread'
messaging.settings.table_message_name = 'plugin_messaging_message'
# The thread table gets a creation timestamp so stale demo data can be purged.
messaging.settings.extra_fields = {
    'plugin_messaging_thread': [
        Field('created_on', 'datetime', default=request.now),
    ],
}

### tables #####################################################################
auth.define_tables()
table_user = auth.settings.table_user
messaging.define_tables(str(table_user))
table_thread = messaging.settings.table_thread
table_message = messaging.settings.table_message

### demo records ###############################################################
num_users = 3
user_ids = {}  # demo user number (1..num_users) -> id of the auth user record
for user_no in range(1, num_users + 1):
    email = 'user%s@test.com' % user_no
    existing = db(table_user.email == email).select().first()
    # reuse the demo user when it is already stored, otherwise create it
    user_ids[user_no] = (existing and existing.id) or table_user.insert(email=email)

# Once any thread is older than 30 minutes the whole demo data set is wiped
# and the visitor is sent back to the overview.
stale_before = request.now - datetime.timedelta(minutes=30)
if db(table_thread.created_on < stale_before).delete():
    table_thread.truncate()
    table_message.truncate()
    session.flash = 'the database has been refreshed'
    redirect(URL('index'))
### demo functions #############################################################
def index():
    """Demo action: thread overview for one demo user, or one conversation.

    URL arguments:
        args(0) -- number of the demo user to act as (defaults to 1).
        args(1) -- optional number of the other demo user; when present the
                   conversation with that user is shown instead of the
                   overview.

    Returns the dict handed to the view: ``current_user``, ``message``,
    ``threads`` and ``unit_tests`` for the overview, or ``back``,
    ``message_to``, ``messages`` and ``reply`` for a conversation.
    """
    user_no = int(request.args(0) or 1)
    user_id = user_ids[user_no]

    if not request.args(1):
        # --- overview: user switcher, thread list, "new message" form --------
        user_chooser = []
        for i in range(1, num_users + 1):
            if i == user_no:
                # the active user is plain text, every other user is a link
                user_chooser.append(SPAN('user%s' % user_no))
            else:
                user_chooser.append(A('user%s' % i, _href=URL('index', args=i)))
        user_chooser = DIV(XML(' '.join([r.xml() for r in user_chooser])),
                           _style='font-weight:bold')

        records = messaging.threads_from_user(user_id).select(
            table_user.ALL, table_thread.ALL,
            left=(table_user.on(table_user.id == table_thread.receiver)))
        threads = []
        status_options = dict(table_thread.status.requires.options())
        for record in records:
            # Demo emails have the form 'user<N>@test.com', so email[:5] is
            # the display name 'user<N>' and email[4] is the user number N.
            el = A(record[table_user].email[:5],
                   _href=URL('index', args=[user_no, record[table_user].email[4]]))
            status = record[table_thread].status
            if status == messaging.settings.status_unread:
                # highlight threads that contain unread messages
                el = SPAN(el, ' ', status_options[status], _style='background:silver')
            threads.append(el)

        # The form selects the receiver by demo user number, not by record id;
        # the number is mapped back through user_ids when the form is accepted.
        table_thread.receiver.requires = IS_IN_SET(
            [(i, 'user%s' % i) for i in range(1, num_users + 1) if i != user_no])
        form = SQLFORM.factory(table_thread.receiver, table_message.body_text)
        if form.accepts(request.vars, session):
            messaging.add_message(user_id, user_ids[int(form.vars.receiver)],
                                  form.vars.body_text)
            session.flash = T('Record Created')
            redirect(URL('index', args=[user_no, form.vars.receiver]))
        return dict(current_user=user_chooser,
                    message=form,
                    threads=threads,
                    unit_tests=[A('basic test', _href=URL('test'))],
                    )
    else:
        # --- conversation view ----------------------------------------------
        receiver_no = request.args(1)
        receiver_id = user_ids[int(receiver_no)]
        thread = messaging.get_thread(user_id, receiver_id)
        if thread:
            # opening the conversation marks it as read for the acting user
            if thread.status == messaging.settings.status_unread:
                thread.update_record(status=messaging.settings.status_read)
            records = messaging.messages_from_thread(thread.id).select(
                table_user.ALL, table_message.ALL,
                orderby=~table_message.id,
                left=(table_user.on(table_user.id == table_message.user)))
        else:
            # get_thread() returns None while the two users have not exchanged
            # any message yet; show an empty conversation instead of failing
            # on thread.status / thread.id.
            records = []

        messages = []
        # rows were selected newest-first; reverse to display oldest-first
        for record in reversed(records):
            messages.append(LI(EM(record[table_user].email[:5], _href='#'), ' ',
                               SPAN(record[table_message].body_text)))
        messages = UL(*messages)

        form = SQLFORM.factory(table_message.body_text)
        if form.accepts(request.vars, session):
            messaging.add_message(user_id, receiver_id, form.vars.body_text)
            session.flash = T('Record Created')
            redirect(URL('index', args=request.args))
        return dict(back=A('back', _href=URL('index', args=user_no)),
                    message_to='user%s' % receiver_no,
                    messages=messages,
                    reply=form,
                    )
### unit tests #################################################################
class TestMessaging(unittest.TestCase):
    """Basic behaviour checks for the controller's ``messaging`` instance."""

    def setUp(self):
        # every test starts from empty thread and message tables
        for table in (table_thread, table_message):
            table.truncate()

    def test_add_message(self):
        """A message creates both threads and is stored once in each of them."""
        sender, recipient = 1, 2
        settings = messaging.settings
        self.assertEqual(messaging.threads_from_user(sender).count(), 0)

        first_text = 'test'
        messaging.add_message(sender, recipient, first_text)

        # The sender's own thread stays "read", the receiver's becomes
        # "unread"; both hold a copy of the message authored by the sender.
        thread_ids = []
        for owner, partner, expected_status in (
                (sender, recipient, settings.status_read),
                (recipient, sender, settings.status_unread)):
            thread = messaging.get_thread(owner, partner)
            self.assertEqual(thread.status, expected_status)
            stored = messaging.messages_from_thread(thread.id).select().first()
            self.assertEqual(stored.user, sender)
            self.assertEqual(stored.body_text, first_text)
            thread_ids.append(thread.id)

        # a second message lands in the same two threads
        messaging.add_message(sender, recipient, 'test2')
        for thread_id in thread_ids:
            self.assertEqual(messaging.messages_from_thread(thread_id).count(), 2)

    def test_remove_messages(self):
        """Removal works for a whole thread and for selected message ids."""
        sender, recipient = 1, 2

        def send_three():
            for _ in range(3):
                messaging.add_message(sender, recipient, 'test')

        # without ids the sender's whole thread disappears
        send_three()
        messaging.remove_messages(sender, recipient)
        self.assertEqual(messaging.threads_from_user(sender).count(), 0)

        # with ids only the listed messages disappear
        send_three()
        thread = messaging.get_thread(sender, recipient)
        rows = messaging.messages_from_thread(thread.id).select()
        ids = [row.id for row in rows]
        messaging.remove_messages(sender, recipient, ids[1:])
        self.assertEqual(messaging.messages_from_thread(thread.id).count(), 1)
def run_test(TestCase):
    """Run all tests of a ``unittest.TestCase`` subclass and return the report.

    :param TestCase: the ``unittest.TestCase`` subclass whose tests are run.
    :returns: the verbose text report written by ``unittest.TextTestRunner``.
    """
    # cStringIO exists only on Python 2, where it accepts the byte strings
    # unittest writes; on Python 3 fall back to the text buffer from ``io``.
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO
    stream = StringIO()
    suite = unittest.TestLoader().loadTestsFromTestCase(TestCase)
    unittest.TextTestRunner(stream=stream, verbosity=2).run(suite)
    return stream.getvalue()
def test():
    """Demo action: run the TestMessaging suite and show its text report."""
    back_link = A('back', _href=URL('index'))
    report = CODE(run_test(TestMessaging))
    return dict(back=back_link, output=report)
Source code
-
modules/plugin_messaging.py
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
from gluon import *
from gluon.storage import Storage, Messages
class Messaging(object):
    """Private messaging between two users, stored in two DAL tables.

    Every conversation is kept as two thread rows, one owned by each
    participant, and every message is stored once per thread (see
    ``add_message``). The ``status`` of a thread row tells its owner whether
    it holds unread messages.
    """

    def __init__(self, db):
        """Keep the DAL connection and set up default settings and labels.

        :param db: DAL instance on which the tables are defined and queried.
        """
        self.db = db

        settings = self.settings = Storage()
        # optional callback, called as onmessage(user_id, receiver_id) at the
        # end of add_message
        settings.onmessage = None
        # maps a table name to a list of additional Field objects for it
        settings.extra_fields = {}
        # NOTE(review): select_fields / select_attributes are not referenced
        # anywhere else in this class.
        settings.select_fields = []
        settings.select_attributes = {}

        settings.table_message_name = 'messaging_message'
        settings.table_message = None  # set by define_tables
        settings.table_thread_name = 'messaging_thread'
        settings.table_thread = None  # set by define_tables

        # values stored in the thread table's status column
        settings.status_read = 'read'
        settings.status_unread = 'unread'

        messages = self.messages = Messages(current.T)
        messages.read = 'Already read'
        messages.unread = 'Unread'
        messages.label_status = 'Status'
        messages.label_body_text = 'Body text'
        messages.label_receiver = 'Receiver'

    def define_tables(self, table_user_name, migrate=True, fake_migrate=False):
        """Define the thread and message tables unless they already exist.

        Afterwards ``settings.table_thread`` and ``settings.table_message``
        refer to the tables named by ``settings.table_thread_name`` and
        ``settings.table_message_name``.

        :param table_user_name: name of the user table the ``user`` and
            ``receiver`` fields reference.
        :param migrate: forwarded to ``db.define_table``.
        :param fake_migrate: forwarded to ``db.define_table``.
        """
        db, settings = self.db, self.settings

        if settings.table_thread_name not in db.tables:
            table = db.define_table(
                settings.table_thread_name,
                Field('user', 'reference %s' % table_user_name),
                Field('receiver', 'reference %s' % table_user_name,
                      label=self.messages.label_receiver),
                Field('status', length=16, default=settings.status_read,
                      label=self.messages.label_status),
                migrate=migrate, fake_migrate=fake_migrate,
                *settings.extra_fields.get(settings.table_thread_name, []))
            table.status.requires = IS_IN_SET(
                [(settings.status_read, self.messages.read),
                 (settings.status_unread, self.messages.unread)])
        settings.table_thread = db[settings.table_thread_name]

        if settings.table_message_name not in db.tables:
            settings.table_message = db.define_table(
                settings.table_message_name,
                Field('user', 'reference %s' % table_user_name),
                Field('thread', 'reference %s' % settings.table_thread_name),
                Field('body_text', 'text',
                      label=self.messages.label_body_text),
                Field('forward', 'text'),  # TODO
                migrate=migrate, fake_migrate=fake_migrate,
                *settings.extra_fields.get(settings.table_message_name, []))
        settings.table_message = db[settings.table_message_name]

    def threads_from_user(self, user_id):
        """Return the DAL set of all thread rows owned by ``user_id``."""
        return self.db(self.settings.table_thread.user == user_id)

    def get_thread(self, user_id, receiver_id, *fields, **attributes):
        """Return ``user_id``'s thread row with ``receiver_id``, or None.

        ``fields`` and ``attributes`` are passed on to ``select``.
        """
        return self.threads_from_user(user_id)(
            self.settings.table_thread.receiver == receiver_id
        ).select(*fields, **attributes).first()

    def messages_from_thread(self, thread_id):
        """Return the DAL set of all messages stored in thread ``thread_id``."""
        return self.db(self.settings.table_message.thread == thread_id)

    def add_message(self, user_id, receiver_id, body_text, forward_message_ids=None, **extra):
        """Send ``body_text`` from ``user_id`` to ``receiver_id``.

        Missing thread rows are created for both sides, the receiver's thread
        is marked unread, and the message is stored once in each thread.
        Finally ``settings.onmessage(user_id, receiver_id)`` is called when it
        is set.

        :param forward_message_ids: currently ignored (forwarding is a TODO).
        :param extra: additional field values passed to both message inserts.
        :raises ValueError: if sender and receiver are the same user.
        """
        db, settings = self.db, self.settings
        # compared as strings so that 1 and '1' count as the same user
        if str(user_id) == str(receiver_id):
            raise ValueError('sender and receiver must be different users')

        user_thread = self.get_thread(user_id, receiver_id)
        if not user_thread:
            # the status default (read) applies to the sender's own thread
            user_thread_id = settings.table_thread.insert(user=user_id, receiver=receiver_id)
        else:
            user_thread_id = user_thread.id

        receiver_thread = self.get_thread(receiver_id, user_id)
        if not receiver_thread:
            receiver_thread_id = settings.table_thread.insert(
                user=receiver_id, receiver=user_id,
                status=settings.status_unread)
        else:
            if receiver_thread.status != settings.status_unread:
                receiver_thread.update_record(status=settings.status_unread)
            receiver_thread_id = receiver_thread.id

        forward = None
        if forward_message_ids:
            pass  # TODO

        # one copy of the message per thread
        settings.table_message.insert(user=user_id, thread=user_thread_id,
                                      body_text=body_text, forward=forward, **extra)
        settings.table_message.insert(user=user_id, thread=receiver_thread_id,
                                      body_text=body_text, forward=forward, **extra)

        if settings.onmessage:
            settings.onmessage(user_id, receiver_id)

    def remove_messages(self, user_id, receiver_id, message_ids=None):
        """Remove messages from ``user_id``'s thread with ``receiver_id``.

        :param message_ids: ``None`` (the default) deletes the whole thread
            row of ``user_id``; a non-empty collection deletes only those
            messages from that thread; an empty collection is a no-op.
        :raises ValueError: if message ids are given but the thread does not
            exist.
        """
        settings = self.settings
        if message_ids is None:
            # NOTE(review): only the thread row is deleted here; its messages
            # disappear only if the database cascades the delete -- confirm.
            self.db(settings.table_thread.user == user_id)(
                settings.table_thread.receiver == receiver_id
            ).delete()
            return
        if not message_ids:
            # An explicitly empty selection must not fall through to the
            # "delete the whole thread" branch above.
            return
        thread = self.get_thread(user_id, receiver_id)
        if not thread:
            raise ValueError('no thread between user %s and receiver %s'
                             % (user_id, receiver_id))
        self.messages_from_thread(thread.id)(
            settings.table_message.id.belongs(message_ids)
        ).delete()
Static files
There are no static files
