main.py
from PyWeChatSpy import WeChatSpy
from PyWeChatSpy.command import *
import time
import sys
import os
sys.path.append(os.path.dirname(__file__))
from config import Config, MyPymysqlPool
from queue import Queue
from threading import Thread
from PyWeChatSpy.proto import spy_pb2
import re
from phone import Phone
import emoji
# First-in-first-out queue that buffers payloads until the parser picks them up.
request_queue = Queue()


def get_request(data):
    """Store an incoming payload on ``request_queue`` without inspecting it.

    Args:
        data: The object delivered by the producer; it is queued as-is.
    """
    request_queue.put(data)
def my_parser():
    """Drain ``request_queue`` forever and forward text messages.

    Each payload whose ``type`` equals ``CHAT_MESSAGE`` is decoded into a
    ``spy_pb2.ChatMessage``; every contained message of type 1 is passed to
    ``handle_text_message``. All other payloads and message types are
    ignored. The loop never terminates, so this function does not return.
    """
    while True:
        payload = request_queue.get()
        if payload.type != CHAT_MESSAGE:
            continue

        chat_message = spy_pb2.ChatMessage()
        chat_message.ParseFromString(payload.bytes)
        for message in chat_message.message:
            msg_type = message.type  # 1 = text, 3 = image, others unexplored
            sender = message.wxidFrom.str  # read but not used below
            receiver = message.wxidTo.str  # read but not used below
            text = message.content.str
            summary = message.overview  # read but not used below
            sent_at = message.timestamp
            if msg_type == 1:
                handle_text_message(text, sent_at)
# Leading "<id>:" sender prefix found at the start of group-chat messages.
_SENDER_PREFIX_RE = re.compile(r'^[A-Za-z0-9_]*:')
# Optional (+)86 country code, then either a mobile number (1 + 10 digits) or
# an area-code landline with optional extension. Group 3 (findall index 2) is
# the number without the country code.
_PHONE_RE = re.compile(
    r"((\+?86)?|\(\+?86\))[ -]?(1\d{10}|((\d{3,4})|\(\d{3,4}\))[ -]?\d{7,8}([ -]?\d{1,4})?)"
)
_COUNTRY_CODE_RE = re.compile(r"^\+?86")
# Matches the ":alias:" tokens produced by emoji.demojize().
# NOTE(review): this also removes any ordinary text enclosed between two
# ASCII colons; kept unchanged from the original behaviour.
_EMOJI_ALIAS_RE = re.compile(r':(.*?):')
# A message with identical content stored within this window is a duplicate.
_DEDUP_WINDOW_HOURS = 48
# Forum post type: 1 seedling purchase request, 2 project procurement,
# 3 clearance sale, 4 seedling prices, 5 supply and demand.
_FORUM_TYPE_PURCHASE = 1


def _split_keywords(raw):
    """Split a comma-separated config value into stripped, non-empty keywords."""
    return [word.strip() for word in raw.split(',') if word.strip()]


def _first_mobile(candidates):
    """Return the first candidate that normalises to an 11-digit mobile number, else None."""
    for candidate in candidates:
        digits = str(candidate).replace(' ', '').replace('-', '')
        digits = _COUNTRY_CODE_RE.sub('', digits)
        if len(digits) == 11 and digits.startswith('1'):
            return digits
    return None


def _log(text):
    """Print a status line framed by dashes, as the rest of this module does."""
    print("-" * 10, text, "-" * 10)


def handle_text_message(content, timestamp):
    """Filter one text message and store it in ``wj_forum`` if it qualifies.

    A message qualifies when it contains at least one ``receive_keywords``
    entry and no ``filter_keywords`` entry (both read from the ``[config]``
    section; empty entries are ignored so an empty filter list blocks
    nothing). The sender prefix and phone numbers are removed from the text,
    the first mobile number is resolved to a province and region, emoji are
    stripped, and the record is inserted unless identical content was stored
    within the last ``_DEDUP_WINDOW_HOURS`` hours.

    Args:
        content: Raw message text.
        timestamp: Message timestamp; stored as ``create_time``.

    Returns:
        ``False`` when a keyword-matched message is rejected (no mobile
        number, unknown home location or region, or duplicate content).
        ``None`` in every other case: keywords did not match, the record was
        inserted, or an error was caught and logged.
    """
    settings = Config().instance()
    receive_keywords = _split_keywords(settings.get('config', 'receive_keywords'))
    if not any(keyword in content for keyword in receive_keywords):
        return None
    filter_keywords = _split_keywords(settings.get('config', 'filter_keywords'))
    if any(keyword in content for keyword in filter_keywords):
        return None
    user_id = settings.get('config', 'user_id')

    # Drop the group-chat sender prefix.
    print(content)
    content = _SENDER_PREFIX_RE.sub("", content)
    print(content)

    # Collect every phone number, then replace them in the text with commas.
    tel_list = [groups[2] for groups in _PHONE_RE.findall(content)]
    content = _PHONE_RE.sub(",", content)

    try:
        mobile = _first_mobile(tel_list)
        if mobile is None:
            _log("未匹配到电话,记录:%s" % content)
            return False

        # Resolve the number's home location; records without one are dropped.
        info = Phone().find(mobile)
        if not info or not info['province']:
            _log("未匹配到归属地,记录:%s" % content)
            return False
        mobile = info['phone']
        province = info['province']

        area = get_region(province)
        if area is False:
            _log("未匹配到地区,记录:%s" % content)
            return False

        # Prefix the text with the province, then strip emoji.
        content = province + content.strip('\n')
        content = _EMOJI_ALIAS_RE.sub('', emoji.demojize(content)).strip()

        mysql = MyPymysqlPool("db")
        saved = False
        try:
            cutoff = int(time.time()) - _DEDUP_WINDOW_HOURS * 3600
            sql = "select * from `wj_forum` where `create_time` > %s and `content` = %s"
            if mysql.getOne(sql, [str(cutoff), content]) is not False:
                _log("%d小时内数据库已存在,该条记录不保存,记录:%s" % (_DEDUP_WINDOW_HOURS, content))
                return False

            sql = "insert into wj_forum (`user_id`,`type`,`content`,`create_time`,`region`,`mobile`) values (%s,%s,%s,%s,%s,%s)"
            mysql.insert(
                sql,
                [str(user_id), str(_FORUM_TYPE_PURCHASE), content, str(timestamp), area, mobile],
            )
            saved = True
        finally:
            # Commit only a completed insert, roll back otherwise, and always
            # close the cursor and connection.
            mysql.dispose(isEnd=1 if saved else 0)
        _log("插入1条记录:%s" % content)
    except Exception as exc:
        # Keep the parser thread alive, but report what actually went wrong
        # instead of blaming the phone number for every failure.
        _log("处理消息失败(%r),记录:%s" % (exc, content))
# def remove_emoji(text):
# emoji_pattern = re.compile(
# u"(\ud83d[\ude00-\ude4f])|" # emoticons
# u"(\ud83c[\udf00-\uffff])|" # symbols & pictographs (1 of 2)
# u"(\ud83d[\u0000-\uddff])|" # symbols & pictographs (2 of 2)
# u"(\ud83d[\ude80-\udeff])|" # transport & map symbols
# u"(\ud83c[\udde0-\uddff])" # flags (iOS)
# "+", flags=re.UNICODE
# )
#
# return emoji_pattern.sub(r'', text)
# Regions and the short province names that belong to each of them.
_REGION_PROVINCES = {
    "华东": ("山东", "江苏", "浙江", "上海", "安徽", "福建"),
    "华北": ("河北", "山西", "内蒙", "北京", "天津"),
    "华南": ("广东", "广西", "海南", "香港", "澳门", "台湾", "港澳台"),
    "华中": ("河南", "湖南", "湖北", "江西"),
    "西南": ("四川", "重庆", "云南", "贵州", "西藏"),
    "西北": ("陕西", "甘肃", "宁夏", "青海", "新疆"),
    "东北": ("辽宁", "吉林", "黑龙江"),
}


def get_region(province):
    """Return the region a province belongs to.

    Names are matched by prefix, so both the short form ("内蒙", "山东") and
    longer forms that start with it ("内蒙古", "山东省", "广西壮族自治区")
    resolve to the same region. No short name in the table is a prefix of
    another, so the first match is unambiguous.

    Args:
        province: Province name.

    Returns:
        The region name, or ``False`` when the province is empty, not a
        string, or not listed in any region.
    """
    if not isinstance(province, str) or not province:
        return False
    for area, names in _REGION_PROVINCES.items():
        # str.startswith accepts a tuple and tests each prefix in turn.
        if province.startswith(names):
            return area
    return False
if __name__ == '__main__':
    # Read the WeChat executable location from the [config] section.
    wx_path = Config().get('config', 'wx_path')

    # Start WeChat through WeChatSpy; each payload is handed to get_request.
    spy = WeChatSpy(parser=get_request)
    pid = spy.run(r"%s" % wx_path)

    # Background worker that consumes the queued payloads.
    t_parse = Thread(target=my_parser, name="parse request", daemon=True)
    t_parse.start()

    # Block the main thread until Enter is pressed; the daemon thread ends with it.
    input()
.bat批处理文件
::Initialise the batch session
CLS
@echo off
echo.
::Switch to the directory of this script (/d also switches the drive)
cd /d "%~dp0"
::Location of the bundled embeddable Python
set "python_path=%~dp0python-3.8.6-embed-amd64"
::Make the bundled interpreter and its Scripts folder reachable through PATH
set "path=%path%;%python_path%;"
set "path=%path%;%python_path%\Scripts;"
::python3.8.exe is created by the first-run setup below; skip setup when it exists
if exist "%python_path%\python3.8.exe" goto run
::Copy the interpreter to the python3.8.exe name used below
copy "%python_path%\python.exe" "%python_path%\python3.8.exe"
::Install pip
echo install pip ...
python3.8 get-pip.py
::Install dependencies
echo install dependent ...
::python -m ensurepip && python -m pip install --upgrade pip
pip3 install -r requirements.txt
:run
echo start running ...
::echo %~dp0
::echo %path%
python3.8 main.py
::cmd /k "cd /d %~dp0 && python3.8 main.py"
pause
requirements.txt
bleach==3.1.5
certifi==2020.6.20
chardet==3.0.4
colorama==0.4.3
docutils==0.16
idna==2.10
keyring==21.2.1
packaging==20.4
pkginfo==1.5.0.1
protobuf==3.11.3
Pygments==2.6.1
pyparsing==2.4.7
pywin32-ctypes==0.2.0
readme-renderer==26.0
requests==2.24.0
requests-toolbelt==0.9.1
rfc3986==1.4.0
six==1.15.0
tqdm==4.48.0
twine==3.2.0
urllib3==1.25.10
webencodings==0.5.1
pymysql==0.10.1
dbutils==2.0
PyWeChatSpy==3.0.6.1
emoji==0.6.0
config.py
import pymysql, os, configparser
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB
class Config(object):
    """Read-only access to an INI file located next to this module.

    Example usage: ``Config().get_content("user_information")``.

    Example of a section in the configuration file::

        [notdbMysql]
        host = 192.168.1.101
        port = 3306
        user = root
        password = python123
    """

    def __init__(self, config_filename="config.ini"):
        """Load ``config_filename`` from this module's directory (UTF-8).

        A missing file is not an error here; lookups on the empty parser
        raise the usual ``configparser`` exceptions later.
        """
        file_path = os.path.join(os.path.dirname(__file__), config_filename)
        self.cf = configparser.ConfigParser()
        self.cf.read(file_path, encoding="utf-8")

    @staticmethod
    def _coerce(value):
        """Return ``int(value)`` only when that conversion is lossless.

        Values such as ``"0123"`` (e.g. a numeric password with a leading
        zero) stay strings so that no characters are silently dropped.
        """
        if value.isdecimal() and str(int(value)) == value:
            return int(value)
        return value

    def get_sections(self):
        """Return the list of section names."""
        return self.cf.sections()

    def get_options(self, section):
        """Return the list of option names in ``section``."""
        return self.cf.options(section)

    def get_content(self, section):
        """Return ``section`` as a dict, converting plain integers to ``int``.

        Raises:
            configparser.NoSectionError: If ``section`` does not exist.
        """
        return {
            option: self._coerce(self.cf.get(section, option))
            for option in self.get_options(section)
        }

    def get(self, section, name):
        """Return the raw string value of option ``name`` in ``section``."""
        return self.cf.get(section, name)

    def instance(self):
        """Return the underlying ``ConfigParser`` object."""
        return self.cf
class BasePymysqlPool(object):
    """Plain holder for MySQL connection settings; opens no connection itself."""

    def __init__(self, host, port, user, password, db_name=None):
        """Store the settings, coercing ``port`` to int and ``password`` to str."""
        self.db_host, self.db_port = host, int(port)
        self.user, self.password = user, str(password)
        self.db = db_name
        # Placeholders only; nothing in this class assigns a live object to them.
        self.conn = self.cursor = None
class MyPymysqlPool(BasePymysqlPool):
    """MySQL helper whose instances each borrow one connection from a shared pool.

    The pool is created on first use from the settings of the first instance
    and is then reused by every later instance. Call ``dispose()`` when done
    so the cursor and the connection are closed.
    """

    # Connection pool shared by all instances; created lazily in __getConn.
    __pool = None

    def __init__(self, conf_name=None):
        """Load section ``conf_name`` from the config file and take a connection.

        The section's options are passed as keyword arguments to
        ``BasePymysqlPool.__init__`` (host, port, user, password, db_name).
        """
        self.conf = Config().get_content(conf_name)
        super(MyPymysqlPool, self).__init__(**self.conf)
        # Take a connection from the pool and open a cursor on it.
        self._conn = self.__getConn()
        self._cursor = self._conn.cursor()

    def __getConn(self):
        """Return a connection from the shared pool, creating the pool if needed.

        The pool is stored on the class so that it is built only once instead
        of once per instance.
        """
        if MyPymysqlPool.__pool is None:
            # NOTE(review): use_unicode=False presumably makes text columns
            # come back undecoded - confirm this is intended.
            MyPymysqlPool.__pool = PooledDB(creator=pymysql,
                                            mincached=1,
                                            maxcached=20,
                                            host=self.db_host,
                                            port=self.db_port,
                                            user=self.user,
                                            passwd=self.password,
                                            db=self.db,
                                            use_unicode=False,
                                            charset="utf8",
                                            cursorclass=DictCursor)
        return MyPymysqlPool.__pool.connection()

    def __query(self, sql, param=None):
        """Execute ``sql`` (with ``param`` when given) and return the cursor's count."""
        if param is None:
            return self._cursor.execute(sql)
        return self._cursor.execute(sql, param)

    def getAll(self, sql, param=None):
        """Run a query and fetch every row.

        @param sql: query SQL; put conditions in placeholders and pass the
                    values through ``param``
        @param param: optional tuple/list of placeholder values
        @return: all rows, or ``False`` when the query matched nothing
        """
        if self.__query(sql, param) > 0:
            return self._cursor.fetchall()
        return False

    def getOne(self, sql, param=None):
        """Run a query and fetch the first row.

        @param sql: query SQL; put conditions in placeholders and pass the
                    values through ``param``
        @param param: optional tuple/list of placeholder values
        @return: the first row, or ``False`` when the query matched nothing
        """
        if self.__query(sql, param) > 0:
            return self._cursor.fetchone()
        return False

    def getMany(self, sql, num, param=None):
        """Run a query and fetch up to ``num`` rows.

        @param sql: query SQL; put conditions in placeholders and pass the
                    values through ``param``
        @param num: number of rows to fetch
        @param param: optional tuple/list of placeholder values
        @return: the fetched rows, or ``False`` when the query matched nothing
        """
        if self.__query(sql, param) > 0:
            return self._cursor.fetchmany(num)
        return False

    def insertMany(self, sql, values):
        """Insert several rows with one statement.

        @param sql: INSERT statement with placeholders
        @param values: row data, tuple(tuple) / list[list]
        @return: count returned by ``executemany``
        """
        return self._cursor.executemany(sql, values)

    def update(self, sql, param=None):
        """Execute an UPDATE statement.

        @param sql: SQL with (%s,%s) placeholders
        @param param: tuple/list of values to apply
        @return: count returned by the cursor
        """
        return self.__query(sql, param)

    def insert(self, sql, param=None):
        """Execute an INSERT statement.

        @param sql: SQL with (%s,%s) placeholders
        @param param: tuple/list of values to insert
        @return: count returned by the cursor
        """
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """Execute a DELETE statement.

        @param sql: SQL with (%s,%s) placeholders
        @param param: tuple/list of condition values
        @return: count returned by the cursor
        """
        return self.__query(sql, param)

    def begin(self):
        """Start a transaction by switching autocommit off on the connection."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """Finish the transaction: commit when ``option`` is 'commit', else roll back."""
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, isEnd=1):
        """Finish the transaction and close the cursor and the connection.

        @param isEnd: 1 commits pending work, any other value rolls it back
        """
        try:
            self.end('commit' if isEnd == 1 else 'rollback')
        finally:
            # Close even when commit/rollback fails so nothing is left open.
            self._cursor.close()
            self._conn.close()
# Names exported by ``from config import *``.
__all__ = [
    "Config",
    "MyPymysqlPool",
]
config.ini
[config]
# 接收消息中含有关键词,则写入数据库,多个关键字使用英文逗号","分隔
receive_keywords = 求购
# 含有以下关键词则屏蔽,多个关键字使用英文逗号","分隔
filter_keywords = 出售,注意,求购的注意了
# 默认发布的账户ID,请勿随意修改
user_id = 8
# 微信安装位置
wx_path = F:\swoftare\WeChat\WeChat.exe
[db]
host =
port = 3306
user =
password =
db_name =