云里听内网发送代码 --镇江
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
7.3 KiB

import datetime
import threading
import paho.mqtt.client as mqtt
from requests_toolbelt.adapters.source import SourceAddressAdapter
import json
import requests
import io
# 客户端id
client_id='python_mqqt_syncdevice111'
# 连接回调函数
def on_connect(client, userdata, flags, rc):
"""一旦连接成功, 回调此方法"""
rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
print("connect:", rc_status[rc])
topics = ["syncdevice", "syncvirtualdevice", "device"]
# 要订阅的主题列表
for topic in topics:
# 订阅主题
print("订阅主题:",topic)
client.subscribe(topic)
def messageData(msg):
topic=msg.topic
print("处理消息,时间:" + str(datetime.datetime.now()))
data = json.loads(msg.payload.decode())
print("数据",data)
try:
if data:
s = requests.Session()
# 指定特定网卡的IP地址'192.168.121.1',使用该网卡发送POST请求
#192.168.8.134 是内网卡所在的路由器IP,由这个IP发送数据到192.168.101.11
s.mount('http://', SourceAddressAdapter('192.168.8.134'))
map = { "tenantId": 81}
if topic=="syncdevice":
deviceLogList=data["deviceLogList"]
print("数据对象==",deviceLogList)
for device in deviceLogList:
filePath=device["filePath"]
filePath = filePath.replace("\\", "/")
url="http://192.168.101.11:9000/detect/api/data/upload/remote"
fileUrl='http://106.13.50.125:9999/detect' + filePath
#本地测试ip
#res = requests.get('http://192.168.1.51:9099/detect' + filePath)
#file = io.BytesIO(res.content)
res = requests.get(fileUrl)
file = io.BytesIO(res.content)
files = {"file": file}
mapPath={"path":filePath}
response=s.post(url=url, data=mapPath,files=files)
message = response.text
print("音频接口返回值:", message)
print("音频结束时间:" + str(datetime.datetime.now()))
if "userPhoto" in data:
userPhoto= data["userPhoto"]
if userPhoto:
for user in userPhoto:
filePath = user["url"]
filePath = filePath.replace("\\", "/")
url = "http://192.168.101.11:9000/detect/api/data/upload/remote"
res1 = requests.get('http://106.13.50.125:9999/detect' + filePath)
# 本地测试ip
#res = requests.get('http://192.168.1.51:9099/detect' + filePath)
file1 = io.BytesIO(res1.content)
files1={"file":file1}
mapPath = {"path": filePath}
response = s.post(url=url, data=mapPath, files=files1)
message = response.text
print("图片接口返回值:", message)
print("图片结束时间:" + str(datetime.datetime.now()))
#将数据上传到内网
url = "http://192.168.101.11:9000/detect/admin/inspect/task_user/synchrodata"
response = s.post(url=url, json=data,headers={'Content-Type':'application/json'})
message = response.text
print("同步数据接口返回值:", message)
print("同步数据结束时间:" + str(datetime.datetime.now()))
map["type"] = 2
map["annotation"] = message
elif topic=="syncvirtualdevice":
# 将数据上传到内网
url = "http://192.168.101.11:9000/detect/admin/iot/device/snycdevice/receive"
response = s.post(url=url, json=data, headers={'Content-Type': 'application/json'})
message = response.text
print("同步设备数据接口返回值:", message)
map["type"] = 3
map["annotation"] = message
elif topic=="device":
deviceLogDTOS = data["deviceLogDTOS"]
print("数据对象==", deviceLogDTOS)
for device in deviceLogDTOS:
filePath = device["filePath"]
filePath = filePath.replace("\\", "/")
url = "http://192.168.101.11:9000/detect/api/data/upload/remote"
fileUrl = 'http://106.13.50.125:9999/detect' + filePath
# res = requests.get('http://106.13.50.125:9999/detect' + filePath)
# 本地测试ip
# fileUrl='http://192.168.1.10:9099/detect' + filePath
print("地址路径" + fileUrl)
res = requests.get(fileUrl)
file = io.BytesIO(res.content)
mapPath = {"path": filePath}
files = {"file": file}
response = s.post(url=url, data=mapPath, files=files)
message = response.text
print("音频接口返回值:", message)
print("音频结束时间:" + str(datetime.datetime.now()))
# 将数据上传到内网
url = "http://192.168.101.11:9000/detect/admin/iot/device_log/send/warningTwo"
response = s.post(url=url, json=data, headers={'Content-Type': 'application/json'})
message = response.text
print("数据接口返回值:", message)
print("数据结束时间:" + str(datetime.datetime.now()))
map["type"] = 1
map["annotation"] = message
waiurl = "http://106.13.50.125:9099/detect/admin/iot/internalLog/save"
print("日志数据",map)
result = requests.post(waiurl, json=map, headers={'Content-Type': 'application/json'})
print("日志接口返回值:", result.text)
except requests.exceptions.ConnectTimeout as e:
# 处理连接超时异常
print("连接超时:", e)
run()
except Exception as e:
# 处理其他异常
print("发生异常:", e)
run()
def on_message(client, userdata, msg):
"""一旦订阅到消息, 回调此方法"""
print("主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) # 客户端返回的消息,使用gb2312编码中文不会报错
# 将收到的消息转换为JSON格式
threading.Thread(target=messageData, args=((msg,))).start()
def run():
# 创建客户端对象
client = mqtt.Client(client_id=client_id)
client.username_pw_set("adminRQ", "yongqiang666")
# 设置连接回调函数
client.on_connect = on_connect
# 设置接收消息回调函数
client.on_message = on_message
# 连接到MQTT代理
client.connect("106.13.50.125", 1883, 60)
# 保持连接并接收消息
client.loop_forever()
if __name__ == '__main__':
run()