使用 PyWeChatSpy 抓取微信聊天记录并写入数据库

main.py

from PyWeChatSpy import WeChatSpy
from PyWeChatSpy.command import *
import time
import sys
import os
sys.path.append(os.path.dirname(__file__))
from config import Config, MyPymysqlPool
from queue import Queue
from threading import Thread
from PyWeChatSpy.proto import spy_pb2
import re
from phone import Phone
import emoji

# FIFO hand-off between the WeChatSpy callback and the parser thread.
request_queue = Queue()


def get_request(data):
    """WeChatSpy parser callback: enqueue the raw payload for ``my_parser``."""
    request_queue.put(item=data)


def _dispatch_request(data):
    """Decode one WeChatSpy payload and pass each text message to handle_text_message."""
    if data.type != CHAT_MESSAGE:
        return
    chat_message = spy_pb2.ChatMessage()
    chat_message.ParseFromString(data.bytes)
    for message in chat_message.message:
        # message.type: 1 = text, 3 = image, other codes not yet explored
        if message.type != 1:
            continue
        try:
            handle_text_message(message.content.str, message.timestamp)
        except Exception as exc:
            # Isolate failures per message so the rest of the batch is still handled.
            print("-" * 10, "消息处理异常:%r" % exc, "-" * 10)


def my_parser():
    """Consume payloads from ``request_queue`` forever.

    Started on a daemon thread in ``__main__``. Every payload is marked done
    with ``task_done()`` even when processing fails, and no exception escapes
    the loop, so one bad payload cannot stop the only parser thread.
    """
    while True:
        data = request_queue.get()
        try:
            _dispatch_request(data)
        except Exception as exc:
            print("-" * 10, "消息解析异常:%r" % exc, "-" * 10)
        finally:
            request_queue.task_done()


# Records with identical content stored within this many hours are treated
# as duplicates and not inserted again.
DEDUP_WINDOW_HOURS = 48

# "<wxid>:" sender prefix carried by group-chat messages. WeChat IDs may
# contain letters, digits, "_" and "-".
GROUP_SENDER_PREFIX_RE = re.compile(r'^[A-Za-z0-9_-]*:')

# Phone numbers: an optional "+86" / "86" / "(+86)" prefix, followed by either
# an 11-digit number starting with 1, or an area code (3-4 digits, optionally
# parenthesised) + 7-8 digit number + optional 1-4 digit extension.
# Group 3 is the number without the country prefix.
PHONE_RE = re.compile(
    r"((\+?86)?|\(\+?86\))[ -]?"
    r"(1\d{10}|((\d{3,4})|\(\d{3,4}\))[ -]?\d{7,8}([ -]?\d{1,4})?)"
)

# demojize() wraps each emoji name in these private-use characters so the
# names can be removed without touching ordinary text that contains ":"
# (e.g. "10:30到12:00").
_EMOJI_OPEN = "\ue000"
_EMOJI_CLOSE = "\ue001"
_EMOJI_NAME_RE = re.compile(_EMOJI_OPEN + "[^" + _EMOJI_CLOSE + "]*" + _EMOJI_CLOSE)


def _split_keywords(raw):
    """Split a comma-separated config value into keywords.

    Whitespace around each item is stripped and empty items are dropped: an
    empty keyword is a substring of every message and would match everything.
    """
    return [word.strip() for word in raw.split(',') if word.strip()]


def _first_mobile(text):
    """Return the first 11-digit number starting with "1" found in *text*, or None.

    Spaces, dashes and a leading "+86"/"86" are removed before the check.
    """
    for match in PHONE_RE.finditer(text):
        number = match.group(3).replace(' ', '').replace('-', '')
        number = re.sub(r'^\+?86', '', number)
        if len(number) == 11 and number.startswith('1'):
            return number
    return None


def _strip_emoji(text):
    """Remove emoji recognised by the ``emoji`` package from *text*."""
    marked = emoji.demojize(text, delimiters=(_EMOJI_OPEN, _EMOJI_CLOSE))
    return _EMOJI_NAME_RE.sub('', marked)


def handle_text_message(content, timestamp):
    """Store a text chat message as a ``wj_forum`` record if it qualifies.

    A message is stored only when all of the following hold:

    * it contains one of the ``receive_keywords`` from the ``[config]`` section;
    * it contains none of the ``filter_keywords``;
    * it contains a mobile number that ``Phone().find`` resolves to a province;
    * ``get_region`` maps that province to a region.

    Before storing, phone numbers are replaced by "," and the province is
    prepended to the text. Emoji are then stripped. Content identical to a
    record stored within ``DEDUP_WINDOW_HOURS`` is skipped.

    :param content: raw message text (group messages start with "<wxid>:").
    :param timestamp: message timestamp, stored as ``create_time``.
    :return: True if a record was inserted, otherwise False.
    """
    settings = Config().instance()

    receive_keywords = _split_keywords(settings.get('config', 'receive_keywords'))
    if not any(word in content for word in receive_keywords):
        return False

    filter_keywords = _split_keywords(settings.get('config', 'filter_keywords'))
    if any(word in content for word in filter_keywords):
        return False

    user_id = settings.get('config', 'user_id')

    # Drop the group-chat sender prefix.
    content = GROUP_SENDER_PREFIX_RE.sub("", content)

    mobile = _first_mobile(content)
    # Replace every phone number in the text with ",".
    content = PHONE_RE.sub(",", content)
    if mobile is None:
        print("-" * 10, "未匹配到电话,记录:%s" % content, "-" * 10)
        return False

    try:
        info = Phone().find(mobile)
    except Exception:
        print("-" * 10, "手机号码未匹配到,记录:%s" % content, "-" * 10)
        return False

    province = ''
    if info:
        mobile = info['phone']
        province = info['province']
    if not province:
        print("-" * 10, "未匹配到归属地,记录:%s" % content, "-" * 10)
        return False

    area = get_region(province)
    if area is False:
        print("-" * 10, "未匹配到地区,记录:%s" % content, "-" * 10)
        return False

    # Prefix the province, then remove emoji.
    content = _strip_emoji(province + content.strip('\n')).strip()

    mysql = None
    try:
        mysql = MyPymysqlPool("db")
        since = int(time.time()) - DEDUP_WINDOW_HOURS * 3600
        sql = "select * from `wj_forum` where `create_time` > %s and `content` = %s"
        if mysql.getOne(sql, [str(since), content]) is not False:
            print("-" * 10, "%d小时内数据库已存在,该条记录不保存,记录:%s" % (DEDUP_WINDOW_HOURS, content), "-" * 10)
            return False

        # Record type: 1 = seedling purchase request, 2 = project procurement,
        # 3 = clearance sale, 4 = seedling prices, 5 = supply/demand.
        m_type = 1
        sql = "insert into wj_forum (`user_id`,`type`,`content`,`create_time`,`region`,`mobile`) values (%s,%s,%s,%s,%s,%s)"
        mysql.insert(sql, [str(user_id), str(m_type), content, str(timestamp), area, mobile])
        mysql.end()
    except Exception as exc:
        # Report database failures as such instead of blaming the phone lookup.
        print("-" * 10, "入库失败:%r,记录:%s" % (exc, content), "-" * 10)
        return False
    finally:
        if mysql is not None:
            # Roll back anything left uncommitted and return the connection to the pool.
            mysql.dispose(isEnd=0)

    print("-" * 10, "插入1条记录:%s" % content, "-" * 10)
    return True


# def remove_emoji(text):
#     emoji_pattern = re.compile(
#         u"(\ud83d[\ude00-\ude4f])|"  # emoticons
#         u"(\ud83c[\udf00-\uffff])|"  # symbols & pictographs (1 of 2)
#         u"(\ud83d[\u0000-\uddff])|"  # symbols & pictographs (2 of 2)
#         u"(\ud83d[\ude80-\udeff])|"  # transport & map symbols
#         u"(\ud83c[\udde0-\uddff])"  # flags (iOS)
#         "+", flags=re.UNICODE
#     )
#
#     return emoji_pattern.sub(r'', text)


# Sales region -> provinces it covers. Checked in this order; first match wins.
REGION_PROVINCES = {
    "华东": ("山东", "江苏", "浙江", "上海", "安徽", "福建"),
    "华北": ("河北", "山西", "内蒙", "北京", "天津"),
    "华南": ("广东", "广西", "海南", "香港", "澳门", "台湾", "港澳台"),
    "华中": ("河南", "湖南", "湖北", "江西"),
    "西南": ("四川", "重庆", "云南", "贵州", "西藏"),
    "西北": ("陕西", "甘肃", "宁夏", "青海", "新疆"),
    "东北": ("辽宁", "吉林", "黑龙江"),
}


def get_region(province):
    """Return the sales region (e.g. "华东") that *province* belongs to, or False.

    A province matches a listed name when it equals it or starts with it, so
    full names such as "内蒙古" or "广东省" resolve like "内蒙" / "广东".
    Empty or None input returns False.
    """
    if not province:
        return False
    for area, provinces in REGION_PROVINCES.items():
        if any(province.startswith(name) for name in provinces):
            return area
    return False


if __name__ == '__main__':
    # Location of WeChat.exe, taken from the [config] section of config.ini.
    wechat_exe = Config().get('config', 'wx_path')

    # Every payload WeChatSpy receives is pushed onto request_queue by get_request.
    spy = WeChatSpy(parser=get_request)
    spy.run(wechat_exe)

    # Drain the queue on a daemon thread, which does not keep the process alive.
    parser_thread = Thread(target=my_parser, name="parse request", daemon=True)
    parser_thread.start()

    # Keep the main thread alive until Enter is pressed.
    input()

.bat批处理文件

::Initialize the console
CLS
@echo off
echo.

::Switch to the directory containing this script (/d also changes the drive)
cd /d "%~dp0"
::Bundled embeddable Python distribution
set "python_path=%~dp0python-3.8.6-embed-amd64"
::Append the bundled interpreter and its Scripts dir to PATH
set "path=%path%;%python_path%;%python_path%\Scripts;"

::python3.8.exe is created only after a successful install, so it marks setup as done
if exist "%python_path%\python3.8.exe" goto run

::Use full paths so a system-wide python/pip earlier in PATH cannot be picked up instead
echo install pip ...
"%python_path%\python.exe" get-pip.py || goto install_failed
echo install dependent ...
"%python_path%\python.exe" -m pip install -r requirements.txt || goto install_failed
::Create the marker last; a failed install is retried on the next run
copy "%python_path%\python.exe" "%python_path%\python3.8.exe" || goto install_failed
goto run

:install_failed
echo install failed, fix the error above and run this script again
pause
exit /b 1

:run
echo start running ...
"%python_path%\python3.8.exe" main.py
pause

requirements.txt

bleach==3.1.5
certifi==2020.6.20
chardet==3.0.4
colorama==0.4.3
docutils==0.16
idna==2.10
keyring==21.2.1
packaging==20.4
pkginfo==1.5.0.1
protobuf==3.11.3
Pygments==2.6.1
pyparsing==2.4.7
pywin32-ctypes==0.2.0
readme-renderer==26.0
requests==2.24.0
requests-toolbelt==0.9.1
rfc3986==1.4.0
six==1.15.0
tqdm==4.48.0
twine==3.2.0
urllib3==1.25.10
webencodings==0.5.1
pymysql==0.10.1
dbutils==2.0
PyWeChatSpy==3.0.6.1
emoji==0.6.0
phone

config.py

import pymysql, os, configparser
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB


class Config(object):
    """Thin wrapper around ``configparser`` for an INI file next to this module.

    Usage::

        Config().get_content("db")    # -> {"host": "...", "port": 3306, ...}

    Example section, as consumed by ``MyPymysqlPool("db")``::

        [db]
        host = 192.168.1.101
        port = 3306
        user = root
        password = python123
        db_name = mydb
    """

    def __init__(self, config_filename="config.ini"):
        """Load *config_filename*, resolved relative to this module's directory.

        Interpolation is disabled so values containing ``%`` (common in
        passwords) are returned verbatim instead of raising
        ``InterpolationSyntaxError``. ``configparser.read`` silently ignores a
        missing file; lookups on it then raise ``NoSectionError``.
        """
        file_path = os.path.join(os.path.dirname(__file__), config_filename)
        self.cf = configparser.ConfigParser(interpolation=None)
        self.cf.read(file_path, encoding="utf-8")

    def get_sections(self):
        """Return the section names in file order."""
        return self.cf.sections()

    def get_options(self, section):
        """Return the option names of *section* (lower-cased by configparser)."""
        return self.cf.options(section)

    def get_content(self, section):
        """Return *section* as a dict, converting all-decimal-digit values to int."""
        result = {}
        for option in self.get_options(section):
            value = self.cf.get(section, option)
            # isdecimal(), unlike isdigit(), rejects characters such as "²"
            # that int() cannot parse.
            result[option] = int(value) if value.isdecimal() else value
        return result

    def get(self, section, name):
        """Return the raw string value of *name* in *section*."""
        return self.cf.get(section, name)

    def instance(self):
        """Return the underlying ``ConfigParser`` object."""
        return self.cf


class BasePymysqlPool(object):
    """Hold the connection parameters used by the pymysql pool wrapper.

    ``port`` is coerced to int and ``password`` to str because values read via
    ``Config.get_content`` may arrive as either type (all-digit strings become
    ints there).
    """

    def __init__(self, host, port, user, password, db_name=None):
        self.db_host, self.db_port = host, int(port)
        self.user, self.password = user, str(password)
        self.db = db_name
        # Connection/cursor slots start empty.
        self.conn = self.cursor = None


class MyPymysqlPool(BasePymysqlPool):
    """MySQL helper that draws its connection from a DBUtils ``PooledDB`` pool.

    ``MyPymysqlPool(conf_name)`` reads connection settings from the config
    section *conf_name*. The section's keys are passed straight to
    ``BasePymysqlPool.__init__``, so they must be ``host``, ``port``, ``user``,
    ``password`` and optionally ``db_name``. The constructor then checks one
    connection out of the pool and opens a cursor on it. Call ``end()`` to
    commit or roll back. Call ``dispose()`` to finish the transaction and
    return the connection to the pool.

    The pool is created by the first instance and shared by every later
    instance in the process, so all instances use the first instance's
    connection settings.
    """

    # Shared PooledDB instance, created lazily by the first __getConn call.
    __pool = None

    def __init__(self, conf_name=None):
        """Load settings from config section *conf_name* and check out a connection."""
        self.conf = Config().get_content(conf_name)
        super(MyPymysqlPool, self).__init__(**self.conf)
        self._conn = self.__getConn()
        self._cursor = self._conn.cursor()

    def __getConn(self):
        """Return a connection from the shared pool, creating the pool on first use."""
        if MyPymysqlPool.__pool is None:
            # Assign through the class: a bare ``__pool = ...`` would only bind a
            # local variable, so a new pool would be built for every instance.
            MyPymysqlPool.__pool = PooledDB(creator=pymysql,
                                            mincached=1,
                                            maxcached=20,
                                            host=self.db_host,
                                            port=self.db_port,
                                            user=self.user,
                                            passwd=self.password,
                                            db=self.db,
                                            use_unicode=False,
                                            charset="utf8",
                                            cursorclass=DictCursor)
        return MyPymysqlPool.__pool.connection()

    def __query(self, sql, param=None):
        """Execute *sql*, binding *param* when given, and return the affected row count."""
        if param is None:
            # No args: pymysql performs no %-formatting on the statement.
            return self._cursor.execute(sql)
        return self._cursor.execute(sql, param)

    def getAll(self, sql, param=None):
        """Run a query and return all rows, or False when it matched nothing.

        :param sql: SQL with %s placeholders for any conditions.
        :param param: optional tuple/list of values bound to the placeholders.
        :return: list of dict rows, or False.
        """
        count = self.__query(sql, param)
        return self._cursor.fetchall() if count > 0 else False

    def getOne(self, sql, param=None):
        """Run a query and return its first row, or False when it matched nothing.

        :param sql: SQL with %s placeholders for any conditions.
        :param param: optional tuple/list of values bound to the placeholders.
        :return: one dict row, or False.
        """
        count = self.__query(sql, param)
        return self._cursor.fetchone() if count > 0 else False

    def getMany(self, sql, num, param=None):
        """Run a query and return at most *num* rows, or False when it matched nothing.

        :param sql: SQL with %s placeholders for any conditions.
        :param num: maximum number of rows to fetch.
        :param param: optional tuple/list of values bound to the placeholders.
        :return: list of dict rows, or False.
        """
        count = self.__query(sql, param)
        return self._cursor.fetchmany(num) if count > 0 else False

    def insertMany(self, sql, values):
        """Insert several rows with one statement template.

        :param sql: INSERT statement with %s placeholders.
        :param values: sequence of row tuples/lists.
        :return: number of affected rows.
        """
        return self._cursor.executemany(sql, values)

    def update(self, sql, param=None):
        """Execute an UPDATE statement and return the affected row count."""
        return self.__query(sql, param)

    def insert(self, sql, param=None):
        """Execute an INSERT statement and return the affected row count."""
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """Execute a DELETE statement and return the affected row count."""
        return self.__query(sql, param)

    def begin(self):
        """Start a transaction by turning autocommit off."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """Commit when *option* is 'commit', otherwise roll back."""
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, isEnd=1):
        """Finish the transaction and release the cursor and connection.

        Commits when *isEnd* == 1, otherwise rolls back. The cursor and
        connection are closed even if the commit or rollback raises.
        """
        try:
            self.end('commit' if isEnd == 1 else 'rollback')
        finally:
            try:
                self._cursor.close()
            finally:
                self._conn.close()


__all__ = ["Config", "MyPymysqlPool"]

config.ini

[config]

# 接收消息中含有关键词,则写入数据库,多个关键字使用英文逗号","分隔
receive_keywords = 求购

# 含有以下关键词则屏蔽,多个关键字使用英文逗号","分隔
filter_keywords = 出售,注意,求购的注意了

# 默认发布的账户ID,请勿随意修改
user_id = 8

# 微信安装位置
wx_path = F:\swoftare\WeChat\WeChat.exe

[db]
host = 
port = 3306
user = 
password = 
db_name = 
项目目录结构

参考资料:https://zhuanlan.zhihu.com/p/118674498

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇