ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/owl/trunk/tools/DataDistribution.java
Revision: 79
Committed: Wed Apr 12 11:03:01 2006 UTC (12 years, 7 months ago) by duarte
File size: 27365 byte(s)
Log Message:
Modified insertIdsToKeyMaster method. Now removing the pk in the table before inserting, then removing zeros and finally adding the pk back. At that point mysql should complain if there are duplicates.
New methods to deal with key_master database actions. They belong in another class, but for the moment here.
- createNewKeyMasterTbl
- removePK
- addPK
- removeZeros
Line File contents
1 package tools;
2
3 import java.io.*;
4 import java.sql.*;
5 import java.util.ArrayList;
6 import java.util.HashMap;
7
// Utility for distributing MySQL data between a master server and a set of
// cluster nodes: dumping/loading tables, splitting tables by key id, and
// recording the resulting distribution in the key_master bookkeeping database.
public class DataDistribution {

    // Temporary directory shared by all nodes; per-run dump directories are created under it.
    final static String GLOBALDIR="/project/snow/global/tmp";
    // Cluster administration directory holding the hosts list file.
    final static String ADMINDIR="/project/StruPPi/Cluster/admin";
    // Host name of the master database server.
    public final static String MASTER="white";
    // File whose first line lists all cluster node host names, space-separated.
    final static String HFILE=ADMINDIR+"/hosts_ss.txt";
    // Name of the bookkeeping database on the master.
    final static String KEYMASTERDB="key_master";

    boolean debug=false; // if set to true only mysql commands written, no actual dump or load, also dump directory not removed. Use setDebug method to change it
    String dumpdir;      // per-instance temporary dump directory, made unique via timestamp
    private String db;   // source database name
    private String user; // MySQL user name
    private String pwd;  // MySQL password


    /**
     * Creates a distributor for the given database and credentials.
     * @param db source database name
     * @param user MySQL user name
     * @param pwd MySQL password
     */
    public DataDistribution(String db,String user,String pwd) {
        this.dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
        this.db=db;
        this.user=user;
        this.pwd=pwd;
    }
29
30 private MySQLConnection getConnectionToMaster() {
31 MySQLConnection conn=this.getConnectionToNode(MASTER);
32 return conn;
33 }
34
35 private MySQLConnection getConnectionToMasterKeyDb() {
36 MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
37 return conn;
38 }
39
40 private MySQLConnection getConnectionToNode(String node) {
41 MySQLConnection conn=new MySQLConnection(node,user,pwd,db);
42 return conn;
43 }
44
45 private MySQLConnection getConnectionToNode(String node, String dbName){
46 MySQLConnection conn=new MySQLConnection(node,user,pwd,dbName);
47 return conn;
48 }
49
50 public void setDebug(boolean debug) {
51 this.debug=debug;
52 }
53
54 public static String[] getNodes() {
55 String[] nodes=null;
56 try {
57 File inputFile = new File(HFILE);
58 BufferedReader hostsFile = new BufferedReader(new FileReader(inputFile)); // open BufferedReader to the file
59 String nodesstr=hostsFile.readLine();
60 nodes=nodesstr.split(" ");
61 hostsFile.close();
62 }
63 catch (IOException e){
64 e.printStackTrace();
65 System.err.println("Couldn't read from file "+HFILE);
66 }
67 return nodes;
68 }
69
70 public String[] getTables4Db(){
71 String[] tables=null;
72 ArrayList<String> tablesAL=new ArrayList<String>();
73 String query="SELECT table_name FROM information_schema.tables WHERE table_schema='"+db+"' ORDER BY table_name DESC;";
74 try {
75 MySQLConnection conn = this.getConnectionToMaster();
76 Statement S=conn.createStatement();
77 ResultSet R=S.executeQuery(query);
78 while (R.next()){
79 tablesAL.add(R.getString(1));
80 }
81 S.close();
82 R.close();
83 conn.close();
84 tables=new String[tablesAL.size()];
85 for (int i=0;i<tablesAL.size();i++) {
86 tables[i]=tablesAL.get(i);
87 }
88 }
89 catch(SQLException e){
90 e.printStackTrace();
91 System.err.println("Couldn't get table names from "+MASTER+" for db="+db);
92 System.exit(1);
93 }
94 return tables;
95 }
96
97 public void initializeDirs() {
98 if (!((new File(dumpdir)).mkdir())) {
99 System.err.println("Couldn't create directory "+dumpdir);
100 System.exit(1);
101 }
102 SystemCmd.exec("chmod go+rw "+dumpdir);
103 }
104
105 public void finalizeDirs() {
106 if (debug) {
107 System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually");
108 } else {
109 System.out.println("Removing temporary directory "+dumpdir);
110 //TODO must capture exit state and print to error if problems deleting dir
111 SystemCmd.exec("rm -rf "+dumpdir);
112 }
113 }
114
115 public void dumpData(String[] srchosts, String[] tables) {
116 // initialising temporary dump directory
117 initializeDirs();
118 for (String node: srchosts) {
119 dumpData(node,tables);
120 }
121 System.out.println ("Dump finished.");
122 }
123
124 private void dumpData(String srchost, String[] tables) {
125 //String quickpar="--quick"; // not to buffer to memory, needed for big tables
126 if (!((new File(dumpdir+"/"+srchost+"/"+db)).mkdirs())) {
127 System.err.println("Couldn't create directory "+dumpdir+"/"+srchost+"/"+db);
128 System.exit(1);
129 }
130 SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+srchost);
131 for ( String tbl: tables) {
132 String outfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt";
133 String wherestr="";
134 String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
135 //String dumpstr="$MYSQLDIR/bin/mysql $srcconnpar $quickpar -e \"$sqldumpstr\" ";
136 if (debug) {System.out.println ("HOST="+srchost+", sqldumpstr="+sqldumpstr);}
137 else {
138 try {
139 MySQLConnection conn = this.getConnectionToNode(srchost);
140 Statement S=conn.createStatement();
141 S.executeQuery(sqldumpstr);
142 System.out.println ("Dumped from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile);
143 S.close();
144 conn.close();
145 }
146 catch(SQLException e){
147 e.printStackTrace();
148 System.err.println("Couldn't dump from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile);
149 System.exit(1);
150 }
151 } //end else if debug
152 } // end foreach tbl
153 }
154
155 public void loadData(String[] srchosts, String[] desthosts,String[] tables, String destDb) {
156 for (String desthost: desthosts) {
157 for (String srchost: srchosts) {
158 loadData(srchost,desthost,tables,destDb);
159 }
160 }
161 System.out.println ("Load finished.");
162 finalizeDirs();
163 }
164
165 private void loadData(String srchost, String desthost,String[] tables, String destDb) {
166 for (String tbl:tables) {
167 String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt";
168 String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
169 if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);}
170 else {
171 try {
172 MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
173 conn.executeSql(sqlloadstr);
174 System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", into database="+destDb+", table="+tbl);
175 conn.close();
176 }
177 catch(SQLException e){
178 e.printStackTrace();
179 System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tbl);
180 System.exit(1);
181 }
182 }
183 } // end foreach tbl
184 }
185
186 private void loadSplitData(String srchost, String[] desthosts, String tableName, String destDb) {
187 for (String desthost:desthosts) {
188 String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tableName+"_split_"+desthost+".txt";
189 String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
190 if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);}
191 else {
192 try {
193 MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
194 conn.executeSql(sqlloadstr);
195 System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", database="+db+", table="+tableName);
196 conn.close();
197 }
198 catch(SQLException e){
199 e.printStackTrace();
200 System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tableName);
201 System.exit(1);
202 }
203 }
204 } // end foreach desthosts
205 System.out.println ("Load finished.");
206 finalizeDirs();
207 }
208
209 public boolean checkCountsAllTables (){
210 boolean checkResult=true;
211 String[] nodes = getNodes();
212 String[] tables = getTables4Db();
213 // getting hashmap of all counts from all tables from nodes
214 HashMap<String,HashMap<String,Integer>> countsNodes=new HashMap<String,HashMap<String,Integer>>();
215 for (String node:nodes){
216 HashMap<String,Integer> tableCounts = new HashMap<String,Integer>();
217 for (String tbl:tables){
218 String query="SELECT count(*) FROM "+tbl+";";
219 try {
220 MySQLConnection conn = this.getConnectionToNode(node);
221 Statement S=conn.createStatement();
222 ResultSet R=S.executeQuery(query);
223 if (R.next()){
224 tableCounts.put(tbl,R.getInt(1));
225 }
226 S.close();
227 R.close();
228 conn.close();
229 }
230 catch(SQLException e){
231 e.printStackTrace();
232 System.err.println("Couldn't execute query in host="+node+", database="+db);
233 System.exit(1);
234 }
235 }
236 countsNodes.put(node,tableCounts);
237 }
238 // getting hashmap of all counts of all tables from master
239 HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
240 for (String tbl:tables){
241 String query="SELECT count(*) FROM "+tbl+";";
242 try {
243 MySQLConnection conn = this.getConnectionToMaster();
244 Statement S=conn.createStatement();
245 ResultSet R=S.executeQuery(query);
246 if (R.next()){
247 countsMaster.put(tbl,R.getInt(1));
248 }
249 S.close();
250 R.close();
251 conn.close();
252 }
253 catch(SQLException e){
254 e.printStackTrace();
255 System.err.println("Couldn't execute query in host="+MASTER+", database="+db);
256 System.exit(1);
257 }
258 }
259 // comparing the nodes counts with the master counts
260 for (String tbl:countsMaster.keySet()){
261 int masterCount=countsMaster.get(tbl);
262 for (String node:countsNodes.keySet()){
263 int thisNodeCount=countsNodes.get(node).get(tbl);
264 if (masterCount!=thisNodeCount) {
265 System.out.println("Count difers for table "+tbl+" in database "+db+" of node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount);
266 checkResult=false; // if one count difers then the check fails, we return false for checkResult. If no difers at all we return true
267 }
268 else {
269 System.out.println("Count check passed for node "+node+", table "+tbl);
270 }
271
272 }
273 }
274 return checkResult;
275 } // end checkCounts
276
277 /**
278 * To check the key counts in master and nodes for a certain key.
279 * @param key the name of the key
280 * @return boolean: true if check passed, false if not passed
281 */
282 public boolean checkKeyCounts(String key) {
283 boolean checkResult=true;
284 ClusterConnection cconn = new ClusterConnection(db,key,user,pwd);
285 String keyTable=cconn.getTableOnNode();
286 String masterKeyTable=cconn.getKeyTable();
287 cconn.close();
288 // getting hashmap of counts of keys from nodes
289 String[] nodes = getNodes();
290 HashMap<String,int[]> countsNodes=new HashMap<String,int[]>();
291 String query="SELECT count("+key+"),count(DISTINCT "+key+") FROM "+keyTable+";";
292 for (String node:nodes){
293 try {
294 MySQLConnection conn = this.getConnectionToNode(node);
295 Statement S=conn.createStatement();
296 ResultSet R=S.executeQuery(query);
297 int[] thisNodeKeyCount=new int[2];
298 if (R.next()){
299 thisNodeKeyCount[0]=R.getInt(1);
300 thisNodeKeyCount[1]=R.getInt(2);
301 }
302 countsNodes.put(node,thisNodeKeyCount);
303 S.close();
304 R.close();
305 conn.close();
306 }
307 catch(SQLException e){
308 e.printStackTrace();
309 System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
310 System.exit(1);
311 }
312 }
313 // getting hashmap of counts of keys from master
314 HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
315 for (String node:nodes){
316 String queryM="SELECT count(*) FROM "+masterKeyTable+" AS a,clients_names as c WHERE a.client_id=c.client_id AND c.client_name='"+node+"';";
317 try {
318 MySQLConnection conn = this.getConnectionToMasterKeyDb();
319 Statement S=conn.createStatement();
320 ResultSet R=S.executeQuery(queryM);
321 if (R.next()){
322 countsMaster.put(node,R.getInt(1));
323 }
324 S.close();
325 R.close();
326 conn.close();
327 }
328 catch(SQLException e){
329 e.printStackTrace();
330 System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB);
331 System.exit(1);
332 }
333 }
334 //compare the two hashmaps of key counts
335 for (String node:countsMaster.keySet()){
336 int masterCount=countsMaster.get(node);
337 int[] thisNodeCount=countsNodes.get(node);
338 if (thisNodeCount[0]!=thisNodeCount[1]) {
339 System.out.println("Key count and distinct key count do not coincide for key "+key+" in node "+node+". Key count="+thisNodeCount[0]+", distinct key count="+thisNodeCount[1]);
340 checkResult=false;
341 }
342 else if (thisNodeCount[0]!=masterCount) {
343 System.out.println("Key counts do not coincide for key "+key+" in master and node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount[0]);
344 System.out.print("Differing "+key+"'s are: ");
345 int[] diffKeys = getDifferingKeys(key,node);
346 for (int k:diffKeys){
347 System.out.print(k+" ");
348 }
349 System.out.println();
350 checkResult=false;
351 }
352 else {
353 System.out.println("Key counts check passed for key "+key+" in node "+node+". The count is: "+masterCount);
354 }
355 }
356 return checkResult;
357 }
358
359 /**
360 * Method to get the differing keys for a certain key and node. Used by checkKeycounts method. Shouldn't be used out of this class.
361 * @param key the key name
362 * @param node the host name of the cluster node
363 * @return array of ints with all differing keys for this key and node
364 */
365 public int[] getDifferingKeys (String key,String node) {
366 ArrayList<Integer> diffKeys = new ArrayList<Integer>();
367 int[] diffKeysAr;
368 ClusterConnection cconn = new ClusterConnection(db,key,user,pwd);
369 String keyTable=cconn.getTableOnNode();
370 String masterKeyTable=cconn.getKeyTable();
371 cconn.close();
372 String query="SELECT DISTINCT "+key+" FROM "+keyTable+" ORDER BY "+key+";";
373 MySQLConnection mconn=null;
374 try {
375 MySQLConnection nconn = this.getConnectionToNode(node);
376 mconn = this.getConnectionToMasterKeyDb();
377 Statement S=nconn.createStatement();
378 ResultSet R=S.executeQuery(query);
379 mconn.executeSql("CREATE TEMPORARY TABLE tmp_keys ("+key+" int(11) default NULL) ENGINE=MEMORY;");
380 int thisKey=0;
381 while (R.next()){
382 thisKey=R.getInt(1);
383 query="INSERT INTO tmp_keys VALUES ("+thisKey+");";
384 mconn.executeSql(query);
385 }
386 S.close();
387 R.close();
388 nconn.close();
389 }
390 catch(SQLException e){
391 e.printStackTrace();
392 System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
393 System.exit(1);
394 }
395 try {
396 query="SELECT c.k " +
397 "FROM " +
398 "(SELECT u.id AS k,count(u.id) AS cnt " +
399 "FROM " +
400 "(SELECT "+key+" AS id FROM tmp_keys UNION ALL SELECT kt."+key+" AS id FROM "+masterKeyTable+" AS kt LEFT JOIN clients_names AS cn ON kt.client_id=cn.client_id WHERE cn.client_name='"+node+"') AS u GROUP BY u.id) " +
401 "AS c " +
402 "WHERE c.cnt=1;";
403 Statement S=mconn.createStatement();
404 ResultSet R=S.executeQuery(query);
405 while (R.next()){
406 diffKeys.add(R.getInt(1));
407 }
408 S.close();
409 R.close();
410 }
411 catch(SQLException e){
412 e.printStackTrace();
413 System.err.println("Couldn't execute query: "+query+"in host="+MASTER+", database="+KEYMASTERDB);
414 System.exit(1);
415 }
416 diffKeysAr= new int[diffKeys.size()];
417 for (int i=0;i<diffKeys.size();i++) {
418 diffKeysAr[i]=diffKeys.get(i);
419 }
420 return diffKeysAr;
421 }
422
423 /**
424 * Method used by splitIdsIntoSet and getIdSetsFromNodes.
425 * To get all ordered ids from a certain key and table from this db in a certain host
426 * @param key the key name
427 * @param table the table name
428 * @param host the host name
429 * @return int array containing all ids
430 */
431 public int[] getAllIds4KeyAndTable(String key, String table, String host){
432 int[] allIds=null;
433 try {
434 MySQLConnection conn=this.getConnectionToNode(host);
435 Statement S=conn.createStatement();
436 String query="SELECT DISTINCT "+key+" FROM "+table+" ORDER BY "+key+";";
437 ResultSet R=S.executeQuery(query);
438 ArrayList<Integer> idsAL=new ArrayList<Integer>();
439 while (R.next()){
440 idsAL.add(R.getInt(1));
441 }
442 allIds=new int[idsAL.size()];
443 for (int i=0;i<idsAL.size();i++) {
444 allIds[i]=idsAL.get(i);
445 }
446 R.close();
447 S.close();
448 conn.close();
449 }
450 catch (SQLException e){
451 e.printStackTrace();
452 }
453 return allIds;
454 }
455
456 /**
457 * The two following methods getIdSetsFromNodes and splitIdsIntoSets are very related.
458 * Both return the same thing: a "data distribution", which is defined as a HashMap of int[],
459 * being the keys the name of the nodes and the int[] all ids that the node contains.
460 * Maybe should create a class that encapsulates the "data distribution" HashMap
461 */
462
463 /**
464 * For a certain key and table finds out how the table is splitted in nodes and returns the "data distribution"
465 * To be used when data for a table is already splitted (splitted doesn't need to mean that there is different chunks of it
466 * in different nodes, but rather that the same table is somehow in all nodes (either in chunks or the whole thing)
467 * Take care when using this method as an argument of insertIdsToKeyMaster: if table is not in chunks (i.e. all data in all)
468 * then ids can't be inserted in key_master as there will be duplication, i.e. for key_id=1 as data is in all nodes there
469 * will be 40 copies of it and thus 40 equal ids will try to be inserted in key_master, which a) makes no sense and
470 * b) mysql won't do it anyway
471 * @param key
472 * @param table
473 * @return idSets HashMap, keys are node names, values: int array with the ids for each node
474 */
475 public HashMap<String,int[]> getIdSetsFromNodes(String key, String table){
476 HashMap<String,int[]> idSets =new HashMap<String,int[]>();
477 String[] nodes = getNodes();
478 for (String node:nodes){
479 idSets.put(node,getAllIds4KeyAndTable(key,table,node));
480 }
481 return idSets;
482 }
483
484 /**
485 * For a certain key and table returns a certain "data distribution" (kind of evenly distributed) of the data to the nodes
486 * To be used when we have a table that we are going to split to the nodes
487 * TODO eventually the code could be cleverer so that the data is actually evenly distributed, right now is only evenly distributed on key ids
488 * @param key
489 * @param table
490 * @return idSets HashMap, keys are node names, values: int array with the ids for each node
491 */
492 public HashMap<String,int[]> splitIdsIntoSets(String key, String table){
493 HashMap<String,int[]> idSets =new HashMap<String,int[]>();
494 String[] nodes=DataDistribution.getNodes();
495 int numNodes=nodes.length;
496 int[] allIds=this.getAllIds4KeyAndTable(key,table,MASTER);
497 int numIds=allIds.length;
498 int setSize=numIds/numNodes;
499 int remainder=numIds%numNodes;
500 for (int i=0;i<numNodes;i++){
501 if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node
502 int[] thisnodeidset=new int[setSize+1];
503 for (int j=0;j<thisnodeidset.length;j++){
504 thisnodeidset[j]=allIds[j+i*(setSize+1)];
505 }
506 idSets.put(nodes[i],thisnodeidset);
507 } else { // for the rest we put only setSize ids
508 int[] thisnodeidset=new int[setSize];
509 for (int j=0;j<thisnodeidset.length;j++){
510 thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
511 }
512 idSets.put(nodes[i],thisnodeidset);
513 }
514 }
515 return idSets;
516 }
517
518 /**
519 * To split a given table in chunks based on a key, split tables remain in same database and server
520 * @param key
521 * @param table
522 */
523 public void splitTable (String key,String table){
524 String query;
525 HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
526 String[] splitTables=new String[idSets.size()];
527 try {
528 MySQLConnection conn=this.getConnectionToMaster();
529 int i=0;
530 for (String node:idSets.keySet()) {
531 String splitTbl=table+"_split_"+node;
532 splitTables[i]=splitTbl;
533 i++;
534 // we create permanent tables
535 query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
536 conn.executeSql(query);
537 // drop the indexes if there was any, indexes will slow down the creation of split tables
538 String[] indexes=conn.getAllIndexes4Table(table);
539 for (String index:indexes) {
540 conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
541 }
542 int idmin=idSets.get(node)[0];
543 int idmax=idSets.get(node)[idSets.get(node).length-1];
544 query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+key+">="+idmin+" AND "+key+"<="+idmax+";";
545 conn.executeSql(query);
546 //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
547 }
548 conn.close();
549 }
550 catch (SQLException e){
551 e.printStackTrace();
552 }
553 }
554
    /**
     * To split a given table in chunks based on a key, split tables go to different nodes of cluster
     * Steps: create per-node split tables on the master, fill each with that node's id
     * range, dump them, load each into its node as the original table name, drop the
     * split tables, and finally record the distribution in the key_master database.
     * @param key
     * @param table
     * @param destDb name of destination db
     */
    public void splitTableToCluster (String key,String table, String destDb){
        String query;
        HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
        // names of the temporary split tables, needed below for dumping and dropping
        String[] splitTables=new String[idSets.size()];
        try {
            MySQLConnection conn=this.getConnectionToMaster();
            int i=0;
            for (String node:idSets.keySet()) {
                String splitTbl=table+"_split_"+node;
                splitTables[i]=splitTbl;
                i++;
                // we create permanent tables, later we drop them. Can't be temporary as we use another connection for dumpData
                query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
                conn.executeSql(query);
                // drop the indexes if there was any, indexes will slow down the creation of split tables
                String[] indexes=conn.getAllIndexes4Table(table);
                for (String index:indexes) {
                    conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
                }
                // make the table a memory table (won't be feasible in general case where tables can be VERY big, even white won't cope)
                //query="ALTER TABLE "+splitTbl+" TYPE=MEMORY;";
                //conn.executeSql(query);
                // id sets are sorted, so the node's range is [first element, last element]
                int idmin=idSets.get(node)[0];
                int idmax=idSets.get(node)[idSets.get(node).length-1];
                query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+key+">="+idmin+" AND "+key+"<="+idmax+";";
                conn.executeSql(query);
            }
            // transfering data across
            String[] srchosts={MASTER};
            String[] desthosts=getNodes();
            dumpData(srchosts,splitTables);
            // using here loadSplitData rather than loadData because table names are not the same on source and destination, i.e. source: table_split_tla01, dest: table
            loadSplitData(MASTER,desthosts,table,destDb);
            // droping table, we don't want them anymore after loading data to nodes
            for (String tbl:splitTables){
                query="DROP TABLE "+tbl+";";
                conn.executeSql(query);
            }
            conn.close();
        }
        catch (SQLException e){
            e.printStackTrace();
        }
        // putting the ids in the key_master database so we keep track of where everything is
        insertIdsToKeyMaster(key,table,destDb,idSets);
    }
607
608 /**
609 * Insert all ids to the key_master database creating a new table for this table/destDb combination if not exists
610 * @param key name of key on which distribution of table is based
611 * @param table name of table that we are distributing
612 * @param destDb name of database in nodes where data is distributed
613 * @param idSets as returned from splitIdsIntoSets or getIdSetsFromNodes
614 */
615 public void insertIdsToKeyMaster(String key,String table,String destDb,HashMap<String,int[]> idSets) {
616 MySQLConnection conn = this.getConnectionToMasterKeyDb();
617 String keyMasterTbl = createNewKeyMasterTbl(key,table,destDb);
618 removePK(keyMasterTbl,key); // attention removing primary keys, duplicates won't be checked!!!
619 for (String node:idSets.keySet()){
620 int[] thisNodeIds=idSets.get(node);
621 for (int id:thisNodeIds){
622 String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) " +
623 "SELECT "+id+",c.client_id FROM clients_names AS c WHERE client_name='"+node+"';";
624 try {
625 conn.executeSql(query);
626 } catch (SQLException e) {
627 e.printStackTrace();
628 }
629 }
630 }
631 removeZeros(keyMasterTbl,key); // we only have inserted 0s for records that we didn't want, it's safe now to get rid of them
632 addPK(keyMasterTbl,key); // if there were duplicates, this should barf
633 }
634
635 /**
636 * To create a new key master table in the key_master database given a key, table and db. Used by insertIdsToKeyMaster
637 * Eventually this method on other key_master related should go into their own class, shouldn't they?
638 * @param key
639 * @param table
640 * @param destDb the db
641 * @return the name of the key master table created
642 */
643 public String createNewKeyMasterTbl(String key,String table,String destDb) {
644 String keyMasterTbl=destDb+"__"+table;
645 MySQLConnection conn=this.getConnectionToMasterKeyDb();
646 try {
647 String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+
648 key+" int(11) NOT NULL auto_increment, " +
649 "client_id smallint(6) NOT NULL default '0', " +
650 "PRIMARY KEY (`"+key+"`) " +
651 ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
652 Statement S=conn.createStatement();
653 S.executeUpdate(query);
654 S.close();
655 } catch (SQLException e) {
656 System.err.println("Couldn't create table "+keyMasterTbl);
657 e.printStackTrace();
658 }
659 try {
660 Statement S=conn.createStatement();
661 String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+db+"\',\'"+keyMasterTbl+"\');";
662 S.executeUpdate(query);
663 S.close();
664 } catch (SQLException e) {
665 System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!");
666 System.err.println("SQLException: " + e.getMessage());
667 }
668 conn.close();
669 return keyMasterTbl;
670 }
671
672 public void removePK (String keyMasterTbl,String key){
673 MySQLConnection conn=this.getConnectionToMasterKeyDb();
674 try {
675 String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" int(11) NOT NULL default '0';";
676 conn.executeSql(query);
677 query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;";
678 conn.executeSql(query);
679 } catch (SQLException e) {
680 e.printStackTrace();
681 }
682 conn.close();
683 }
684
685 public void addPK (String keyMasterTbl, String key){
686 MySQLConnection conn=this.getConnectionToMasterKeyDb();
687 try {
688 String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");";
689 conn.executeSql(query);
690 query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" int(11) NOT NULL auto_increment;";
691 conn.executeSql(query);
692 } catch (SQLException e) {
693 e.printStackTrace();
694 }
695 conn.close();
696 }
697
698 public void removeZeros (String keyMasterTbl, String key){
699 MySQLConnection conn=this.getConnectionToMasterKeyDb();
700 try {
701 String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"=0;";
702 conn.executeSql(query);
703 } catch (SQLException e) {
704 e.printStackTrace();
705 }
706 conn.close();
707 }
708
709 /**
710 * Executes a query in all nodes in cluster.
711 * TODO Right now it is serial, must parallelize this with threads
712 * @param query
713 */
714 public void clusterExecuteQuery(String query){
715 String[] nodes = getNodes();
716 for (String node: nodes){
717 try {
718 MySQLConnection conn = this.getConnectionToNode(node);
719 conn.executeSql(query);
720 conn.close();
721 }
722 catch(SQLException e){
723 e.printStackTrace();
724 System.err.println("Couldn't execute query="+query+", in node="+node);
725 System.exit(1);
726 }
727 }
728
729 }
730
731 /**
732 * Executes a query in all nodes in cluster given a HashMap containing a set of queries (one per node)
733 * TODO Right now it is serial, must parallelize this with threads
734 * TODO This can be used in the load/dump methods in this class where queries are different for each node
735 * @param queries a HashMap containing a query per node
736 */
737 public void clusterExecuteQuery(HashMap<String,String> queries){
738 String[] nodes = getNodes();
739 for (String node: nodes){
740 String query="";
741 try {
742 query=queries.get(node);
743 MySQLConnection conn = this.getConnectionToNode(node);
744 conn.executeSql(query);
745 conn.close();
746 }
747 catch(SQLException e){
748 e.printStackTrace();
749 System.err.println("Couldn't execute query="+query+", in node="+node);
750 System.exit(1);
751 }
752 }
753
754 }
755
756 }

Properties

Name Value
svn:executable *