V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
fuyun
V2EX  ›  Node.js

纯真 IP 数据库解析 Node.js 版

  •  
  •   fuyun · 14 天前 · 1564 次点击

    用 Node.js 写了一个解析纯真 IP 数据库的项目:纯真 IP 数据库解析 Nest.js 版

    支持以下特性:

    1. 提供国家、省、市、区、运营商等字段的解析;
    2. 通过定时任务定时更新 IP 数据库;
    3. IP 搜索、批量搜索;
    4. 查询 IP 数据库版本信息。

    其中,IP 数据库来源:qqwry.dat

    如果是数据库方式,还可以自定义各种模糊搜索、按字段搜索等(性能是个问题)。

    该项目已在 ifuyun.com 上使用。

    最后,感谢算法提供:qqwry.dat-analyse

    8 条回复    2024-09-03 15:39:59 +08:00
    jiayouzl
        1
    jiayouzl  
       14 天前
    不错,待会我整个 Python 版的。
    daimaosix
        2
    daimaosix  
       14 天前
    不错,待会我整个 Rust 版的。
    jiayouzl
        3
    jiayouzl  
       14 天前
    Python 版我写好了
    ```
    # -*- coding: UTF-8 -*-

    import socket
    import struct


    class CzIp:

    def __init__(self, db_file="qqwry2024-08-28.dat"): # db_file="qqwry.dat"
    self.f_db = open(db_file, "rb")
    bs = self.f_db.read(8)
    (self.first_index, self.last_index) = struct.unpack("II", bs)
    self.index_count = int((self.last_index - self.first_index) / 7 + 1)
    self.cur_start_ip = None
    self.cur_end_ip_offset = None
    self.cur_end_ip = None
    print(self.get_version(), " 记录总数: %d 条 " % (self.index_count))

    def get_version(self):
    """
    获取版本信息,最后一条 IP 记录 255.255.255.0-255.255.255.255 是版本信息
    :return: str
    """
    s = self.get_addr_by_ip(0xFFFFFF00)
    return s

    def _get_area_addr(self, offset=0):
    if offset:
    self.f_db.seek(offset)
    bs = self.f_db.read(1)
    (byte,) = struct.unpack("B", bs)
    if byte == 0x01 or byte == 0x02:
    p = self.getLong3()
    if p:
    return self.get_offset_string(p)
    else:
    return ""
    else:
    self.f_db.seek(-1, 1)
    return self.get_offset_string(offset)

    def _get_addr(self, offset):
    """
    获取 offset 处记录区地址信息(包含国家和地区)
    如果是中国 ip ,则是 "xx 省 xx 市 xxxxx 地区" 这样的形式
    (比如:"福建省 电信", "澳大利亚 墨尔本 Goldenit 有限公司")
    :param offset:
    :return:str
    """
    self.f_db.seek(offset + 4)
    bs = self.f_db.read(1)
    (byte,) = struct.unpack("B", bs)
    if byte == 0x01: # 重定向模式 1
    country_offset = self.getLong3()
    self.f_db.seek(country_offset)
    bs = self.f_db.read(1)
    (b,) = struct.unpack("B", bs)
    if b == 0x02:
    country_addr = self.get_offset_string(self.getLong3())
    self.f_db.seek(country_offset + 4)
    else:
    country_addr = self.get_offset_string(country_offset)
    area_addr = self._get_area_addr()
    elif byte == 0x02: # 重定向模式 2
    country_addr = self.get_offset_string(self.getLong3())
    area_addr = self._get_area_addr(offset + 8)
    else: # 字符串模式
    country_addr = self.get_offset_string(offset + 4)
    area_addr = self._get_area_addr()
    return country_addr + " " + area_addr

    def dump(self, first, last):
    """
    打印数据库中索引为 first 到索引为 last(不包含 last)的记录
    :param first:
    :param last:
    :return:
    """
    if last > self.index_count:
    last = self.index_count
    for index in range(first, last):
    offset = self.first_index + index * 7
    self.f_db.seek(offset)
    buf = self.f_db.read(7)
    (ip, of1, of2) = struct.unpack("IHB", buf)
    address = self._get_addr(of1 + (of2 << 16))
    print("%d %s %s" % (index, self.ip2str(ip), address))

    def _set_ip_range(self, index):
    offset = self.first_index + index * 7
    self.f_db.seek(offset)
    buf = self.f_db.read(7)
    (self.cur_start_ip, of1, of2) = struct.unpack("IHB", buf)
    self.cur_end_ip_offset = of1 + (of2 << 16)
    self.f_db.seek(self.cur_end_ip_offset)
    buf = self.f_db.read(4)
    (self.cur_end_ip,) = struct.unpack("I", buf)

    def get_addr_by_ip(self, ip):
    """
    通过 ip 查找其地址
    :param ip: (int or str)
    :return: str
    """
    if type(ip) == str:
    ip = self.str2ip(ip)
    L = 0
    R = self.index_count - 1
    while L < R - 1:
    M = int((L + R) / 2)
    self._set_ip_range(M)
    if ip == self.cur_start_ip:
    L = M
    break
    if ip > self.cur_start_ip:
    L = M
    else:
    R = M
    self._set_ip_range(L)
    # version information, 255.255.255.X, urgy but useful
    if ip & 0xFFFFFF00 == 0xFFFFFF00:
    self._set_ip_range(R)
    if self.cur_start_ip <= ip <= self.cur_end_ip:
    address = self._get_addr(self.cur_end_ip_offset)
    else:
    address = "未找到该 IP 的地址"
    return address

    def get_ip_range(self, ip):
    """
    返回 ip 所在记录的 IP 段
    :param ip: ip(str or int)
    :return: str
    """
    if type(ip) == str:
    ip = self.str2ip(ip)
    self.get_addr_by_ip(ip)
    range = self.ip2str(self.cur_start_ip) + " - " + self.ip2str(self.cur_end_ip)
    return range

    def get_offset_string(self, offset=0):
    """
    获取文件偏移处的字符串(以'\0'结尾)
    :param offset: 偏移
    :return: str
    """
    if offset:
    self.f_db.seek(offset)
    bs = b""
    ch = self.f_db.read(1)
    (byte,) = struct.unpack("B", ch)
    while byte != 0:
    bs += ch
    ch = self.f_db.read(1)
    (byte,) = struct.unpack("B", ch)
    return bs.decode("gbk")

    def ip2str(self, ip):
    """
    整数 IP 转化为 IP 字符串
    :param ip:
    :return:
    """
    return str(ip >> 24) + "." + str((ip >> 16) & 0xFF) + "." + str((ip >> 8) & 0xFF) + "." + str(ip & 0xFF)

    def str2ip(self, s):
    """
    IP 字符串转换为整数 IP
    :param s:
    :return:
    """
    (ip,) = struct.unpack("I", socket.inet_aton(s))
    return ((ip >> 24) & 0xFF) | ((ip & 0xFF) << 24) | ((ip >> 8) & 0xFF00) | ((ip & 0xFF00) << 8)

    def getLong3(self, offset=0):
    """
    3 字节的数值
    :param offset:
    :return:
    """
    if offset:
    self.f_db.seek(offset)
    bs = self.f_db.read(3)
    (a, b) = struct.unpack("HB", bs)
    return (b << 16) + a


    if __name__ == "__main__":
    # todo:纯真 IP 库解析
    cz = CzIp()
    # print(cz.get_version())
    ip = "8.8.8.8"
    print(cz.get_ip_range(ip))
    print(cz.get_addr_by_ip(ip))
    print("====")
    ip = "125.129.173.203"
    print(cz.get_ip_range(ip))
    print(cz.get_addr_by_ip(ip))
    ```
    fuyun
        4
    fuyun  
    OP
       14 天前
    @daimaosix @jiayouzl 真是机灵鬼!
    lsls931011
        5
    lsls931011  
       13 天前
    不错,待会我整个 PHP 版的。
    jianyang
        6
    jianyang  
       13 天前
    马上停止更新了还做这个干嘛
    Sayuri
        7
    Sayuri  
       13 天前
    Nest.js 搞一大堆 module 的项目我都感觉特别特别蠢。还得处理 module 之间的依赖关系特别麻烦。我所在的公司的 Nest.js 的项目都是一个 module 底下挂一大堆 controller 和 service 。
    另外,其实这东西做成那种可配置的 module 也是挺不错的。
    fuyun
        8
    fuyun  
    OP
       13 天前
    @Sayuri 这种公共的、基础的服务本来就是可以独立成类似 Nest.js 官方的一些模块。这个项目为了能够独立运行,才做成了一个完整的 APP 。后续条件允许,再考虑模块化成 npm 包。
    另外,就 module 而言,见仁见智吧,我不喜欢大杂烩,分工本来也有这一层意思。或许更好的是,像 Angular 那样,再抽象出一个 Standalone 层面的,也就无需 import 整个 module 。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   940 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 19:52 · PVG 03:52 · LAX 12:52 · JFK 15:52
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.