个人数据分析学习过程
from flask import Flask, request, jsonify
import serial
import pandas as pd
import time
import re
from datetime import datetime
import threading
ser = serial.Serial('COM3', 115200, timeout=1)
dy = r"电压: (\d+\.\d+) V"
dl = r"电流: (\d+\.\d+) A"
gl = r"有功功率: (\d+) W"
pl = r"频率: (\d+\.\d+) Hz"
dn = r"电能: (\d+\.\d+) kWh"
app = Flask(__name__)
def append_to_csv(data_to_append, filename='datas.csv', max_rows=10000):
# 尝试读取现有CSV文件
try:
df = pd.read_csv(filename)
except FileNotFoundError:
# 如果文件不存在,则创建一个空的DataFrame
df = pd.DataFrame()
# 将新数据添加到DataFrame
df = pd.concat([df, data_to_append], ignore_index=True)
# 如果行数超过max_rows,则删除旧数据
if len(df) > max_rows:
df = df.iloc[-max_rows:] # 保留最新的max_rows行
# 写回CSV文件
df.to_csv(filename, index=False)
def init():
while True:
try:
if ser.in_waiting > 0:
datastr = ser.read(ser.in_waiting).decode('utf-8') # 假设数据是UTF-8编码
if "数据有效,开始解析..." in datastr:
datas = {}
now = datetime.now()
now_time = now.strftime("%Y-%m-%d %H:%M:%S")
dy_num = re.search(dy, datastr)
if dy_num:
datas["时间"] = [now_time]
datas["电压"] = [dy_num.group(1)]
dl_num = re.search(dl, datastr)
if dy_num:
datas["电流"] = [dl_num.group(1)]
gl_num = re.search(gl, datastr)
if gl_num:
datas["功率"] = [gl_num.group(1)]
pl_num = re.search(pl, datastr)
if pl_num:
datas["频率"] = [pl_num.group(1)]
dn_num = re.search(dn, datastr)
if dn_num:
datas["电能"] = [dn_num.group(1)]
if datas:
print(datas)
df = pd.DataFrame(datas)
append_to_csv(df)
time.sleep(5)
except KeyboardInterrupt:
print('程序被用户中断')
finally:
print('读取数据+1')
def start_background_task():
"""启动后台任务的函数,用于在 Flask 应用中调用"""
thread = threading.Thread(target=init)
thread.daemon = True # 设置为守护线程,主程序退出时自动退出
thread.start()
# start_background_task()
@app.route('/doOpen',methods=['GET'])
def doOpen():
doSerial("on")
return jsonify({"code":200,"msg":"成功"})
@app.route('/doDown',methods=['GET'])
def doDown():
doSerial("off")
return jsonify({"code": 200, "msg": "成功"})
@app.route('/getData',methods=['GET'])
def getData():
df = pd.read_csv("datas.csv")
df["时间"] = pd.to_datetime(df["时间"], errors='coerce')
df = df.fillna(0)
# df.sort_values(by="时间",ascending=False)
# 将 DataFrame 转换成字典列表,每个字典代表 DataFrame 的一行
data_dict = df.tail(500)
data = {}
data["sj"] = data_dict["时间"].dt.strftime('%H:%M').tolist()
data["dy"] = data_dict["电压"].tolist()
data["dl"] = data_dict["电流"].tolist()
data["gl"] = data_dict["功率"].tolist()
data["pl"] = data_dict["频率"].tolist()
datanow = df.tail(1)
data['now'] = datanow.to_dict(orient='records')
# 使用 Flask 的 jsonify 函数返回 JSON 响应
return jsonify(data)
def doSerial(command):
try:
# 确保串口已经打开
if ser.isOpen():
command = command.encode("utf-8")
# 发送指令
ser.write(command)
else:
print("Failed to open serial port.")
except serial.SerialException as e:
print(f"An error occurred: {e}")
finally:
print("Serial port closed.")
if __name__ == '__main__':
# 设置端口号为 8000
app.run(port=8000)
from flask import Flask, request, jsonify
import serial
import pandas as pd
import time
import re
from datetime import datetime
import threading
ser = serial.Serial('COM3', 115200, timeout=1)
dy = r"电压: (\d+\.\d+) V"
dl = r"电流: (\d+\.\d+) A"
gl = r"有功功率: (\d+) W"
pl = r"频率: (\d+\.\d+) Hz"
dn = r"电能: (\d+\.\d+) kWh"
app = Flask(__name__)
def append_to_csv(data_to_append, filename='datas.csv', max_rows=10000):
# 尝试读取现有CSV文件
try:
df = pd.read_csv(filename)
except FileNotFoundError:
# 如果文件不存在,则创建一个空的DataFrame
df = pd.DataFrame()
# 将新数据添加到DataFrame
df = pd.concat([df, data_to_append], ignore_index=True)
# 如果行数超过max_rows,则删除旧数据
if len(df) > max_rows:
df = df.iloc[-max_rows:] # 保留最新的max_rows行
# 写回CSV文件
df.to_csv(filename, index=False)
def init():
while True:
try:
if ser.in_waiting > 0:
datastr = ser.read(ser.in_waiting).decode('utf-8') # 假设数据是UTF-8编码
if "数据有效,开始解析..." in datastr:
datas = {}
now = datetime.now()
now_time = now.strftime("%Y-%m-%d %H:%M:%S")
dy_num = re.search(dy, datastr)
if dy_num:
datas["时间"] = [now_time]
datas["电压"] = [dy_num.group(1)]
dl_num = re.search(dl, datastr)
if dy_num:
datas["电流"] = [dl_num.group(1)]
gl_num = re.search(gl, datastr)
if gl_num:
datas["功率"] = [gl_num.group(1)]
pl_num = re.search(pl, datastr)
if pl_num:
datas["频率"] = [pl_num.group(1)]
dn_num = re.search(dn, datastr)
if dn_num:
datas["电能"] = [dn_num.group(1)]
if datas:
print(datas)
df = pd.DataFrame(datas)
append_to_csv(df)
time.sleep(5)
except KeyboardInterrupt:
print('程序被用户中断')
finally:
print('读取数据+1')
def start_background_task():
"""启动后台任务的函数,用于在 Flask 应用中调用"""
thread = threading.Thread(target=init)
thread.daemon = True # 设置为守护线程,主程序退出时自动退出
thread.start()
# start_background_task()
@app.route('/doOpen',methods=['GET'])
def doOpen():
doSerial("on")
return jsonify({"code":200,"msg":"成功"})
@app.route('/doDown',methods=['GET'])
def doDown():
doSerial("off")
return jsonify({"code": 200, "msg": "成功"})
@app.route('/getData',methods=['GET'])
def getData():
df = pd.read_csv("datas.csv")
df["时间"] = pd.to_datetime(df["时间"], errors='coerce')
df = df.fillna(0)
# df.sort_values(by="时间",ascending=False)
# 将 DataFrame 转换成字典列表,每个字典代表 DataFrame 的一行
data_dict = df.tail(500)
data = {}
data["sj"] = data_dict["时间"].dt.strftime('%H:%M').tolist()
data["dy"] = data_dict["电压"].tolist()
data["dl"] = data_dict["电流"].tolist()
data["gl"] = data_dict["功率"].tolist()
data["pl"] = data_dict["频率"].tolist()
datanow = df.tail(1)
data['now'] = datanow.to_dict(orient='records')
# 使用 Flask 的 jsonify 函数返回 JSON 响应
return jsonify(data)
def doSerial(command):
try:
# 确保串口已经打开
if ser.isOpen():
command = command.encode("utf-8")
# 发送指令
ser.write(command)
else:
print("Failed to open serial port.")
except serial.SerialException as e:
print(f"An error occurred: {e}")
finally:
print("Serial port closed.")
if __name__ == '__main__':
# 设置端口号为 8000
app.run(port=8000000)