[SCHEMA] Better DBMS information fetching

On PostgreSQL:
  - Parse defaults for strings and booleans properly.
  - Parse the "serial" definition type properly.
  - Get information on the "enum" definition type.
  - Re-work getting information about keys/indices.

On MariaDB:
  - Get information about lengths in indices.
  - Get foreign key information separately from the rest as they can have
    colliding names.
This commit is contained in:
Alexei Sorokin 2020-08-01 19:05:48 +03:00 committed by Diogo Peralta Cordeiro
parent d55488cdec
commit caac2cea44
2 changed files with 288 additions and 219 deletions

View File

@ -157,50 +157,37 @@ class MysqlSchema extends Schema
} }
if ($hasKeys) { if ($hasKeys) {
// INFORMATION_SCHEMA's CONSTRAINTS and KEY_COLUMN_USAGE tables give $key_info = $this->fetchKeyInfo($table);
// good info on primary and unique keys but don't list ANY info on
// multi-value keys, which is lame-o. Sigh.
$keyColumns = $this->fetchMetaInfo($table, 'KEY_COLUMN_USAGE', 'CONSTRAINT_NAME, ORDINAL_POSITION');
$keys = [];
$fkeys = [];
foreach ($keyColumns as $row) { foreach ($key_info as $row) {
$keyName = $row['CONSTRAINT_NAME']; $key_name = $row['key_name'];
$keyCol = $row['COLUMN_NAME'];
if (!isset($keys[$keyName])) {
$keys[$keyName] = [];
}
$keys[$keyName][] = $keyCol;
if (!is_null($row['REFERENCED_TABLE_NAME'])) {
$fkeys[] = $keyName;
}
}
foreach ($keys as $keyName => $cols) {
if ($keyName === 'PRIMARY') {
$def['primary key'] = $cols;
} elseif (in_array($keyName, $fkeys)) {
$fkey = $this->fetchForeignKeyInfo($table, $keyName);
$colMap = array_combine($cols, $fkey['cols']);
$def['foreign keys'][$keyName] = [$fkey['table_name'], $colMap];
} else {
$def['unique keys'][$keyName] = $cols;
}
}
$indexInfo = $this->fetchIndexInfo($table);
foreach ($indexInfo as $row) {
$keyName = $row['key_name'];
$cols = $row['cols']; $cols = $row['cols'];
if ($row['key_type'] === 'FULLTEXT') { switch ($row['key_type']) {
$def['fulltext indexes'][$keyName] = $cols; case 'PRIMARY':
} else { $def['primary key'] = $cols;
$def['indexes'][$keyName] = $cols; break;
case 'UNIQUE':
$def['unique keys'][$key_name] = $cols;
break;
case 'FULLTEXT':
$def['fulltext indexes'][$key_name] = $cols;
break;
default:
$def['indexes'][$key_name] = $cols;
} }
} }
} }
$foreign_key_info = $this->fetchForeignKeyInfo($table);
foreach ($foreign_key_info as $row) {
$key_name = $row['key_name'];
$cols = $row['cols'];
$ref_table = $row['ref_table'];
$def['foreign keys'][$key_name] = [$ref_table, $cols];
}
return $def; return $def;
} }
@ -235,14 +222,17 @@ class MysqlSchema extends Schema
*/ */
public function fetchMetaInfo($table, $infoTable, $orderBy = null) public function fetchMetaInfo($table, $infoTable, $orderBy = null)
{ {
$query = "SELECT * FROM INFORMATION_SCHEMA.%s " .
"WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'";
$schema = $this->conn->dsn['database']; $schema = $this->conn->dsn['database'];
$sql = sprintf($query, $infoTable, $schema, $table); return $this->fetchQueryData(sprintf(
if ($orderBy) { <<<'END'
$sql .= ' ORDER BY ' . $orderBy; SELECT * FROM INFORMATION_SCHEMA.%1$s
} WHERE TABLE_SCHEMA = '%2$s' AND TABLE_NAME = '%3$s'%4$s;
return $this->fetchQueryData($sql); END,
$this->quoteIdentifier($infoTable),
$schema,
$table,
($orderBy ? " ORDER BY {$orderBy}" : '')
));
} }
/** /**
@ -252,64 +242,89 @@ class MysqlSchema extends Schema
* @return array of arrays * @return array of arrays
* @throws PEAR_Exception * @throws PEAR_Exception
*/ */
public function fetchIndexInfo(string $table): array private function fetchKeyInfo(string $table): array
{
$schema = $this->conn->dsn['database'];
$data = $this->fetchQueryData(
<<<EOT
SELECT INDEX_NAME AS `key_name`,
CASE
WHEN INDEX_NAME = 'PRIMARY' THEN 'PRIMARY'
WHEN NON_UNIQUE IS NOT TRUE THEN 'UNIQUE'
ELSE INDEX_TYPE
END AS `key_type`,
COLUMN_NAME AS `col`,
SUB_PART AS `col_length`
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = '{$schema}' AND TABLE_NAME = '{$table}'
ORDER BY `key_name`, `key_type`, SEQ_IN_INDEX;
EOT
);
$rows = [];
foreach ($data as $row) {
$name = $row['key_name'];
if (!is_null($row['col_length'])) {
$row['col'] = [$row['col'], (int) $row['col_length']];
}
unset($row['col_length']);
if (!array_key_exists($name, $rows)) {
$row['cols'] = [$row['col']];
unset($row['col']);
$rows[$name] = $row;
} else {
$rows[$name]['cols'][] = $row['col'];
}
}
return array_values($rows);
}
/**
* Pull foreign key information for the given table.
*
* @param string $table
* @return array array of arrays
* @throws PEAR_Exception
*/
private function fetchForeignKeyInfo(string $table): array
{ {
$schema = $this->conn->dsn['database']; $schema = $this->conn->dsn['database'];
$data = $this->fetchQueryData( $data = $this->fetchQueryData(
<<<END <<<END
SELECT INDEX_NAME AS `key_name`, INDEX_TYPE AS `key_type`, COLUMN_NAME AS `col` SELECT CONSTRAINT_NAME AS `key_name`,
FROM INFORMATION_SCHEMA.STATISTICS COLUMN_NAME AS `col`,
REFERENCED_TABLE_NAME AS `ref_table`,
REFERENCED_COLUMN_NAME AS `ref_col`
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = '{$schema}' WHERE TABLE_SCHEMA = '{$schema}'
AND TABLE_NAME = '{$table}' AND TABLE_NAME = '{$table}'
AND NON_UNIQUE IS TRUE AND REFERENCED_TABLE_SCHEMA = '{$schema}'
ORDER BY SEQ_IN_INDEX; ORDER BY `key_name`, ORDINAL_POSITION;
END END
); );
$rows = []; $rows = [];
foreach ($data as $row) { foreach ($data as $row) {
$name = $row['key_name']; $name = $row['key_name'];
if (isset($rows[$name])) {
$rows[$name]['cols'][] = $row['col']; if (!array_key_exists($name, $rows)) {
} else { $row['cols'] = [$row['col'] => $row['ref_col']];
$row['cols'] = [$row['col']];
unset($row['col']); unset($row['col']);
unset($row['ref_col']);
$rows[$name] = $row; $rows[$name] = $row;
} else {
$rows[$name]['cols'][$row['col']] = $row['ref_col'];
} }
} }
return array_values($rows); return array_values($rows);
} }
/**
* @param string $table
* @param string $constraint_name
* @return array array of rows with keys: table_name, cols (array of strings)
* @throws PEAR_Exception
*/
public function fetchForeignKeyInfo(string $table, string $constraint_name): array
{
$query = 'SELECT REFERENCED_TABLE_NAME AS `table_name`, REFERENCED_COLUMN_NAME AS `col` ' .
'FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE ' .
'WHERE TABLE_SCHEMA = \'%s\' AND TABLE_NAME = \'%s\' AND CONSTRAINT_NAME = \'%s\' ' .
'AND REFERENCED_TABLE_SCHEMA IS NOT NULL ' .
'ORDER BY POSITION_IN_UNIQUE_CONSTRAINT';
$schema = $this->conn->dsn['database'];
$sql = sprintf($query, $schema, $table, $constraint_name);
$data = $this->fetchQueryData($sql);
if (count($data) < 1) {
throw new Exception('Could not find foreign key ' . $constraint_name . ' on table ' . $table);
}
$info = [
'table_name' => $data[0]['table_name'],
'cols' => [],
];
foreach ($data as $row) {
$info['cols'][] = $row['col'];
}
return $info;
}
/** /**
* Append an SQL statement with an index definition for a full-text search * Append an SQL statement with an index definition for a full-text search
* index over one or more columns on a table. * index over one or more columns on a table.

View File

@ -80,12 +80,11 @@ class PgsqlSchema extends Schema
throw new SchemaTableMissingException("No such table: $table"); throw new SchemaTableMissingException("No such table: $table");
} }
// We'll need to match up fields by ordinal reference // Get information on the emulated "enum" type
$orderedFields = []; $enum_info = $this->fetchEnumInfo($table);
foreach ($columns as $row) { foreach ($columns as $row) {
$name = $row['column_name']; $name = $row['column_name'];
$orderedFields[$row['ordinal_position']] = $name;
$field = []; $field = [];
$field['type'] = $type = $row['udt_name']; $field['type'] = $type = $row['udt_name'];
@ -108,68 +107,66 @@ class PgsqlSchema extends Schema
if ($row['is_nullable'] == 'NO') { if ($row['is_nullable'] == 'NO') {
$field['not null'] = true; $field['not null'] = true;
} }
if ($row['column_default'] !== null) { $col_default = $row['column_default'];
$field['default'] = $row['column_default']; if (!is_null($col_default)) {
if ($this->isNumericType($field)) { if ($this->isNumericType($field)) {
$field['default'] = (int) $field['default']; $field['default'] = (int) $col_default;
} elseif ($type === 'bool') {
$field['default'] = ($col_default === 'true') ? true : false;
} else {
$match = "/^'(.*)'(::.+)*$/";
if (preg_match($match, $col_default)) {
$field['default'] = preg_replace(
$match,
'\1',
$col_default
);
} else {
$field['default'] = $col_default;
} }
} }
}
if (
$row['is_identity'] === 'YES'
&& $row['identity_generation'] = 'BY DEFAULT'
) {
$field['auto_increment'] = true;
} elseif (array_key_exists($name, $enum_info)) {
$field['type'] = $type = 'enum';
$field['enum'] = $enum_info[$name];
}
$def['fields'][$name] = $field; $def['fields'][$name] = $field;
} }
// Pulling index info from pg_class & pg_index $key_info = $this->fetchKeyInfo($table);
// This can give us primary & unique key info, but not foreign key constraints
// so we exclude them and pick them up later.
$indexInfo = $this->fetchIndexInfo($table);
foreach ($indexInfo as $row) { foreach ($key_info as $row) {
$keyName = $row['key_name']; $key_name = $row['key_name'];
$cols = $row['cols'];
// Dig the column references out! switch ($row['key_type']) {
// case 'primary':
// These are inconvenient arrays with partial references to the
// pg_att table, but since we've already fetched up the column
// info on the current table, we can look those up locally.
$cols = [];
$colPositions = explode(' ', $row['indkey']);
foreach ($colPositions as $ord) {
if ($ord == 0) {
$cols[] = 'FUNCTION'; // @fixme
} else {
$cols[] = $orderedFields[$ord];
}
}
$def['indexes'][$keyName] = $cols;
}
// Pull constraint data from INFORMATION_SCHEMA:
// Primary key, unique keys, foreign keys
$keyColumns = $this->fetchMetaInfo($table, 'key_column_usage', 'constraint_name,ordinal_position');
$keys = [];
foreach ($keyColumns as $row) {
$keyName = $row['constraint_name'];
$keyCol = $row['column_name'];
if (!isset($keys[$keyName])) {
$keys[$keyName] = [];
}
$keys[$keyName][] = $keyCol;
}
foreach ($keys as $keyName => $cols) {
// name hack -- is this reliable?
if ($keyName == "{$table}_pkey") {
$def['primary key'] = $cols; $def['primary key'] = $cols;
} elseif (preg_match("/^{$table}_(.*)_fkey$/", $keyName, $matches)) { break;
$fkey = $this->fetchForeignKeyInfo($table, $keyName); case 'unique':
$colMap = array_combine($cols, $fkey['col_names']); $def['unique keys'][$key_name] = $cols;
$def['foreign keys'][$keyName] = [$fkey['table_name'], $colMap]; break;
} else { default:
$def['unique keys'][$keyName] = $cols; $def['indexes'][$key_name] = $cols;
} }
} }
$foreign_key_info = $this->fetchForeignKeyInfo($table);
foreach ($foreign_key_info as $row) {
$key_name = $row['key_name'];
$cols = $row['cols'];
$ref_table = $row['ref_table'];
$def['foreign keys'][$key_name] = [$ref_table, $cols];
}
return $def; return $def;
} }
@ -184,90 +181,158 @@ class PgsqlSchema extends Schema
*/ */
public function fetchMetaInfo($table, $infoTable, $orderBy = null) public function fetchMetaInfo($table, $infoTable, $orderBy = null)
{ {
$query = "SELECT * FROM information_schema.%s " . $catalog = $this->conn->dsn['database'];
"WHERE table_name='%s'"; return $this->fetchQueryData(sprintf(
$sql = sprintf($query, $infoTable, $table); <<<'END'
if ($orderBy) { SELECT * FROM information_schema.%1$s
$sql .= ' ORDER BY ' . $orderBy; WHERE table_catalog = '%2$s' AND table_name = '%3$s'%4$s;
} END,
return $this->fetchQueryData($sql); $this->quoteIdentifier($infoTable),
$catalog,
$table,
($orderBy ? " ORDER BY {$orderBy}" : '')
));
} }
/** /**
* Pull some PG-specific index info * Pull index and keys information for the given table.
*
* @param string $table * @param string $table
* @return array of arrays * @return array of arrays
* @throws PEAR_Exception * @throws PEAR_Exception
*/ */
public function fetchIndexInfo(string $table): array private function fetchKeyInfo(string $table): array
{ {
return $this->fetchQueryData( $data = $this->fetchQueryData(sprintf(
<<<'EOT'
SELECT "rel"."relname" AS "key_name",
CASE
WHEN "idx"."indisprimary" IS TRUE THEN 'primary'
WHEN "idx"."indisunique" IS TRUE THEN 'unique'
ELSE "am"."amname"
END AS "key_type",
"cols"."attname" AS "col"
FROM pg_index AS "idx"
CROSS JOIN LATERAL unnest("idx"."indkey")
WITH ORDINALITY AS "col_nums" ("num", "pos")
INNER JOIN pg_class AS "rel"
ON "idx"."indexrelid" = "rel".oid
LEFT JOIN pg_attribute AS "cols"
ON "idx"."indrelid" = "cols"."attrelid"
AND "col_nums"."num" = "cols"."attnum"
LEFT JOIN pg_am AS "am"
ON "rel"."relam" = "am".oid
WHERE "idx"."indrelid" = CAST('%s' AS REGCLASS)
ORDER BY "key_type", "key_name", "col_nums"."pos";
EOT,
$table
));
$rows = [];
foreach ($data as $row) {
$name = $row['key_name'];
if (!array_key_exists($name, $rows)) {
$row['cols'] = [$row['col']];
unset($row['col']);
$rows[$name] = $row;
} else {
$rows[$name]['cols'][] = $row['col'];
}
}
return array_values($rows);
}
/**
* Pull foreign key information for the given table.
*
* @param string $table
* @return array array of arrays
* @throws PEAR_Exception
*/
private function fetchForeignKeyInfo(string $table): array
{
$data = $this->fetchQueryData(sprintf(
<<<'END'
SELECT "con"."conname" AS "key_name",
"cols"."attname" AS "col",
"ref_rel"."relname" AS "ref_table",
"ref_cols"."attname" AS "ref_col"
FROM pg_constraint AS "con"
CROSS JOIN LATERAL unnest("con"."conkey", "con"."confkey")
WITH ORDINALITY AS "col_nums" ("num", "ref_num", "pos")
LEFT JOIN pg_attribute AS "cols"
ON "con"."conrelid" = "cols"."attrelid"
AND "col_nums"."num" = "cols"."attnum"
LEFT JOIN pg_class AS "ref_rel"
ON "con"."confrelid" = "ref_rel".oid
LEFT JOIN pg_attribute AS "ref_cols"
ON "con"."confrelid" = "ref_cols"."attrelid"
AND "col_nums"."ref_num" = "ref_cols"."attnum"
WHERE "con"."contype" = 'f'
AND "con"."conrelid" = CAST('%s' AS REGCLASS)
ORDER BY "key_name", "col_nums"."pos";
END,
$table
));
$rows = [];
foreach ($data as $row) {
$name = $row['key_name'];
if (!array_key_exists($name, $rows)) {
$row['cols'] = [$row['col'] => $row['ref_col']];
unset($row['col']);
unset($row['ref_col']);
$rows[$name] = $row;
} else {
$rows[$name]['cols'][$row['col']] = $row['ref_col'];
}
}
return array_values($rows);
}
/**
* Pull information about the emulated enum columns
*
* @param string $table
* @return array of arrays
* @throws PEAR_Exception
*/
private function fetchEnumInfo($table)
{
$data = $this->fetchQueryData(
<<<END <<<END
SELECT indexname AS key_name, indexdef AS key_def, pg_index.* SELECT "cols"."attname" AS "col", "con"."consrc" AS "check"
FROM pg_index INNER JOIN pg_indexes FROM pg_constraint AS "con"
ON pg_index.indexrelid = CAST(pg_indexes.indexname AS regclass) INNER JOIN pg_attribute AS "cols"
WHERE pg_indexes.tablename = '{$table}' ON "con"."conrelid" = "cols"."attrelid"
AND indisprimary IS FALSE AND indisunique IS FALSE AND "con"."conkey"[1] = "cols"."attnum"
ORDER BY indrelid, indexrelid; WHERE "cols".atttypid = CAST('text' AS REGTYPE)
AND "con"."contype" = 'c'
AND cardinality("con"."conkey") = 1
AND "con"."conrelid" = CAST('{$table}' AS REGCLASS);
END END
); );
}
/** $rows = [];
* @param string $table
* @param string $constraint_name
* @return array array of rows with keys: table_name, col_names (array of strings)
* @throws PEAR_Exception
*/
public function fetchForeignKeyInfo(string $table, string $constraint_name): array
{
// In a sane world, it'd be easier to query the column names directly.
// But it's pretty hard to work with arrays such as col_indexes in direct SQL here.
$query = 'SELECT ' .
'(SELECT relname FROM pg_class WHERE oid = confrelid) AS table_name, ' .
'confrelid AS table_id, ' .
'(SELECT indkey FROM pg_index WHERE indexrelid = conindid) AS col_indices ' .
'FROM pg_constraint ' .
'WHERE conrelid = CAST(\'%s\' AS regclass) AND conname = \'%s\' AND contype = \'f\'';
$sql = sprintf($query, $table, $constraint_name);
$data = $this->fetchQueryData($sql);
if (count($data) < 1) {
throw new Exception('Could not find foreign key ' . $constraint_name . ' on table ' . $table);
}
$row = $data[0];
return [
'table_name' => $row['table_name'],
'col_names' => $this->getTableColumnNames($row['table_id'], $row['col_indices'])
];
}
/**
*
* @param int $table_id
* @param array $col_indexes
* @return array of strings
* @throws PEAR_Exception
*/
public function getTableColumnNames($table_id, $col_indexes)
{
$indexes = array_map('intval', explode(' ', $col_indexes));
$query = 'SELECT attnum AS col_index, attname AS col_name ' .
'FROM pg_attribute where attrelid=%d ' .
'AND attnum IN (%s)';
$sql = sprintf($query, $table_id, implode(',', $indexes));
$data = $this->fetchQueryData($sql);
$byId = [];
foreach ($data as $row) { foreach ($data as $row) {
$byId[$row['col_index']] = $row['col_name']; // PostgreSQL can show either
} $name_regex = '(' . preg_quote($this->quoteIdentifier($row['col']))
. '|' . preg_quote($row['col']) . ')';
$out = []; $enum = explode("'::text, '", preg_replace(
foreach ($indexes as $id) { "/^\({$name_regex} = ANY \(ARRAY\['(.+)'::text]\)\)$/D",
$out[] = $byId[$id]; '\2',
$row['check']
));
$rows[$row['col']] = $enum;
} }
return $out; return $rows;
} }
private function isNumericType(array $cd): bool private function isNumericType(array $cd): bool
@ -293,16 +358,6 @@ class PgsqlSchema extends Schema
$line = []; $line = [];
$line[] = parent::columnSql($name, $cd); $line[] = parent::columnSql($name, $cd);
/*
if ($table['foreign keys'][$name]) {
foreach ($table['foreign keys'][$name] as $foreignTable => $foreignColumn) {
$line[] = 'references';
$line[] = $this->quoteIdentifier($foreignTable);
$line[] = '(' . $this->quoteIdentifier($foreignColumn) . ')';
}
}
*/
// This'll have been added from our transform of 'serial' type // This'll have been added from our transform of 'serial' type
if (!empty($cd['auto_increment'])) { if (!empty($cd['auto_increment'])) {
$line[] = 'GENERATED BY DEFAULT AS IDENTITY'; $line[] = 'GENERATED BY DEFAULT AS IDENTITY';
@ -328,8 +383,7 @@ class PgsqlSchema extends Schema
'integer' => 'int', 'integer' => 'int',
'char' => 'bpchar', 'char' => 'bpchar',
'datetime' => 'timestamp', 'datetime' => 'timestamp',
'blob' => 'bytea', 'blob' => 'bytea'
'enum' => 'text',
]; ];
$type = $column['type']; $type = $column['type'];