Yii2 – elasticSearch 新建mapping操作

在一些需要完全匹配,或者其他的一些情况,需要建立mapping,这个有点类似mysql的表定义

ActiveRecord的定义:

<?php

namespace appadmin\code\Ta\models\elasticSearch;

use yii\elasticsearch\ActiveRecord;

/**
 * Elasticsearch ActiveRecord for the "trace_data" type.
 *
 * Every website has its own index ("ta_<website_id>"), so initDb() must be
 * called before the model is used.
 */
class TraceData extends ActiveRecord
{
    /**
     * @var string|null name of the index currently in use; assigned by initDb()
     */
    public static $currentIndex;

    /**
     * Returns the Elasticsearch connection component used by this model.
     */
    public static function getDb()
    {
        return \Yii::$app->get('elasticsearch_TA');
    }

    /**
     * Selects the index that belongs to the given website.
     * A falsy id is ignored and leaves the current selection untouched.
     *
     * @param int|string $website_id
     */
    public static function initDb($website_id)
    {
        if ($website_id) {
            self::$currentIndex = 'ta' . "_" . $website_id;
        }
    }

    /**
     * Index name (comparable to a MySQL database).
     */
    public static function index()
    {
        return self::$currentIndex;
    }

    /**
     * Type name (comparable to a MySQL table).
     */
    public static function type()
    {
        return 'trace_data';
    }

    /**
     * The attribute list is derived from the mapping so both stay in sync.
     */
    public function attributes()
    {
        $mapConfig = self::mapConfig();

        return array_keys($mapConfig['properties']);
    }

    /**
     * Field definitions of the type. All fields are "not_analyzed", i.e. they
     * are matched as exact values.
     *
     * @return array
     */
    public static function mapConfig()
    {
        $string  = ['type' => 'string', "index" => "not_analyzed"];
        $integer = ['type' => 'integer', "index" => "not_analyzed"];

        return [
            'properties' => [
                'id'                    => $string,
                'ip'                    => $string,
                'service_date_str'      => $string,
                'service_datetime'      => $string,
                'service_timestamp'     => $integer,
                'devide'                => $string,
                'user_agent'            => $string,
                'browser_name'          => $string,
                'browser_version'       => $string,
                'browser_date'          => $string,
                'browser_lang'          => $string,
                'operate'               => $string,
                'operate_relase'        => $string,
                'domain'                => $string,
                'url'                   => $string,
                'title'                 => $string,
                'refer_url'             => $string,
                'first_referrer_domain' => $string,
                'is_return'             => $integer,
                'uuid'                  => $string,
                'device_pixel_ratio'    => $string,
                'resolution'            => $string,
                'color_depth'           => $string,
                'website_id'            => $integer,
                'sku'                   => $string,
                'country_code'          => $string,
                'country_name'          => $string,

                'order_status'          => $string,
                'cart'                  => $string,
                'order'                 => $string,
                'category'              => $string,
                'login_email'           => $string,
                'register_email'        => $string,
                'search'                => $string,
                'currency'              => $string,
                'url_new'               => $string,
                'stay_seconds'          => $integer,
                'first_visit_this_url'  => $string,
            ],
        ];
    }

    /**
     * Mapping definition keyed by the type name.
     *
     * @return array
     */
    public static function mapping()
    {
        return [
            static::type() => self::mapConfig(),
        ];
    }

    /**
     * Set (update) mappings for this model.
     *
     * The index is created first when it does not exist yet, because a type
     * mapping can only be added to an existing index.
     *
     * @throws \yii\base\InvalidConfigException when initDb() has not selected an index
     */
    public static function updateMapping()
    {
        $index = self::index();
        if (!$index) {
            // Without an index name the requests below would not be scoped to
            // a single index, so refuse to run instead.
            throw new \yii\base\InvalidConfigException(
                'No index selected: call ' . __CLASS__ . '::initDb() before updateMapping().'
            );
        }

        $command = self::getDb()->createCommand();
        if (!$command->indexExists($index)) {
            $command->createIndex($index);
        }
        $command->setMapping($index, self::type(), self::mapping());
    }

    /**
     * Returns the mapping stored in Elasticsearch for this model's type.
     *
     * The lookup is limited to the selected index; when no index has been
     * selected yet it falls back to "_all" indices.
     *
     * @return mixed response of the get-mapping request
     */
    public static function getMapping()
    {
        $index = self::index();

        return self::getDb()
            ->createCommand()
            ->getMapping($index ?: '_all', self::type());
    }
}

使用:

/**
 * Creates or updates the trace_data mapping of every given website and dumps
 * the mapping that Elasticsearch reports afterwards.
 *
 * @param string $websiteIds comma-separated website ids
 */
public function actionMapping($websiteIds){
    foreach (explode(",", $websiteIds) as $website_id) {
        $website_id = trim($website_id);
        // TraceData::initDb() ignores empty ids and would keep the index of the
        // previous website selected, so skip them instead of updating that
        // index a second time.
        if (!$website_id) {
            continue;
        }
        TraceData::initDb($website_id);
        TraceData::updateMapping();
        var_dump(TraceData::getMapping());
    }
}

通过updateMapping来更新mapping

通过getMapping得到定义好的mapping

在这里的一个坑就是:在添加表(type)mapping的时候,需要提前定义Index(相当于mysql的db),才能添加type(相当于表),否则添加不上,或者报错。

其他资料:
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html

http://www.open-open.com/lib/view/open1455452874636.html

Yii2 – 批量插入数据到 elasticSearch

elasticSearch 是目前来说,最强大的开源搜索引擎,对于一些搜索,放到ElasticSearch中,速度会快很多,当然,这个玩意也是非常消耗资源。

下面是,使用yii2,将数据批量导入到ES中,单行插入的效率太低,使用批量插入,速度还是可以。

1. 安装ElasticSearch,这个参看:

安装ElasticSearch ,以及在yii2中的使用

2. 安装yii2-ElasticSearch插件

https://github.com/yiisoft/yii2-elasticsearch

3. 配置

// Elasticsearch connection component; the models in this article fetch it
// with \Yii::$app->get('elasticsearch_TA').
'elasticsearch_TA' => [
    'class' => 'yii\elasticsearch\Connection',
    // HTTP addresses (host:port) of the Elasticsearch nodes of this connection.
    'nodes' => [
        ['http_address' => '192.168.0.199:9200'],
        ['http_address' => '192.168.0.210:9200'],
    ],
],

4.使用

传递数据,我们还是用shell 脚本来传递数据 /appta/shell/customer/syncCustomerDataToEs.sh

#!/bin/sh
# Sync customer data from MongoDB to Elasticsearch, one page per yii call.
#
# Usage: syncCustomerDataToEs.sh <processDate> <websiteIds>
#   processDate  date that is passed through to the yii console commands
#   websiteIds   comma-separated list of website ids

DIR=$(cd "$(dirname "$0")" && pwd)
# sync mongodb to elasticsearch
echo 'sync custom data to es'
processDate=$1
websiteIds=$2

# Turn the comma-separated list into whitespace-separated words for the loop.
arr=$(echo "$websiteIds" | tr "," "\n")
for website_id in $arr; do
  echo "website_id:$website_id"
  # The pagecount command prints the number of pages that have to be imported.
  variable=$("$DIR/../../../yii" ta/migrate/elasticsearch/customerdatapagecount "$processDate" "$website_id")
  echo "$variable.."
  # Anything but a plain integer means the command failed or printed an error.
  case $variable in
    ''|*[!0-9]*)
      echo "invalid page count for website_id $website_id, skipped" >&2
      continue
      ;;
  esac
  # POSIX loop: the C-style "for (( ))" form is a bash extension and is not
  # available in every /bin/sh.
  i=1
  while [ "$i" -le "$variable" ]; do
    "$DIR/../../../yii" ta/migrate/elasticsearch/customerdata "$processDate" "$website_id" "$i"
    echo "Page $i done"
    i=$((i + 1))
  done
done







controller文件:

<?php
namespace appadmin\code\Ta\console\migrate;
use Yii;
use appadmin\code\Ta\models\WebsiteBaseInfo;
use yii\console\Controller;
use appadmin\code\Ta\helper\mongoDb as MongoDb;
use appadmin\code\Ta\models\mongo\CustomerData as MgCustomerData;
use appadmin\code\Ta\models\elasticSearch\CustomerData as EsCustomerData;

use appadmin\code\Ta\models\mongo\TraceData as MgTraceData;
use appadmin\code\Ta\models\elasticSearch\TraceData as EsTraceData;


/**
 * Console commands that copy one day of MongoDB data of a website into
 * Elasticsearch, page by page, using bulk requests.
 *
 * Saving every record through the Elasticsearch ActiveRecord was too slow,
 * which is why each page is sent as a single bulk request instead.
 */
class ElasticsearchController extends Controller
{
    /**
     * @var int number of MongoDB documents per page (= per bulk request)
     */
    public $numPerPage = 1000;

    /**
     * Points the MongoDB helper and the Mongo models at the requested day and website.
     *
     * @param string $processDate date handed to MongoDb::setDbByDate()
     * @param int|string $website_id
     */
    public function initParam($processDate, $website_id)
    {
        MongoDb::setDbByDate($processDate);
        MgCustomerData::initCollName($website_id);
        MgTraceData::initCollName($website_id);
    }

    /**
     * Prints the number of pages of customer data.
     *
     * @param string $processDate
     * @param int|string $website_id
     */
    public function actionCustomerdatapagecount($processDate, $website_id)
    {
        $this->initParam($processDate, $website_id);
        $count = MgCustomerData::find()->count();
        echo ceil($count / $this->numPerPage);
    }

    /**
     * Syncs one page of customer data to Elasticsearch.
     *
     * @param string $processDate
     * @param int|string $website_id
     * @param int|string $pageNum 1-based page number
     */
    public function actionCustomerdata($processDate, $website_id, $pageNum)
    {
        $this->initParam($processDate, $website_id);
        $rows = $this->fetchPage(MgCustomerData::find(), $pageNum);

        $documents = [];
        foreach ($rows as $one) {
            $source = ['id' => $one['_id']];
            // Not every document is guaranteed to carry a "value" entry.
            $value = isset($one['value']) ? $one['value'] : null;
            if (is_array($value) && !empty($value)) {
                foreach ($value as $k => $v) {
                    // The "data" entry is stored in serialized form. The strict
                    // comparison keeps integer keys from matching 'data'.
                    $source[$k] = ($k === 'data') ? serialize($v) : $v;
                }
            }
            $documents[] = ['id' => $one['_id'], 'source' => $source];
        }

        $this->bulkIndex('ta_' . $website_id, 'customer_data', $documents);
    }

    /**
     * Prints the number of pages of trace data.
     *
     * @param string $processDate
     * @param int|string $website_id
     */
    public function actionTracedatapagecount($processDate, $website_id)
    {
        $this->initParam($processDate, $website_id);
        $count = MgTraceData::find()->count();
        echo ceil($count / $this->numPerPage);
    }

    /**
     * Syncs one page of trace data to Elasticsearch.
     *
     * @param string $processDate
     * @param int|string $website_id
     * @param int|string $pageNum 1-based page number
     */
    public function actionTracedata($processDate, $website_id, $pageNum)
    {
        $this->initParam($processDate, $website_id);
        $rows = $this->fetchPage(MgTraceData::find(), $pageNum);

        $documents = [];
        foreach ($rows as $one) {
            // Copy the document as it is, but expose Mongo's "_id" as "id".
            $source = $one;
            $source['id'] = $one['_id'];
            unset($source['_id']);

            $documents[] = ['id' => $one['_id'], 'source' => $source];
        }

        $this->bulkIndex('ta_' . $website_id, 'trace_data', $documents);
    }

    /**
     * Loads one page of rows (as arrays) from the given query.
     *
     * @param object $query query object returned by a Mongo model's find()
     * @param int|string $pageNum 1-based page number
     * @return array
     * @throws \yii\console\Exception when the page number is smaller than 1
     */
    private function fetchPage($query, $pageNum)
    {
        $page = (int) $pageNum;
        if ($page < 1) {
            // A page below 1 would result in a negative offset.
            throw new \yii\console\Exception('pageNum must be an integer >= 1.');
        }

        return $query
            ->asArray()
            ->limit($this->numPerPage)
            ->offset($this->numPerPage * ($page - 1))
            ->all();
    }

    /**
     * Sends the documents to Elasticsearch in one bulk request.
     * Nothing is sent when the list is empty.
     *
     * @param string $indexName target index
     * @param string $typeName target type
     * @param array $documents list of ['id' => ..., 'source' => [...]] entries
     */
    private function bulkIndex($indexName, $typeName, array $documents)
    {
        if (empty($documents)) {
            return;
        }

        $bulk = Yii::$app->elasticsearch_TA->createBulkCommand();
        foreach ($documents as $document) {
            $bulk->addAction([
                'index' => [
                    '_index' => $indexName,
                    '_type'  => $typeName,
                    '_id'    => $document['id'],
                ],
            ], $document['source']);
        }
        $bulk->execute();
    }
}

appadmin\code\Ta\models\mongo\CustomerData

<?php  
# 商家SELLER 和  对应的 SELLERID 的设置。 
namespace appadmin\code\Ta\models\mongo; 
use yii\mongodb\ActiveRecord;
use fec\helpers\CDate;
use fec\helpers\CConfig;
use Yii;
use appadmin\code\Ta\helper\mongoDb;
# use appadmin\code\Ta\models\mongo\CustomerData; 
/**
 * MongoDB ActiveRecord for the per-website customer data collection.
 * initCollName() has to be called before the model is used.
 */
class CustomerData extends ActiveRecord
{
    /**
     * @var string collection name currently in use; assigned by initCollName()
     */
    public static $_collectionName;

    /**
     * Selects the collection that belongs to the given website.
     *
     * @param int|string $website_id
     */
    public static function initCollName($website_id)
    {
        $name = "ta_" . $website_id . "_customer_data";
        self::$_collectionName = $name;
    }

    /**
     * Collection name used by this model.
     */
    public static function collectionName()
    {
        return self::$_collectionName;
    }

    /**
     * MongoDB connection component used by this model.
     */
    public static function getDb()
    {
        $connection = Yii::$app->get('mongodb_ta_date');

        return $connection;
    }

    /**
     * Attributes of a document in this collection.
     */
    public function attributes()
    {
        return ['_id', 'value'];
    }
}

appadmin\code\Ta\models\elasticSearch\CustomerData

<?php

namespace appadmin\code\Ta\models\elasticSearch;

use yii\elasticsearch\ActiveRecord;

/**
 * Elasticsearch ActiveRecord for the "customer_data" type.
 *
 * Every website has its own index ("ta_<website_id>"), so initDb() must be
 * called before the model is used.
 */
class CustomerData extends ActiveRecord
{
    /**
     * @var string|null name of the index currently in use; assigned by initDb()
     */
    public static $currentIndex;

    /**
     * Returns the Elasticsearch connection component used by this model.
     */
    public static function getDb()
    {
        return \Yii::$app->get('elasticsearch_TA');
    }

    /**
     * Selects the index that belongs to the given website.
     * A falsy id is ignored and leaves the current selection untouched.
     *
     * @param int|string $website_id
     */
    public static function initDb($website_id)
    {
        if ($website_id) {
            self::$currentIndex = 'ta' . "_" . $website_id;
        }
    }

    /**
     * Index name (comparable to a MySQL database).
     */
    public static function index()
    {
        return self::$currentIndex;
    }

    /**
     * Type name (comparable to a MySQL table).
     */
    public static function type()
    {
        return 'customer_data';
    }

    /**
     * Attributes of a customer_data document.
     * Each name is listed once; 'uuid' used to appear twice.
     */
    public function attributes()
    {
        return [
            'id',

            'uuid',
            'customer_id',
            'pv',

            'ip',
            'service_date_str',
            'service_datetime',
            'service_timestamp',
            'devide',
            'user_agent',
            'browser_name',
            'browser_version',
            'browser_date',
            'browser_lang',
            'operate',
            'operate_relase',
            'domain',
            'url',
            'title',
            'refer_url',
            'first_referrer_domain',
            'is_return',
            'device_pixel_ratio',
            'resolution',
            'color_depth',
            'website_id',
            'sku',
            'country_code',
            'country_name',

            'data',

            'order_status',
            'cart',
            'order',
            'category',
            'login_email',
            'register_email',
            'search',
            'currency',
            'stay_seconds',
        ];
    }
}