2019/11/28

阿里云-DataWork-ip库


对每个日志的ip进行解析,然后统计落地回来的激活、活跃、付费的每个地区用户分布情况。当前MaxComputer没有直接能将ip解析为对应地区信息的接口,所以需要事先将ip地址库存入到MaxComputer表格中,然后基于联合查询得出ip对应的地区信息。

ip地址库找了GitHub上一个仓库 lionsoul2014/ip2region,创建数据表存放ip库信息。

CREATE TABLE `ip_resource` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `start_ip` bigint(20) NOT NULL COMMENT 'ip起始地址',
  `end_ip` bigint(20) NOT NULL COMMENT 'ip结束地址',
  `start_ip_arg` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'ip起始地址',
  `end_ip_arg` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'ip结束地址',
  `country` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '国家',
  `area` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '省',
  `city` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '市',
  `county` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '县',
  `isp` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '运营商',
  PRIMARY KEY (`id`),
  KEY `ip_resource_start_ip_index` (`start_ip`),
  KEY `ip_resource_end_ip_index` (`end_ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

编写php脚本,对ip地址库进行解析,将ip地址转成bigint类型,方便数据库查询,然后将数据落地。

ini_set('memory_limit', '2048M');

// 写入文件
$wpath = './insert.sql';
@unlink($wpath);
$wfile = fopen($wpath, 'w+');

// 读取文件
$filepath = './ip.merge.txt';
$file     = fopen($filepath, "r");

// 循环处理
$num      = 0;
while (!feof($file)) {
    $str = fgets($file);
    $tmp = explode("|", $str);
    foreach ($tmp as $k => $v) {
        $tmp[$k] = preg_replace('|[\n\r]+|','',$tmp[$k]);
    }
    $start_ip = ip2long($tmp[0]);
    $end_ip = ip2long($tmp[1]);
    $sql = "INSERT INTO `ip_resource` (`start_ip`, `end_ip`, `start_ip_arg`, `end_ip_arg`, `country`, `area`, `city`, `isp`) VALUES ('{$start_ip}', '{$end_ip}', '{$tmp[0]}', '{$tmp[1]}', '{$tmp[2]}', '{$tmp[4]}', '{$tmp[5]}', '{$tmp[6]}');\n";
    fwrite($wfile, $sql);
    $num++;
    echo "{$num}\n";
}
fclose($file);
fclose($wfile);

在业务流程-资源处添加 Python 资源,将点号分割的IP地址转化为整数类型的IP地址。命名: ip2long.py 。加入以下代码后进行保存 提交上锁

from odps.udf import annotate
@annotate("string->bigint")
class ip2long(object):
    def evaluate(self, ip):
        try:
            return reduce(lambda x, y: (x << 8) + y, map(int, ip.split('.')))
        except:
            return 0

在业务流程-函数 新建函数,编写完成保存 提交上锁

函数名:ip2long
类名: ip2long.ip2long
资源列表: ip2long.py

接下来就可以创建一个 ODPS SQL 进行测试

SELECT IP2LONG('183.6.107.165');