You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
171 lines
7.3 KiB
171 lines
7.3 KiB
import datetime
|
|
|
|
import threading
|
|
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from requests_toolbelt.adapters.source import SourceAddressAdapter
|
|
import json
|
|
|
|
import requests
|
|
import io
|
|
# 客户端id
|
|
client_id='python_mqqt_syncdevice111'
|
|
|
|
|
|
|
|
|
|
|
|
# 连接回调函数
|
|
def on_connect(client, userdata, flags, rc):
|
|
"""一旦连接成功, 回调此方法"""
|
|
rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
|
|
print("connect:", rc_status[rc])
|
|
topics = ["syncdevice", "syncvirtualdevice", "device"]
|
|
# 要订阅的主题列表
|
|
for topic in topics:
|
|
# 订阅主题
|
|
print("订阅主题:",topic)
|
|
client.subscribe(topic)
|
|
|
|
|
|
def messageData(msg):
|
|
topic=msg.topic
|
|
print("处理消息,时间:" + str(datetime.datetime.now()))
|
|
data = json.loads(msg.payload.decode())
|
|
print("数据",data)
|
|
try:
|
|
if data:
|
|
s = requests.Session()
|
|
# 指定特定网卡的IP地址'192.168.121.1',使用该网卡发送POST请求
|
|
#192.168.8.134 是内网卡所在的路由器IP,由这个IP发送数据到192.168.101.11
|
|
s.mount('http://', SourceAddressAdapter('192.168.8.134'))
|
|
map = { "tenantId": 81}
|
|
|
|
if topic=="syncdevice":
|
|
deviceLogList=data["deviceLogList"]
|
|
print("数据对象==",deviceLogList)
|
|
for device in deviceLogList:
|
|
filePath=device["filePath"]
|
|
filePath = filePath.replace("\\", "/")
|
|
url="http://192.168.101.11:9000/detect/api/data/upload/remote"
|
|
fileUrl='http://106.13.50.125:9999/detect' + filePath
|
|
#本地测试ip
|
|
#res = requests.get('http://192.168.1.51:9099/detect' + filePath)
|
|
#file = io.BytesIO(res.content)
|
|
res = requests.get(fileUrl)
|
|
file = io.BytesIO(res.content)
|
|
files = {"file": file}
|
|
mapPath={"path":filePath}
|
|
response=s.post(url=url, data=mapPath,files=files)
|
|
message = response.text
|
|
|
|
print("音频接口返回值:", message)
|
|
print("音频结束时间:" + str(datetime.datetime.now()))
|
|
if "userPhoto" in data:
|
|
userPhoto= data["userPhoto"]
|
|
if userPhoto:
|
|
for user in userPhoto:
|
|
filePath = user["url"]
|
|
filePath = filePath.replace("\\", "/")
|
|
url = "http://192.168.101.11:9000/detect/api/data/upload/remote"
|
|
res1 = requests.get('http://106.13.50.125:9999/detect' + filePath)
|
|
# 本地测试ip
|
|
#res = requests.get('http://192.168.1.51:9099/detect' + filePath)
|
|
file1 = io.BytesIO(res1.content)
|
|
files1={"file":file1}
|
|
mapPath = {"path": filePath}
|
|
response = s.post(url=url, data=mapPath, files=files1)
|
|
message = response.text
|
|
print("图片接口返回值:", message)
|
|
print("图片结束时间:" + str(datetime.datetime.now()))
|
|
#将数据上传到内网
|
|
url = "http://192.168.101.11:9000/detect/admin/inspect/task_user/synchrodata"
|
|
response = s.post(url=url, json=data,headers={'Content-Type':'application/json'})
|
|
message = response.text
|
|
print("同步数据接口返回值:", message)
|
|
print("同步数据结束时间:" + str(datetime.datetime.now()))
|
|
map["type"] = 2
|
|
map["annotation"] = message
|
|
elif topic=="syncvirtualdevice":
|
|
# 将数据上传到内网
|
|
url = "http://192.168.101.11:9000/detect/admin/iot/device/snycdevice/receive"
|
|
response = s.post(url=url, json=data, headers={'Content-Type': 'application/json'})
|
|
message = response.text
|
|
print("同步设备数据接口返回值:", message)
|
|
map["type"] = 3
|
|
map["annotation"] = message
|
|
elif topic=="device":
|
|
deviceLogDTOS = data["deviceLogDTOS"]
|
|
print("数据对象==", deviceLogDTOS)
|
|
for device in deviceLogDTOS:
|
|
filePath = device["filePath"]
|
|
filePath = filePath.replace("\\", "/")
|
|
url = "http://192.168.101.11:9000/detect/api/data/upload/remote"
|
|
fileUrl = 'http://106.13.50.125:9999/detect' + filePath
|
|
# res = requests.get('http://106.13.50.125:9999/detect' + filePath)
|
|
# 本地测试ip
|
|
# fileUrl='http://192.168.1.10:9099/detect' + filePath
|
|
print("地址路径" + fileUrl)
|
|
res = requests.get(fileUrl)
|
|
file = io.BytesIO(res.content)
|
|
mapPath = {"path": filePath}
|
|
files = {"file": file}
|
|
response = s.post(url=url, data=mapPath, files=files)
|
|
message = response.text
|
|
print("音频接口返回值:", message)
|
|
print("音频结束时间:" + str(datetime.datetime.now()))
|
|
# 将数据上传到内网
|
|
url = "http://192.168.101.11:9000/detect/admin/iot/device_log/send/warningTwo"
|
|
response = s.post(url=url, json=data, headers={'Content-Type': 'application/json'})
|
|
message = response.text
|
|
print("数据接口返回值:", message)
|
|
print("数据结束时间:" + str(datetime.datetime.now()))
|
|
map["type"] = 1
|
|
map["annotation"] = message
|
|
waiurl = "http://106.13.50.125:9099/detect/admin/iot/internalLog/save"
|
|
print("日志数据",map)
|
|
result = requests.post(waiurl, json=map, headers={'Content-Type': 'application/json'})
|
|
print("日志接口返回值:", result.text)
|
|
|
|
except requests.exceptions.ConnectTimeout as e:
|
|
# 处理连接超时异常
|
|
print("连接超时:", e)
|
|
run()
|
|
|
|
except Exception as e:
|
|
# 处理其他异常
|
|
print("发生异常:", e)
|
|
run()
|
|
def on_message(client, userdata, msg):
|
|
"""一旦订阅到消息, 回调此方法"""
|
|
print("主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) # 客户端返回的消息,使用gb2312编码中文不会报错
|
|
# 将收到的消息转换为JSON格式
|
|
threading.Thread(target=messageData, args=((msg,))).start()
|
|
|
|
|
|
|
|
|
|
|
|
def run():
|
|
# 创建客户端对象
|
|
client = mqtt.Client(client_id=client_id)
|
|
client.username_pw_set("adminRQ", "yongqiang666")
|
|
# 设置连接回调函数
|
|
client.on_connect = on_connect
|
|
# 设置接收消息回调函数
|
|
client.on_message = on_message
|
|
# 连接到MQTT代理
|
|
client.connect("106.13.50.125", 1883, 60)
|
|
# 保持连接并接收消息
|
|
client.loop_forever()
|
|
|
|
if __name__ == '__main__':
|
|
run()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|