package main import ( "database/sql" "fmt" "io" "log" "net/http" "time" "github.com/goccy/go-json" _ "github.com/mattn/go-sqlite3" ) func main() { db, err := sql.Open("sqlite3", "./data.db") if err != nil { panic(err) } defer db.Close() rows, err := db.Query("SELECT DISTINCT group_id FROM messages") if err != nil { panic(err) } defer rows.Close() var groupIDs []int for rows.Next() { var groupID int err := rows.Scan(&groupID) if err != nil { panic(err) } groupIDs = append(groupIDs, groupID) } for _, groupID := range groupIDs { minSeq := get_min_seqs(db, groupID) println("minSeq:", minSeq) // if minSeq == 0 { // continue // } for minSeq != 0 { fetchHistoryMessages(db, minSeq, groupID) temp := get_min_seqs(db, groupID) if temp == minSeq { println("已获取到最久数据!") time.Sleep(time.Second * 2) break } minSeq = temp // println("minSeq_changed:", minSeq) } } } func get_min_seqs(db *sql.DB, groupID int) int { var minMessageSeq int err := db.QueryRow("SELECT MIN(message_seq) FROM messages WHERE group_id = ?", groupID).Scan(&minMessageSeq) if err != nil { panic(err) } // println("minMessageSeq:", minMessageSeq) var count int err = db.QueryRow("SELECT COUNT(*) FROM messages WHERE group_id = ? AND message_seq = ?", groupID, minMessageSeq-1).Scan(&count) if err != nil { panic(err) } // println("count:", count) if count > 0 { return 0 } fmt.Printf("Group ID: %d, Min Message Seq: %d\n", groupID, minMessageSeq) return minMessageSeq } func fetchHistoryMessages(db *sql.DB, message_seq int, group_id int) (int, error) { url := fmt.Sprintf("http://127.0.0.1:5700/get_group_msg_history?message_seq=%d&group_id=%d", message_seq, group_id) println("min message_seq :", message_seq, "min group_id: ", group_id) println(url) resp, err := http.Get(url) if err != nil { panic(err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return 0, err } // println("Body:", string(body)) var messageResp map[string]interface{} err = json.Unmarshal(body, &messageResp) if err != nil { return 0, err } data := messageResp["data"].(map[string]interface{}) messages := data["messages"].([]interface{}) for _, message := range messages { messageMap := message.(map[string]interface{}) insertMessage(db, messageMap) time.Sleep(time.Second * 1) } return 0, nil } func insertMessage(db *sql.DB, data map[string]interface{}) error { if data["post_type"] == "message" { // id := data["message_id"].(float64) post_type := data["post_type"].(string) message_type := data["message_type"].(string) // 设置东八区时间 loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { log.Fatal(err) } message_time := (time.Unix(int64(data["time"].(float64)), 0).In(loc)).Format("2006-01-02 15:04:05") group_id := data["group_id"].(float64) message_id := data["message_id"].(float64) raw_message := data["raw_message"].(string) sender := data["sender"].(map[string]interface{}) sender_user_id := sender["user_id"].(float64) sender_nickname := sender["nickname"].(string) sender_card := sender["card"].(string) if sender_card == "" { sender_card = sender_nickname } sender_role := sender["role"].(string) message_seq := data["message_seq"].(float64) fmt.Println(post_type, message_type, message_time, group_id, message_id, raw_message, sender_user_id, sender_nickname, sender_card, sender_role, message_seq) // Insert data into database _, err = db.Exec("INSERT INTO messages ( post_type, message_type, time, group_id, message_id, raw_message, sender_user_id, sender_nickname, sender_card, sender_role, message_seq) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", post_type, message_type, message_time, group_id, message_id, raw_message, sender_user_id, sender_nickname, sender_card, sender_role, message_seq) fmt.Println("Data inserted successfully!") if err != nil { log.Fatal(err) } } return nil }