From caac2cea445ce03123d7d8dc5ef1a8d7f7b901da Mon Sep 17 00:00:00 2001 From: Alexei Sorokin Date: Sat, 1 Aug 2020 19:05:48 +0300 Subject: [PATCH] [SCHEMA] Better DBMS information fetching On PostgreSQL: - Parse defaults for strings and booleans properly. - Parse the "serial" definition type properly. - Get information on the "enum" definition type. - Re-work getting information about keys/indices. On MariaDB: - Get information about lengths in indices. - Get foreign key information separately from the rest as they can have colliding names. --- lib/database/mysqlschema.php | 183 +++++++++++--------- lib/database/pgsqlschema.php | 324 ++++++++++++++++++++--------------- 2 files changed, 288 insertions(+), 219 deletions(-) diff --git a/lib/database/mysqlschema.php b/lib/database/mysqlschema.php index 9b710b2902..b9d4ca2ccb 100644 --- a/lib/database/mysqlschema.php +++ b/lib/database/mysqlschema.php @@ -157,50 +157,37 @@ class MysqlSchema extends Schema } if ($hasKeys) { - // INFORMATION_SCHEMA's CONSTRAINTS and KEY_COLUMN_USAGE tables give - // good info on primary and unique keys but don't list ANY info on - // multi-value keys, which is lame-o. Sigh. - $keyColumns = $this->fetchMetaInfo($table, 'KEY_COLUMN_USAGE', 'CONSTRAINT_NAME, ORDINAL_POSITION'); - $keys = []; - $fkeys = []; + $key_info = $this->fetchKeyInfo($table); - foreach ($keyColumns as $row) { - $keyName = $row['CONSTRAINT_NAME']; - $keyCol = $row['COLUMN_NAME']; - if (!isset($keys[$keyName])) { - $keys[$keyName] = []; - } - $keys[$keyName][] = $keyCol; - if (!is_null($row['REFERENCED_TABLE_NAME'])) { - $fkeys[] = $keyName; - } - } - - foreach ($keys as $keyName => $cols) { - if ($keyName === 'PRIMARY') { - $def['primary key'] = $cols; - } elseif (in_array($keyName, $fkeys)) { - $fkey = $this->fetchForeignKeyInfo($table, $keyName); - $colMap = array_combine($cols, $fkey['cols']); - $def['foreign keys'][$keyName] = [$fkey['table_name'], $colMap]; - } else { - $def['unique keys'][$keyName] = $cols; - } - } - - $indexInfo = $this->fetchIndexInfo($table); - - foreach ($indexInfo as $row) { - $keyName = $row['key_name']; + foreach ($key_info as $row) { + $key_name = $row['key_name']; $cols = $row['cols']; - if ($row['key_type'] === 'FULLTEXT') { - $def['fulltext indexes'][$keyName] = $cols; - } else { - $def['indexes'][$keyName] = $cols; + switch ($row['key_type']) { + case 'PRIMARY': + $def['primary key'] = $cols; + break; + case 'UNIQUE': + $def['unique keys'][$key_name] = $cols; + break; + case 'FULLTEXT': + $def['fulltext indexes'][$key_name] = $cols; + break; + default: + $def['indexes'][$key_name] = $cols; } } } + + $foreign_key_info = $this->fetchForeignKeyInfo($table); + + foreach ($foreign_key_info as $row) { + $key_name = $row['key_name']; + $cols = $row['cols']; + $ref_table = $row['ref_table']; + + $def['foreign keys'][$key_name] = [$ref_table, $cols]; + } return $def; } @@ -235,14 +222,17 @@ class MysqlSchema extends Schema */ public function fetchMetaInfo($table, $infoTable, $orderBy = null) { - $query = "SELECT * FROM INFORMATION_SCHEMA.%s " . - "WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'"; $schema = $this->conn->dsn['database']; - $sql = sprintf($query, $infoTable, $schema, $table); - if ($orderBy) { - $sql .= ' ORDER BY ' . $orderBy; - } - return $this->fetchQueryData($sql); + return $this->fetchQueryData(sprintf( + <<<'END' + SELECT * FROM INFORMATION_SCHEMA.%1$s + WHERE TABLE_SCHEMA = '%2$s' AND TABLE_NAME = '%3$s'%4$s; + END, + $this->quoteIdentifier($infoTable), + $schema, + $table, + ($orderBy ? " ORDER BY {$orderBy}" : '') + )); } /** @@ -252,64 +242,89 @@ class MysqlSchema extends Schema * @return array of arrays * @throws PEAR_Exception */ - public function fetchIndexInfo(string $table): array + private function fetchKeyInfo(string $table): array + { + $schema = $this->conn->dsn['database']; + $data = $this->fetchQueryData( + <<conn->dsn['database']; $data = $this->fetchQueryData( << $row['ref_col']]; + unset($row['col']); + unset($row['ref_col']); $rows[$name] = $row; + } else { + $rows[$name]['cols'][$row['col']] = $row['ref_col']; } } + return array_values($rows); } - /** - * @param string $table - * @param string $constraint_name - * @return array array of rows with keys: table_name, cols (array of strings) - * @throws PEAR_Exception - */ - public function fetchForeignKeyInfo(string $table, string $constraint_name): array - { - $query = 'SELECT REFERENCED_TABLE_NAME AS `table_name`, REFERENCED_COLUMN_NAME AS `col` ' . - 'FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE ' . - 'WHERE TABLE_SCHEMA = \'%s\' AND TABLE_NAME = \'%s\' AND CONSTRAINT_NAME = \'%s\' ' . - 'AND REFERENCED_TABLE_SCHEMA IS NOT NULL ' . - 'ORDER BY POSITION_IN_UNIQUE_CONSTRAINT'; - $schema = $this->conn->dsn['database']; - $sql = sprintf($query, $schema, $table, $constraint_name); - $data = $this->fetchQueryData($sql); - if (count($data) < 1) { - throw new Exception('Could not find foreign key ' . $constraint_name . ' on table ' . $table); - } - - $info = [ - 'table_name' => $data[0]['table_name'], - 'cols' => [], - ]; - foreach ($data as $row) { - $info['cols'][] = $row['col']; - } - return $info; - } - /** * Append an SQL statement with an index definition for a full-text search * index over one or more columns on a table. diff --git a/lib/database/pgsqlschema.php b/lib/database/pgsqlschema.php index 38f7566eaf..63d1f5cf0c 100644 --- a/lib/database/pgsqlschema.php +++ b/lib/database/pgsqlschema.php @@ -80,12 +80,11 @@ class PgsqlSchema extends Schema throw new SchemaTableMissingException("No such table: $table"); } - // We'll need to match up fields by ordinal reference - $orderedFields = []; + // Get information on the emulated "enum" type + $enum_info = $this->fetchEnumInfo($table); foreach ($columns as $row) { $name = $row['column_name']; - $orderedFields[$row['ordinal_position']] = $name; $field = []; $field['type'] = $type = $row['udt_name']; @@ -108,68 +107,66 @@ class PgsqlSchema extends Schema if ($row['is_nullable'] == 'NO') { $field['not null'] = true; } - if ($row['column_default'] !== null) { - $field['default'] = $row['column_default']; + $col_default = $row['column_default']; + if (!is_null($col_default)) { if ($this->isNumericType($field)) { - $field['default'] = (int) $field['default']; + $field['default'] = (int) $col_default; + } elseif ($type === 'bool') { + $field['default'] = ($col_default === 'true') ? true : false; + } else { + $match = "/^'(.*)'(::.+)*$/"; + if (preg_match($match, $col_default)) { + $field['default'] = preg_replace( + $match, + '\1', + $col_default + ); + } else { + $field['default'] = $col_default; + } } } + if ( + $row['is_identity'] === 'YES' + && $row['identity_generation'] = 'BY DEFAULT' + ) { + $field['auto_increment'] = true; + } elseif (array_key_exists($name, $enum_info)) { + $field['type'] = $type = 'enum'; + $field['enum'] = $enum_info[$name]; + } $def['fields'][$name] = $field; } - // Pulling index info from pg_class & pg_index - // This can give us primary & unique key info, but not foreign key constraints - // so we exclude them and pick them up later. - $indexInfo = $this->fetchIndexInfo($table); + $key_info = $this->fetchKeyInfo($table); - foreach ($indexInfo as $row) { - $keyName = $row['key_name']; + foreach ($key_info as $row) { + $key_name = $row['key_name']; + $cols = $row['cols']; - // Dig the column references out! - // - // These are inconvenient arrays with partial references to the - // pg_att table, but since we've already fetched up the column - // info on the current table, we can look those up locally. - $cols = []; - $colPositions = explode(' ', $row['indkey']); - foreach ($colPositions as $ord) { - if ($ord == 0) { - $cols[] = 'FUNCTION'; // @fixme - } else { - $cols[] = $orderedFields[$ord]; - } - } - - $def['indexes'][$keyName] = $cols; - } - - // Pull constraint data from INFORMATION_SCHEMA: - // Primary key, unique keys, foreign keys - $keyColumns = $this->fetchMetaInfo($table, 'key_column_usage', 'constraint_name,ordinal_position'); - $keys = []; - - foreach ($keyColumns as $row) { - $keyName = $row['constraint_name']; - $keyCol = $row['column_name']; - if (!isset($keys[$keyName])) { - $keys[$keyName] = []; - } - $keys[$keyName][] = $keyCol; - } - - foreach ($keys as $keyName => $cols) { - // name hack -- is this reliable? - if ($keyName == "{$table}_pkey") { - $def['primary key'] = $cols; - } elseif (preg_match("/^{$table}_(.*)_fkey$/", $keyName, $matches)) { - $fkey = $this->fetchForeignKeyInfo($table, $keyName); - $colMap = array_combine($cols, $fkey['col_names']); - $def['foreign keys'][$keyName] = [$fkey['table_name'], $colMap]; - } else { - $def['unique keys'][$keyName] = $cols; + switch ($row['key_type']) { + case 'primary': + $def['primary key'] = $cols; + break; + case 'unique': + $def['unique keys'][$key_name] = $cols; + break; + default: + $def['indexes'][$key_name] = $cols; } } + + $foreign_key_info = $this->fetchForeignKeyInfo($table); + + foreach ($foreign_key_info as $row) { + $key_name = $row['key_name']; + $cols = $row['cols']; + $ref_table = $row['ref_table']; + + $def['foreign keys'][$key_name] = [$ref_table, $cols]; + } + return $def; } @@ -184,90 +181,158 @@ class PgsqlSchema extends Schema */ public function fetchMetaInfo($table, $infoTable, $orderBy = null) { - $query = "SELECT * FROM information_schema.%s " . - "WHERE table_name='%s'"; - $sql = sprintf($query, $infoTable, $table); - if ($orderBy) { - $sql .= ' ORDER BY ' . $orderBy; - } - return $this->fetchQueryData($sql); + $catalog = $this->conn->dsn['database']; + return $this->fetchQueryData(sprintf( + <<<'END' + SELECT * FROM information_schema.%1$s + WHERE table_catalog = '%2$s' AND table_name = '%3$s'%4$s; + END, + $this->quoteIdentifier($infoTable), + $catalog, + $table, + ($orderBy ? " ORDER BY {$orderBy}" : '') + )); } /** - * Pull some PG-specific index info + * Pull index and keys information for the given table. + * * @param string $table * @return array of arrays * @throws PEAR_Exception */ - public function fetchIndexInfo(string $table): array + private function fetchKeyInfo(string $table): array { - return $this->fetchQueryData( + $data = $this->fetchQueryData(sprintf( + <<<'EOT' + SELECT "rel"."relname" AS "key_name", + CASE + WHEN "idx"."indisprimary" IS TRUE THEN 'primary' + WHEN "idx"."indisunique" IS TRUE THEN 'unique' + ELSE "am"."amname" + END AS "key_type", + "cols"."attname" AS "col" + FROM pg_index AS "idx" + CROSS JOIN LATERAL unnest("idx"."indkey") + WITH ORDINALITY AS "col_nums" ("num", "pos") + INNER JOIN pg_class AS "rel" + ON "idx"."indexrelid" = "rel".oid + LEFT JOIN pg_attribute AS "cols" + ON "idx"."indrelid" = "cols"."attrelid" + AND "col_nums"."num" = "cols"."attnum" + LEFT JOIN pg_am AS "am" + ON "rel"."relam" = "am".oid + WHERE "idx"."indrelid" = CAST('%s' AS REGCLASS) + ORDER BY "key_type", "key_name", "col_nums"."pos"; + EOT, + $table + )); + + $rows = []; + foreach ($data as $row) { + $name = $row['key_name']; + + if (!array_key_exists($name, $rows)) { + $row['cols'] = [$row['col']]; + + unset($row['col']); + $rows[$name] = $row; + } else { + $rows[$name]['cols'][] = $row['col']; + } + } + + return array_values($rows); + } + + /** + * Pull foreign key information for the given table. + * + * @param string $table + * @return array array of arrays + * @throws PEAR_Exception + */ + private function fetchForeignKeyInfo(string $table): array + { + $data = $this->fetchQueryData(sprintf( + <<<'END' + SELECT "con"."conname" AS "key_name", + "cols"."attname" AS "col", + "ref_rel"."relname" AS "ref_table", + "ref_cols"."attname" AS "ref_col" + FROM pg_constraint AS "con" + CROSS JOIN LATERAL unnest("con"."conkey", "con"."confkey") + WITH ORDINALITY AS "col_nums" ("num", "ref_num", "pos") + LEFT JOIN pg_attribute AS "cols" + ON "con"."conrelid" = "cols"."attrelid" + AND "col_nums"."num" = "cols"."attnum" + LEFT JOIN pg_class AS "ref_rel" + ON "con"."confrelid" = "ref_rel".oid + LEFT JOIN pg_attribute AS "ref_cols" + ON "con"."confrelid" = "ref_cols"."attrelid" + AND "col_nums"."ref_num" = "ref_cols"."attnum" + WHERE "con"."contype" = 'f' + AND "con"."conrelid" = CAST('%s' AS REGCLASS) + ORDER BY "key_name", "col_nums"."pos"; + END, + $table + )); + + $rows = []; + foreach ($data as $row) { + $name = $row['key_name']; + + if (!array_key_exists($name, $rows)) { + $row['cols'] = [$row['col'] => $row['ref_col']]; + + unset($row['col']); + unset($row['ref_col']); + $rows[$name] = $row; + } else { + $rows[$name]['cols'][$row['col']] = $row['ref_col']; + } + } + + return array_values($rows); + } + + /** + * Pull information about the emulated enum columns + * + * @param string $table + * @return array of arrays + * @throws PEAR_Exception + */ + private function fetchEnumInfo($table) + { + $data = $this->fetchQueryData( <<fetchQueryData($sql); - if (count($data) < 1) { - throw new Exception('Could not find foreign key ' . $constraint_name . ' on table ' . $table); - } - - $row = $data[0]; - return [ - 'table_name' => $row['table_name'], - 'col_names' => $this->getTableColumnNames($row['table_id'], $row['col_indices']) - ]; - } - - /** - * - * @param int $table_id - * @param array $col_indexes - * @return array of strings - * @throws PEAR_Exception - */ - public function getTableColumnNames($table_id, $col_indexes) - { - $indexes = array_map('intval', explode(' ', $col_indexes)); - $query = 'SELECT attnum AS col_index, attname AS col_name ' . - 'FROM pg_attribute where attrelid=%d ' . - 'AND attnum IN (%s)'; - $sql = sprintf($query, $table_id, implode(',', $indexes)); - $data = $this->fetchQueryData($sql); - - $byId = []; + $rows = []; foreach ($data as $row) { - $byId[$row['col_index']] = $row['col_name']; - } + // PostgreSQL can show either + $name_regex = '(' . preg_quote($this->quoteIdentifier($row['col'])) + . '|' . preg_quote($row['col']) . ')'; - $out = []; - foreach ($indexes as $id) { - $out[] = $byId[$id]; + $enum = explode("'::text, '", preg_replace( + "/^\({$name_regex} = ANY \(ARRAY\['(.+)'::text]\)\)$/D", + '\2', + $row['check'] + )); + $rows[$row['col']] = $enum; } - return $out; + return $rows; } private function isNumericType(array $cd): bool @@ -293,16 +358,6 @@ class PgsqlSchema extends Schema $line = []; $line[] = parent::columnSql($name, $cd); - /* - if ($table['foreign keys'][$name]) { - foreach ($table['foreign keys'][$name] as $foreignTable => $foreignColumn) { - $line[] = 'references'; - $line[] = $this->quoteIdentifier($foreignTable); - $line[] = '(' . $this->quoteIdentifier($foreignColumn) . ')'; - } - } - */ - // This'll have been added from our transform of 'serial' type if (!empty($cd['auto_increment'])) { $line[] = 'GENERATED BY DEFAULT AS IDENTITY'; @@ -328,8 +383,7 @@ class PgsqlSchema extends Schema 'integer' => 'int', 'char' => 'bpchar', 'datetime' => 'timestamp', - 'blob' => 'bytea', - 'enum' => 'text', + 'blob' => 'bytea' ]; $type = $column['type'];