在现代 Web 应用中,实时通知系统是提升用户体验的关键功能。本文将带你一步步实现一个完整的 Django 实时通知系统,涵盖站内通知、邮件推送、WebSocket 实时消息等多种通知方式,通过 channels
、django-notifications-hq
、django-anymail
和 Mailgun 的深度集成,构建一个高性能、可扩展的通知解决方案。
一、系统架构概述 我们将构建的通知系统包含以下核心组件:
Django :作为主框架,处理业务逻辑和数据存储。
channels :提供 WebSocket 支持,实现实时消息推送。
django-notifications-hq :管理站内通知,存储通知数据并提供查询接口。
django-anymail + Mailgun :实现邮件通知,确保高送达率。
Redis :作为 channels 的消息队列和缓存,支持多服务器部署。
系统工作流程:
用户触发通知事件(如评论、点赞)。
django-notifications-hq
将通知存储到数据库。
通过 channels 的 WebSocket 实时推送给在线用户。
同时通过 django-anymail
和 Mailgun 发送邮件通知。
二、环境准备与安装 1. 安装必要的 Python 包 1 pip install channels django-notifications-hq django-anymail redis websockets
2. 安装并启动 Redis 服务 1 2 3 4 5 6 7 8 sudo apt-get install redis-serversudo systemctl start redis-serversudo systemctl enable redis-server brew install redis brew services start redis
3. 注册 Mailgun 账号
访问 Mailgun 官网 注册账号。
添加并验证发件域名(如 mg.yourdomain.com
),配置 DNS 记录。
获取 API 密钥(key-xxxxxxxxxxxx
)和 SMTP 凭据。
三、Django 项目配置 1. 修改 settings.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 INSTALLED_APPS = [ 'channels' , 'notifications' , 'anymail' , 'your_app' , ] ASGI_APPLICATION = 'your_project.asgi.application' CHANNEL_LAYERS = { "default" : { "BACKEND" : "channels_redis.core.RedisChannelLayer" , "CONFIG" : { "hosts" : [("127.0.0.1" , 6379 )], }, }, } ANYMAIL = { "MAILGUN_API_KEY" : "your-mailgun-api-key" , "MAILGUN_SENDER_DOMAIN" : "mg.yourdomain.com" , } EMAIL_BACKEND = "anymail.backends.mailgun.EmailBackend" DEFAULT_FROM_EMAIL = "通知系统 <notifications@mg.yourdomain.com" NOTIFICATIONS_BACKENDS = [ 'notifications.backends.db.DatabaseBackend' , 'your_app.backends.EmailNotificationBackend' , ]
2. 创建 ASGI 应用 在项目根目录创建 asgi.py
:
1 2 3 4 5 6 7 8 9 10 11 import osfrom django.core.asgi import get_asgi_applicationfrom channels.routing import ProtocolTypeRouteros.environ.setdefault('DJANGO_SETTINGS_MODULE' , 'your_project.settings' ) application = ProtocolTypeRouter({ "http" : get_asgi_application(), "websocket" : })
四、配置 django-notifications-hq 1. 创建自定义邮件后端 在你的应用目录下创建 backends.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from django.core.mail import send_mailfrom notifications.backends.base import BaseBackendclass EmailNotificationBackend (BaseBackend ): def send (self, notification, *args, **kwargs ): recipient = notification.recipient subject = f"【新通知】{notification.verb} " message = ( f"你好,{recipient.username} !\n\n" f"{notification.description} \n\n" f"点击查看:https://yourdomain.com/notifications/" ) send_mail( subject=subject, message=message, from_email=None , recipient_list=[recipient.email], fail_silently=False , )
2. 执行数据库迁移 1 python manage.py migrate
五、配置 channels 实现实时通知 1. 创建 WebSocket 消费者 在你的应用目录下创建 consumers.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import jsonfrom channels.generic.websocket import AsyncWebsocketConsumerfrom asgiref.sync import sync_to_asyncfrom django.contrib.auth.models import Userclass NotificationConsumer (AsyncWebsocketConsumer ): async def connect (self ): user = self .scope["user" ] if user.is_anonymous: await self .close() return self .group_name = f"user_{user.id } _notifications" await self .channel_layer.group_add( self .group_name, self .channel_name ) await self .accept() unread_count = await self .get_unread_count(user) await self .send(text_data=json.dumps({ "type" : "unread_count" , "count" : unread_count })) async def disconnect (self, close_code ): await self .channel_layer.group_discard( self .group_name, self .channel_name ) async def notify_message (self, event ): message = event["message" ] await self .send(text_data=json.dumps({ "type" : "notification" , "data" : message })) @sync_to_async def get_unread_count (self, user ): return user.notifications.unread().count()
2. 配置 WebSocket 路由 在你的应用目录下创建 routing.py
:
1 2 3 4 5 6 7 from django.urls import re_pathfrom .consumers import NotificationConsumerwebsocket_urlpatterns = [ re_path(r'ws/notifications/$' , NotificationConsumer.as_asgi()), ]
3. 更新 ASGI 应用配置 修改项目根目录的 asgi.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import osfrom django.core.asgi import get_asgi_applicationfrom channels.routing import ProtocolTypeRouter, URLRouterfrom channels.auth import AuthMiddlewareStackimport your_app.routingos.environ.setdefault('DJANGO_SETTINGS_MODULE' , 'your_project.settings' ) application = ProtocolTypeRouter({ "http" : get_asgi_application(), "websocket" : AuthMiddlewareStack( URLRouter( your_app.routing.websocket_urlpatterns ) ), })
六、实现通知触发机制 1. 创建信号处理 在你的应用目录下创建 signals.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from django.db.models.signals import post_savefrom django.dispatch import receiverfrom notifications.models import Notificationfrom asgiref.sync import async_to_syncfrom channels.layers import get_channel_layer@receiver(post_save, sender=Notification ) def send_notification_to_websocket (sender, instance, created, **kwargs ): if created: recipient = instance.recipient channel_layer = get_channel_layer() notification_data = { "id" : instance.id , "verb" : instance.verb, "description" : instance.description, "timestamp" : instance.timestamp.isoformat(), "unread" : instance.unread, } async_to_sync(channel_layer.group_send)( f"user_{recipient.id } _notifications" , { "type" : "notify_message" , "message" : notification_data } )
2. 注册信号 在你的应用的 apps.py
中注册信号:
1 2 3 4 5 6 7 8 9 from django.apps import AppConfigclass YourAppConfig (AppConfig ): default_auto_field = 'django.db.models.BigAutoField' name = 'your_app' def ready (self ): import your_app.signals
七、前端实现 1. 连接 WebSocket 在前端模板中添加 WebSocket 连接代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 <script> document .addEventListener ('DOMContentLoaded' , function ( ) { if (window .user_id ) { const wsProtocol = window .location .protocol === 'https:' ? 'wss:' : 'ws:' ; const wsUrl = `${wsProtocol} //${window .location.host} /ws/notifications/` ; const socket = new WebSocket (wsUrl); socket.onopen = function (e ) { console .log ('WebSocket 连接成功' ); }; socket.onmessage = function (e ) { const data = JSON .parse (e.data ); if (data.type === 'unread_count' ) { updateNotificationCount (data.count ); } else if (data.type === 'notification' ) { addNewNotification (data.data ); updateNotificationCount (data.count ); } }; socket.onclose = function (e ) { console .log ('WebSocket 连接关闭,尝试重连...' ); setTimeout (() => { document .addEventListener ('DOMContentLoaded' , function ( ) {}); }, 3000 ); }; } function updateNotificationCount (count ) { const countElement = document .getElementById ('notification-count' ); if (countElement) { countElement.textContent = count; if (count > 0 ) { countElement.classList .remove ('hidden' ); } else { countElement.classList .add ('hidden' ); } } } function addNewNotification (notification ) { const notificationsList = document .getElementById ('notifications-list' ); if (notificationsList) { const notificationElement = document .createElement ('div' ); notificationElement.className = 'notification-item unread' ; notificationElement.innerHTML = ` <a href="${notification.url} "> <span>${notification.verb} </span> <span class="timestamp">${notification.timestamp} </span> </a> ` ; notificationsList.prepend (notificationElement); } } }); </script>
2. 在模板中显示通知 1 2 3 4 5 6 7 8 9 10 <div class ="notification-icon" > <i class ="fa fa-bell" > </i > <span id ="notification-count" class ="hidden" > 0</span > </div > <div id ="notifications-list" > </div >
八、测试与验证 1. 启动开发服务器和 channels worker 1 2 3 4 5 python manage.py runserver python manage.py runworker
2. 发送测试通知 在视图或 shell 中测试通知发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 from django.contrib.auth.models import Userfrom notifications import notifysender = User.objects.get(username='admin' ) recipient = User.objects.get(username='test_user' ) notify.send( sender=sender, recipient=recipient, verb='发送了一条消息给你' , description='请查看消息详情' )
3. 验证结果
数据库 :检查 notifications_notification
表是否有新记录。
WebSocket :打开浏览器控制台,查看是否收到实时通知。
邮件 :检查收件箱是否收到通知邮件。
九、性能优化与扩展 1. 使用 Celery 处理异步任务 将邮件发送和 WebSocket 推送改为异步任务:
1 pip install celery redis
配置 Celery:
1 2 3 4 5 6 7 8 9 import osfrom celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE' , 'your_project.settings' ) app = Celery('your_project' ) app.config_from_object('django.conf:settings' , namespace='CELERY' ) app.autodiscover_tasks()
修改信号处理:
1 2 3 4 5 6 7 from .tasks import send_notification_to_websocket@receiver(post_save, sender=Notification ) def trigger_notification_tasks (sender, instance, created, **kwargs ): if created: send_notification_to_websocket.delay(instance.id )
创建异步任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from celery import shared_taskfrom notifications.models import Notificationfrom asgiref.sync import async_to_syncfrom channels.layers import get_channel_layer@shared_task def send_notification_to_websocket (notification_id ): try : notification = Notification.objects.get(id =notification_id) recipient = notification.recipient channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( f"user_{recipient.id } _notifications" , { "type" : "notify_message" , "message" : { "id" : notification.id , "verb" : notification.verb, "description" : notification.description, "timestamp" : notification.timestamp.isoformat(), } } ) except Notification.DoesNotExist: pass
2. 配置邮件模板 创建 HTML 邮件模板:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 <!DOCTYPE html > <html > <head > <style > .container { max-width : 600px ; margin : 0 auto; padding : 20px ; } .header { background : #f5f5f5 ; padding : 15px ; text-align : center; } .content { padding : 20px ; border : 1px solid #eee ; } .footer { margin-top : 20px ; text-align : center; color : #888 ; } .btn { display : inline-block; background : #007bff ; color : white; padding : 10px 15px ; text-decoration : none; border-radius : 4px ; } </style > </head > <body > <div class ="container" > <div class ="header" > <h2 > 新通知</h2 > </div > <div class ="content" > <p > 你好,{{ user.username }}!</p > <p > {{ description }}</p > <p > <a href ="{{ url }}" class ="btn" > 查看详情</a > </p > </div > <div class ="footer" > <p > 你可以在设置中调整通知偏好</p > </div > </div > </body > </html >
修改邮件后端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from django.core.mail import EmailMultiAlternativesfrom django.template.loader import render_to_stringclass EmailNotificationBackend (BaseBackend ): def send (self, notification, *args, **kwargs ): recipient = notification.recipient subject = f"【新通知】{notification.verb} " html_content = render_to_string('email_notification.html' , { 'user' : recipient, 'description' : notification.description, 'url' : f'https://yourdomain.com/notifications/{notification.id } /' }) msg = EmailMultiAlternatives( subject=subject, body=notification.description, to=[recipient.email] ) msg.attach_alternative(html_content, "text/html" ) msg.send()
十、常见问题排查 1. WebSocket 连接失败
检查 Redis 服务是否正常运行:redis-cli ping
应返回 PONG
。
确认 CHANNEL_LAYERS
配置正确,Redis 地址和端口无误。
检查浏览器控制台错误信息,可能是跨域问题或路径错误。
2. 邮件发送失败
验证 Mailgun API 密钥和发件域名是否正确。
检查 Mailgun 控制台的日志,查看具体错误原因。
确认接收者邮箱地址有效,无拼写错误。
3. 通知未触发
检查信号处理是否正确注册,apps.py
中的 ready()
方法是否导入信号模块。
验证 NOTIFICATIONS_BACKENDS
配置是否包含你的自定义后端。
总结 通过本文的完整方案,你已成功构建了一个功能全面的 Django 实时通知系统,包括:
站内通知 :使用 django-notifications-hq
存储和管理通知。
实时推送 :通过 channels
和 WebSocket 实现即时消息推送。
邮件通知 :集成 django-anymail
和 Mailgun 确保高送达率。
异步处理 :利用 Redis 和 Celery 提升系统性能。
这个系统具有良好的扩展性,你可以根据需求添加更多通知渠道(如短信、APP 推送),或定制更复杂的通知规则和用户偏好设置。 希望本文能帮助你快速上手 Django 实时通知系统的开发。如果你有任何问题或建议,欢迎在评论区交流讨论!
评论