import datetime import threading import paho.mqtt.client as mqtt from requests_toolbelt.adapters.source import SourceAddressAdapter import json import requests import io # 客户端id client_id='python_mqqt_syncdevice111' # 连接回调函数 def on_connect(client, userdata, flags, rc): """一旦连接成功, 回调此方法""" rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"] print("connect:", rc_status[rc]) topics = ["syncdevice", "syncvirtualdevice", "device"] # 要订阅的主题列表 for topic in topics: # 订阅主题 print("订阅主题:",topic) client.subscribe(topic) def messageData(msg): topic=msg.topic print("处理消息,时间:" + str(datetime.datetime.now())) data = json.loads(msg.payload.decode()) print("数据",data) try: if data: s = requests.Session() # 指定特定网卡的IP地址'192.168.121.1',使用该网卡发送POST请求 #192.168.8.134 是内网卡所在的路由器IP,由这个IP发送数据到192.168.101.11 s.mount('http://', SourceAddressAdapter('192.168.8.134')) map = { "tenantId": 81} if topic=="syncdevice": deviceLogList=data["deviceLogList"] print("数据对象==",deviceLogList) for device in deviceLogList: filePath=device["filePath"] filePath = filePath.replace("\\", "/") url="http://192.168.101.11:9000/detect/api/data/upload/remote" fileUrl='http://106.13.50.125:9999/detect' + filePath #本地测试ip #res = requests.get('http://192.168.1.51:9099/detect' + filePath) #file = io.BytesIO(res.content) res = requests.get(fileUrl) file = io.BytesIO(res.content) files = {"file": file} mapPath={"path":filePath} response=s.post(url=url, data=mapPath,files=files) message = response.text print("音频接口返回值:", message) print("音频结束时间:" + str(datetime.datetime.now())) if "userPhoto" in data: userPhoto= data["userPhoto"] if userPhoto: for user in userPhoto: filePath = user["url"] filePath = filePath.replace("\\", "/") url = "http://192.168.101.11:9000/detect/api/data/upload/remote" res1 = requests.get('http://106.13.50.125:9999/detect' + filePath) # 本地测试ip #res = requests.get('http://192.168.1.51:9099/detect' + filePath) file1 = io.BytesIO(res1.content) files1={"file":file1} mapPath = {"path": filePath} response = s.post(url=url, data=mapPath, files=files1) message = response.text print("图片接口返回值:", message) print("图片结束时间:" + str(datetime.datetime.now())) #将数据上传到内网 url = "http://192.168.101.11:9000/detect/admin/inspect/task_user/synchrodata" response = s.post(url=url, json=data,headers={'Content-Type':'application/json'}) message = response.text print("同步数据接口返回值:", message) print("同步数据结束时间:" + str(datetime.datetime.now())) map["type"] = 2 map["annotation"] = message elif topic=="syncvirtualdevice": # 将数据上传到内网 url = "http://192.168.101.11:9000/detect/admin/iot/device/snycdevice/receive" response = s.post(url=url, json=data, headers={'Content-Type': 'application/json'}) message = response.text print("同步设备数据接口返回值:", message) map["type"] = 3 map["annotation"] = message elif topic=="device": deviceLogDTOS = data["deviceLogDTOS"] print("数据对象==", deviceLogDTOS) for device in deviceLogDTOS: filePath = device["filePath"] filePath = filePath.replace("\\", "/") url = "http://192.168.101.11:9000/detect/api/data/upload/remote" fileUrl = 'http://106.13.50.125:9999/detect' + filePath # res = requests.get('http://106.13.50.125:9999/detect' + filePath) # 本地测试ip # fileUrl='http://192.168.1.10:9099/detect' + filePath print("地址路径" + fileUrl) res = requests.get(fileUrl) file = io.BytesIO(res.content) mapPath = {"path": filePath} files = {"file": file} response = s.post(url=url, data=mapPath, files=files) message = response.text print("音频接口返回值:", message) print("音频结束时间:" + str(datetime.datetime.now())) # 将数据上传到内网 url = "http://192.168.101.11:9000/detect/admin/iot/device_log/send/warningTwo" response = s.post(url=url, json=data, headers={'Content-Type': 'application/json'}) message = response.text print("数据接口返回值:", message) print("数据结束时间:" + str(datetime.datetime.now())) map["type"] = 1 map["annotation"] = message waiurl = "http://106.13.50.125:9099/detect/admin/iot/internalLog/save" print("日志数据",map) result = requests.post(waiurl, json=map, headers={'Content-Type': 'application/json'}) print("日志接口返回值:", result.text) except requests.exceptions.ConnectTimeout as e: # 处理连接超时异常 print("连接超时:", e) run() except Exception as e: # 处理其他异常 print("发生异常:", e) run() def on_message(client, userdata, msg): """一旦订阅到消息, 回调此方法""" print("主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) # 客户端返回的消息,使用gb2312编码中文不会报错 # 将收到的消息转换为JSON格式 threading.Thread(target=messageData, args=((msg,))).start() def run(): # 创建客户端对象 client = mqtt.Client(client_id=client_id) client.username_pw_set("adminRQ", "yongqiang666") # 设置连接回调函数 client.on_connect = on_connect # 设置接收消息回调函数 client.on_message = on_message # 连接到MQTT代理 client.connect("106.13.50.125", 1883, 60) # 保持连接并接收消息 client.loop_forever() if __name__ == '__main__': run()