ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/owl/trunk/tools/DataDistribution.java
Revision: 79
Committed: Wed Apr 12 11:03:01 2006 UTC (12 years, 10 months ago) by duarte
File size: 27365 byte(s)
Log Message:
Modified insertIdsToKeyMaster method. It now removes the pk from the table before inserting, then removes zeros and finally adds the pk back. At that point mysql should complain if there are duplicates.
New methods to deal with key_master database actions. They belong in another class, but for the moment here.
- createNewKeyMasterTbl
- removePK
- addPK
- removeZeros
Line User Rev File contents
1 duarte 65 package tools;
2    
3     import java.io.*;
4     import java.sql.*;
5     import java.util.ArrayList;
6     import java.util.HashMap;
7    
8     public class DataDistribution {
9    
10     final static String GLOBALDIR="/project/snow/global/tmp";
11     final static String ADMINDIR="/project/StruPPi/Cluster/admin";
12 duarte 66 public final static String MASTER="white";
13 duarte 65 final static String HFILE=ADMINDIR+"/hosts_ss.txt";
14 duarte 70 final static String KEYMASTERDB="key_master";
15 duarte 65
16     boolean debug=false; // if set to true only mysql commands written, no actual dump or load, also dump directory not removed. Use setDebug method to change it
17     String dumpdir;
18     private String db;
19     private String user;
20     private String pwd;
21    
22    
23     public DataDistribution(String db,String user,String pwd) {
24     this.dumpdir=GLOBALDIR+"/dumps_tmp_"+System.currentTimeMillis();
25     this.db=db;
26     this.user=user;
27     this.pwd=pwd;
28     }
29    
30     private MySQLConnection getConnectionToMaster() {
31     MySQLConnection conn=this.getConnectionToNode(MASTER);
32     return conn;
33     }
34    
35     private MySQLConnection getConnectionToMasterKeyDb() {
36     MySQLConnection conn=new MySQLConnection(MASTER,user,pwd,KEYMASTERDB);
37     return conn;
38     }
39    
40     private MySQLConnection getConnectionToNode(String node) {
41     MySQLConnection conn=new MySQLConnection(node,user,pwd,db);
42     return conn;
43     }
44    
45     private MySQLConnection getConnectionToNode(String node, String dbName){
46     MySQLConnection conn=new MySQLConnection(node,user,pwd,dbName);
47     return conn;
48     }
49    
50     public void setDebug(boolean debug) {
51     this.debug=debug;
52     }
53    
54     public static String[] getNodes() {
55     String[] nodes=null;
56     try {
57     File inputFile = new File(HFILE);
58     BufferedReader hostsFile = new BufferedReader(new FileReader(inputFile)); // open BufferedReader to the file
59     String nodesstr=hostsFile.readLine();
60     nodes=nodesstr.split(" ");
61     hostsFile.close();
62     }
63     catch (IOException e){
64     e.printStackTrace();
65     System.err.println("Couldn't read from file "+HFILE);
66     }
67     return nodes;
68     }
69    
70     public String[] getTables4Db(){
71     String[] tables=null;
72     ArrayList<String> tablesAL=new ArrayList<String>();
73     String query="SELECT table_name FROM information_schema.tables WHERE table_schema='"+db+"' ORDER BY table_name DESC;";
74     try {
75     MySQLConnection conn = this.getConnectionToMaster();
76     Statement S=conn.createStatement();
77     ResultSet R=S.executeQuery(query);
78     while (R.next()){
79     tablesAL.add(R.getString(1));
80     }
81     S.close();
82     R.close();
83     conn.close();
84     tables=new String[tablesAL.size()];
85     for (int i=0;i<tablesAL.size();i++) {
86     tables[i]=tablesAL.get(i);
87     }
88     }
89     catch(SQLException e){
90     e.printStackTrace();
91     System.err.println("Couldn't get table names from "+MASTER+" for db="+db);
92     System.exit(1);
93     }
94     return tables;
95     }
96    
97     public void initializeDirs() {
98     if (!((new File(dumpdir)).mkdir())) {
99     System.err.println("Couldn't create directory "+dumpdir);
100     System.exit(1);
101     }
102     SystemCmd.exec("chmod go+rw "+dumpdir);
103     }
104    
105     public void finalizeDirs() {
106     if (debug) {
107     System.out.println("Temporary directory "+dumpdir+" was not removed. You must remove it manually");
108     } else {
109     System.out.println("Removing temporary directory "+dumpdir);
110     //TODO must capture exit state and print to error if problems deleting dir
111     SystemCmd.exec("rm -rf "+dumpdir);
112     }
113     }
114    
115     public void dumpData(String[] srchosts, String[] tables) {
116     // initialising temporary dump directory
117     initializeDirs();
118     for (String node: srchosts) {
119     dumpData(node,tables);
120     }
121     System.out.println ("Dump finished.");
122     }
123    
124     private void dumpData(String srchost, String[] tables) {
125     //String quickpar="--quick"; // not to buffer to memory, needed for big tables
126     if (!((new File(dumpdir+"/"+srchost+"/"+db)).mkdirs())) {
127     System.err.println("Couldn't create directory "+dumpdir+"/"+srchost+"/"+db);
128     System.exit(1);
129     }
130     SystemCmd.exec("chmod -R go+rw "+dumpdir+"/"+srchost);
131     for ( String tbl: tables) {
132     String outfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt";
133     String wherestr="";
134     String sqldumpstr="SELECT * FROM `"+tbl+"` "+wherestr+" INTO OUTFILE '"+outfile+"';";
135     //String dumpstr="$MYSQLDIR/bin/mysql $srcconnpar $quickpar -e \"$sqldumpstr\" ";
136     if (debug) {System.out.println ("HOST="+srchost+", sqldumpstr="+sqldumpstr);}
137     else {
138     try {
139     MySQLConnection conn = this.getConnectionToNode(srchost);
140     Statement S=conn.createStatement();
141     S.executeQuery(sqldumpstr);
142     System.out.println ("Dumped from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile);
143     S.close();
144     conn.close();
145     }
146     catch(SQLException e){
147     e.printStackTrace();
148     System.err.println("Couldn't dump from host="+srchost+", database="+db+", table="+tbl+" to outfile="+outfile);
149     System.exit(1);
150     }
151     } //end else if debug
152     } // end foreach tbl
153     }
154    
155     public void loadData(String[] srchosts, String[] desthosts,String[] tables, String destDb) {
156     for (String desthost: desthosts) {
157     for (String srchost: srchosts) {
158     loadData(srchost,desthost,tables,destDb);
159     }
160     }
161     System.out.println ("Load finished.");
162     finalizeDirs();
163     }
164    
165     private void loadData(String srchost, String desthost,String[] tables, String destDb) {
166     for (String tbl:tables) {
167     String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tbl+".txt";
168     String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tbl+"`;";
169     if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);}
170     else {
171     try {
172     MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
173     conn.executeSql(sqlloadstr);
174     System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", into database="+destDb+", table="+tbl);
175     conn.close();
176     }
177     catch(SQLException e){
178     e.printStackTrace();
179     System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tbl);
180     System.exit(1);
181     }
182     }
183     } // end foreach tbl
184     }
185    
186     private void loadSplitData(String srchost, String[] desthosts, String tableName, String destDb) {
187     for (String desthost:desthosts) {
188     String dumpfile=dumpdir+"/"+srchost+"/"+db+"/"+tableName+"_split_"+desthost+".txt";
189     String sqlloadstr="LOAD DATA INFILE '"+dumpfile+"' INTO TABLE `"+tableName+"`;";
190     if (debug) {System.out.println ("SRCHOST="+srchost+", DESTHOST="+desthost+", sqlloadstr="+sqlloadstr);}
191     else {
192     try {
193     MySQLConnection conn = this.getConnectionToNode(desthost,destDb);
194     conn.executeSql(sqlloadstr);
195     System.out.println ("HOST: "+desthost+". Loaded from file="+dumpfile+", database="+db+", table="+tableName);
196     conn.close();
197     }
198     catch(SQLException e){
199     e.printStackTrace();
200     System.err.println("Errors occurred while loading data from file="+dumpfile+" into host="+desthost+", database="+destDb+", table="+tableName);
201     System.exit(1);
202     }
203     }
204     } // end foreach desthosts
205     System.out.println ("Load finished.");
206     finalizeDirs();
207     }
208    
209     public boolean checkCountsAllTables (){
210     boolean checkResult=true;
211     String[] nodes = getNodes();
212     String[] tables = getTables4Db();
213     // getting hashmap of all counts from all tables from nodes
214     HashMap<String,HashMap<String,Integer>> countsNodes=new HashMap<String,HashMap<String,Integer>>();
215     for (String node:nodes){
216     HashMap<String,Integer> tableCounts = new HashMap<String,Integer>();
217     for (String tbl:tables){
218     String query="SELECT count(*) FROM "+tbl+";";
219     try {
220     MySQLConnection conn = this.getConnectionToNode(node);
221     Statement S=conn.createStatement();
222     ResultSet R=S.executeQuery(query);
223     if (R.next()){
224     tableCounts.put(tbl,R.getInt(1));
225     }
226     S.close();
227     R.close();
228     conn.close();
229     }
230     catch(SQLException e){
231     e.printStackTrace();
232     System.err.println("Couldn't execute query in host="+node+", database="+db);
233     System.exit(1);
234     }
235     }
236     countsNodes.put(node,tableCounts);
237     }
238     // getting hashmap of all counts of all tables from master
239     HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
240     for (String tbl:tables){
241     String query="SELECT count(*) FROM "+tbl+";";
242     try {
243     MySQLConnection conn = this.getConnectionToMaster();
244     Statement S=conn.createStatement();
245     ResultSet R=S.executeQuery(query);
246     if (R.next()){
247     countsMaster.put(tbl,R.getInt(1));
248     }
249     S.close();
250     R.close();
251     conn.close();
252     }
253     catch(SQLException e){
254     e.printStackTrace();
255     System.err.println("Couldn't execute query in host="+MASTER+", database="+db);
256     System.exit(1);
257     }
258     }
259     // comparing the nodes counts with the master counts
260     for (String tbl:countsMaster.keySet()){
261     int masterCount=countsMaster.get(tbl);
262     for (String node:countsNodes.keySet()){
263     int thisNodeCount=countsNodes.get(node).get(tbl);
264     if (masterCount!=thisNodeCount) {
265     System.out.println("Count difers for table "+tbl+" in database "+db+" of node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount);
266     checkResult=false; // if one count difers then the check fails, we return false for checkResult. If no difers at all we return true
267     }
268     else {
269     System.out.println("Count check passed for node "+node+", table "+tbl);
270     }
271    
272     }
273     }
274     return checkResult;
275     } // end checkCounts
276    
277     /**
278     * To check the key counts in master and nodes for a certain key.
279     * @param key the name of the key
280     * @return boolean: true if check passed, false if not passed
281     */
282     public boolean checkKeyCounts(String key) {
283     boolean checkResult=true;
284 duarte 73 ClusterConnection cconn = new ClusterConnection(db,key,user,pwd);
285     String keyTable=cconn.getTableOnNode();
286     String masterKeyTable=cconn.getKeyTable();
287     cconn.close();
288 duarte 65 // getting hashmap of counts of keys from nodes
289     String[] nodes = getNodes();
290     HashMap<String,int[]> countsNodes=new HashMap<String,int[]>();
291 duarte 73 String query="SELECT count("+key+"),count(DISTINCT "+key+") FROM "+keyTable+";";
292 duarte 65 for (String node:nodes){
293     try {
294     MySQLConnection conn = this.getConnectionToNode(node);
295     Statement S=conn.createStatement();
296     ResultSet R=S.executeQuery(query);
297     int[] thisNodeKeyCount=new int[2];
298     if (R.next()){
299     thisNodeKeyCount[0]=R.getInt(1);
300     thisNodeKeyCount[1]=R.getInt(2);
301     }
302     countsNodes.put(node,thisNodeKeyCount);
303     S.close();
304     R.close();
305     conn.close();
306     }
307     catch(SQLException e){
308     e.printStackTrace();
309     System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
310     System.exit(1);
311     }
312     }
313     // getting hashmap of counts of keys from master
314     HashMap<String,Integer> countsMaster= new HashMap<String,Integer>();
315     for (String node:nodes){
316     String queryM="SELECT count(*) FROM "+masterKeyTable+" AS a,clients_names as c WHERE a.client_id=c.client_id AND c.client_name='"+node+"';";
317     try {
318     MySQLConnection conn = this.getConnectionToMasterKeyDb();
319     Statement S=conn.createStatement();
320     ResultSet R=S.executeQuery(queryM);
321     if (R.next()){
322     countsMaster.put(node,R.getInt(1));
323     }
324     S.close();
325     R.close();
326     conn.close();
327     }
328     catch(SQLException e){
329     e.printStackTrace();
330     System.err.println("Couldn't execute query in host="+MASTER+", database="+KEYMASTERDB);
331     System.exit(1);
332     }
333     }
334     //compare the two hashmaps of key counts
335     for (String node:countsMaster.keySet()){
336     int masterCount=countsMaster.get(node);
337     int[] thisNodeCount=countsNodes.get(node);
338     if (thisNodeCount[0]!=thisNodeCount[1]) {
339 duarte 73 System.out.println("Key count and distinct key count do not coincide for key "+key+" in node "+node+". Key count="+thisNodeCount[0]+", distinct key count="+thisNodeCount[1]);
340 duarte 65 checkResult=false;
341     }
342     else if (thisNodeCount[0]!=masterCount) {
343 duarte 73 System.out.println("Key counts do not coincide for key "+key+" in master and node "+node+". MASTER COUNT="+masterCount+", NODE COUNT="+thisNodeCount[0]);
344     System.out.print("Differing "+key+"'s are: ");
345 duarte 65 int[] diffKeys = getDifferingKeys(key,node);
346     for (int k:diffKeys){
347     System.out.print(k+" ");
348     }
349     System.out.println();
350     checkResult=false;
351     }
352     else {
353 duarte 73 System.out.println("Key counts check passed for key "+key+" in node "+node+". The count is: "+masterCount);
354 duarte 65 }
355     }
356     return checkResult;
357     }
358    
359     /**
360     * Method to get the differing keys for a certain key and node. Used by checkKeycounts method. Shouldn't be used out of this class.
361     * @param key the key name
362     * @param node the host name of the cluster node
363     * @return array of ints with all differing keys for this key and node
364     */
365     public int[] getDifferingKeys (String key,String node) {
366     ArrayList<Integer> diffKeys = new ArrayList<Integer>();
367     int[] diffKeysAr;
368 duarte 73 ClusterConnection cconn = new ClusterConnection(db,key,user,pwd);
369     String keyTable=cconn.getTableOnNode();
370     String masterKeyTable=cconn.getKeyTable();
371     cconn.close();
372     String query="SELECT DISTINCT "+key+" FROM "+keyTable+" ORDER BY "+key+";";
373 duarte 65 MySQLConnection mconn=null;
374     try {
375     MySQLConnection nconn = this.getConnectionToNode(node);
376     mconn = this.getConnectionToMasterKeyDb();
377     Statement S=nconn.createStatement();
378     ResultSet R=S.executeQuery(query);
379 duarte 73 mconn.executeSql("CREATE TEMPORARY TABLE tmp_keys ("+key+" int(11) default NULL) ENGINE=MEMORY;");
380 duarte 65 int thisKey=0;
381     while (R.next()){
382     thisKey=R.getInt(1);
383     query="INSERT INTO tmp_keys VALUES ("+thisKey+");";
384     mconn.executeSql(query);
385     }
386     S.close();
387     R.close();
388     nconn.close();
389     }
390     catch(SQLException e){
391     e.printStackTrace();
392     System.err.println("Couldn't execute query: "+query+"in host="+node+", database="+db);
393     System.exit(1);
394     }
395     try {
396     query="SELECT c.k " +
397     "FROM " +
398     "(SELECT u.id AS k,count(u.id) AS cnt " +
399     "FROM " +
400 duarte 73 "(SELECT "+key+" AS id FROM tmp_keys UNION ALL SELECT kt."+key+" AS id FROM "+masterKeyTable+" AS kt LEFT JOIN clients_names AS cn ON kt.client_id=cn.client_id WHERE cn.client_name='"+node+"') AS u GROUP BY u.id) " +
401 duarte 65 "AS c " +
402     "WHERE c.cnt=1;";
403     Statement S=mconn.createStatement();
404     ResultSet R=S.executeQuery(query);
405     while (R.next()){
406     diffKeys.add(R.getInt(1));
407     }
408     S.close();
409     R.close();
410     }
411     catch(SQLException e){
412     e.printStackTrace();
413     System.err.println("Couldn't execute query: "+query+"in host="+MASTER+", database="+KEYMASTERDB);
414     System.exit(1);
415     }
416     diffKeysAr= new int[diffKeys.size()];
417     for (int i=0;i<diffKeys.size();i++) {
418     diffKeysAr[i]=diffKeys.get(i);
419     }
420     return diffKeysAr;
421     }
422    
423     /**
424 duarte 71 * Method used by splitIdsIntoSet and getIdSetsFromNodes.
425     * To get all ordered ids from a certain key and table from this db in a certain host
426 duarte 65 * @param key the key name
427     * @param table the table name
428 duarte 71 * @param host the host name
429 duarte 65 * @return int array containing all ids
430     */
431 duarte 71 public int[] getAllIds4KeyAndTable(String key, String table, String host){
432 duarte 73 int[] allIds=null;
433 duarte 65 try {
434 duarte 71 MySQLConnection conn=this.getConnectionToNode(host);
435 duarte 65 Statement S=conn.createStatement();
436 duarte 73 String query="SELECT DISTINCT "+key+" FROM "+table+" ORDER BY "+key+";";
437 duarte 65 ResultSet R=S.executeQuery(query);
438     ArrayList<Integer> idsAL=new ArrayList<Integer>();
439     while (R.next()){
440     idsAL.add(R.getInt(1));
441     }
442     allIds=new int[idsAL.size()];
443     for (int i=0;i<idsAL.size();i++) {
444     allIds[i]=idsAL.get(i);
445     }
446     R.close();
447     S.close();
448     conn.close();
449     }
450     catch (SQLException e){
451     e.printStackTrace();
452     }
453     return allIds;
454     }
455    
456     /**
457 duarte 71 * The two following methods getIdSetsFromNodes and splitIdsIntoSets are very related.
458     * Both return the same thing: a "data distribution", which is defined as a HashMap of int[],
459     * being the keys the name of the nodes and the int[] all ids that the node contains.
460     * Maybe should create a class that encapsulates the "data distribution" HashMap
461     */
462    
463     /**
464     * For a certain key and table finds out how the table is splitted in nodes and returns the "data distribution"
465     * To be used when data for a table is already splitted (splitted doesn't need to mean that there is different chunks of it
466     * in different nodes, but rather that the same table is somehow in all nodes (either in chunks or the whole thing)
467     * Take care when using this method as an argument of insertIdsToKeyMaster: if table is not in chunks (i.e. all data in all)
468     * then ids can't be inserted in key_master as there will be duplication, i.e. for key_id=1 as data is in all nodes there
469     * will be 40 copies of it and thus 40 equal ids will try to be inserted in key_master, which a) makes no sense and
470     * b) mysql won't do it anyway
471 duarte 65 * @param key
472 duarte 71 * @param table
473     * @return idSets HashMap, keys are node names, values: int array with the ids for each node
474     */
475     public HashMap<String,int[]> getIdSetsFromNodes(String key, String table){
476     HashMap<String,int[]> idSets =new HashMap<String,int[]>();
477     String[] nodes = getNodes();
478     for (String node:nodes){
479     idSets.put(node,getAllIds4KeyAndTable(key,table,node));
480     }
481     return idSets;
482     }
483    
484     /**
485     * For a certain key and table returns a certain "data distribution" (kind of evenly distributed) of the data to the nodes
486     * To be used when we have a table that we are going to split to the nodes
487     * TODO eventually the code could be cleverer so that the data is actually evenly distributed, right now is only evenly distributed on key ids
488     * @param key
489 duarte 65 * @param table
490 duarte 71 * @return idSets HashMap, keys are node names, values: int array with the ids for each node
491 duarte 65 */
492     public HashMap<String,int[]> splitIdsIntoSets(String key, String table){
493     HashMap<String,int[]> idSets =new HashMap<String,int[]>();
494     String[] nodes=DataDistribution.getNodes();
495     int numNodes=nodes.length;
496 duarte 71 int[] allIds=this.getAllIds4KeyAndTable(key,table,MASTER);
497 duarte 65 int numIds=allIds.length;
498     int setSize=numIds/numNodes;
499     int remainder=numIds%numNodes;
500     for (int i=0;i<numNodes;i++){
501     if (i<remainder){ // for the first "remainder" number of nodes we put setSize+1 ids in the node
502     int[] thisnodeidset=new int[setSize+1];
503     for (int j=0;j<thisnodeidset.length;j++){
504     thisnodeidset[j]=allIds[j+i*(setSize+1)];
505     }
506     idSets.put(nodes[i],thisnodeidset);
507     } else { // for the rest we put only setSize ids
508     int[] thisnodeidset=new int[setSize];
509     for (int j=0;j<thisnodeidset.length;j++){
510     thisnodeidset[j]=allIds[j+remainder*(setSize+1)+(i-remainder)*setSize];
511     }
512     idSets.put(nodes[i],thisnodeidset);
513     }
514     }
515     return idSets;
516     }
517    
518     /**
519     * To split a given table in chunks based on a key, split tables remain in same database and server
520     * @param key
521     * @param table
522     */
523     public void splitTable (String key,String table){
524     String query;
525     HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
526     String[] splitTables=new String[idSets.size()];
527     try {
528     MySQLConnection conn=this.getConnectionToMaster();
529     int i=0;
530     for (String node:idSets.keySet()) {
531     String splitTbl=table+"_split_"+node;
532     splitTables[i]=splitTbl;
533     i++;
534     // we create permanent tables
535     query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
536     conn.executeSql(query);
537     // drop the indexes if there was any, indexes will slow down the creation of split tables
538     String[] indexes=conn.getAllIndexes4Table(table);
539     for (String index:indexes) {
540     conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
541     }
542     int idmin=idSets.get(node)[0];
543     int idmax=idSets.get(node)[idSets.get(node).length-1];
544 duarte 73 query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+key+">="+idmin+" AND "+key+"<="+idmax+";";
545 duarte 65 conn.executeSql(query);
546     //TODO recreate indexes, use method getCreateIndex4Table from MySQLConnection
547     }
548     conn.close();
549     }
550     catch (SQLException e){
551     e.printStackTrace();
552     }
553     }
554    
555     /**
556     * To split a given table in chunks based on a key, split tables go to different nodes of cluster
557     * @param key
558     * @param table
559     * @param destDb name of destination db
560     */
561     public void splitTableToCluster (String key,String table, String destDb){
562     String query;
563     HashMap<String,int[]> idSets = this.splitIdsIntoSets(key,table);
564     String[] splitTables=new String[idSets.size()];
565     try {
566     MySQLConnection conn=this.getConnectionToMaster();
567     int i=0;
568     for (String node:idSets.keySet()) {
569     String splitTbl=table+"_split_"+node;
570     splitTables[i]=splitTbl;
571     i++;
572     // we create permanent tables, later we drop them. Can't be temporary as we use another connection for dumpData
573     query="CREATE TABLE "+splitTbl+" LIKE "+table+";";
574     conn.executeSql(query);
575     // drop the indexes if there was any, indexes will slow down the creation of split tables
576     String[] indexes=conn.getAllIndexes4Table(table);
577     for (String index:indexes) {
578     conn.executeSql("DROP INDEX "+index+" ON "+splitTbl+";");
579     }
580     // make the table a memory table (won't be feasible in general case where tables can be VERY big, even white won't cope)
581     //query="ALTER TABLE "+splitTbl+" TYPE=MEMORY;";
582     //conn.executeSql(query);
583     int idmin=idSets.get(node)[0];
584     int idmax=idSets.get(node)[idSets.get(node).length-1];
585 duarte 73 query="INSERT INTO "+splitTbl+" SELECT * FROM "+table+" WHERE "+key+">="+idmin+" AND "+key+"<="+idmax+";";
586 duarte 65 conn.executeSql(query);
587     }
588     // transfering data across
589     String[] srchosts={MASTER};
590     String[] desthosts=getNodes();
591     dumpData(srchosts,splitTables);
592     // using here loadSplitData rather than loadData because table names are not the same on source and destination, i.e. source: table_split_tla01, dest: table
593     loadSplitData(MASTER,desthosts,table,destDb);
594     // droping table, we don't want them anymore after loading data to nodes
595     for (String tbl:splitTables){
596     query="DROP TABLE "+tbl+";";
597     conn.executeSql(query);
598     }
599     conn.close();
600     }
601     catch (SQLException e){
602     e.printStackTrace();
603     }
604 duarte 69 // putting the ids in the key_master database so we keep track of where everything is
605     insertIdsToKeyMaster(key,table,destDb,idSets);
606 duarte 65 }
607 duarte 69
608     /**
609     * Insert all ids to the key_master database creating a new table for this table/destDb combination if not exists
610     * @param key name of key on which distribution of table is based
611     * @param table name of table that we are distributing
612     * @param destDb name of database in nodes where data is distributed
613 duarte 71 * @param idSets as returned from splitIdsIntoSets or getIdSetsFromNodes
614 duarte 69 */
615     public void insertIdsToKeyMaster(String key,String table,String destDb,HashMap<String,int[]> idSets) {
616 duarte 79 MySQLConnection conn = this.getConnectionToMasterKeyDb();
617     String keyMasterTbl = createNewKeyMasterTbl(key,table,destDb);
618     removePK(keyMasterTbl,key); // attention removing primary keys, duplicates won't be checked!!!
619 duarte 69 for (String node:idSets.keySet()){
620     int[] thisNodeIds=idSets.get(node);
621     for (int id:thisNodeIds){
622 duarte 75 String query="INSERT INTO "+keyMasterTbl+" ("+key+",client_id) " +
623 duarte 69 "SELECT "+id+",c.client_id FROM clients_names AS c WHERE client_name='"+node+"';";
624     try {
625     conn.executeSql(query);
626     } catch (SQLException e) {
627     e.printStackTrace();
628     }
629     }
630     }
631 duarte 79 removeZeros(keyMasterTbl,key); // we only have inserted 0s for records that we didn't want, it's safe now to get rid of them
632     addPK(keyMasterTbl,key); // if there were duplicates, this should barf
633 duarte 69 }
634 duarte 65
635     /**
636 duarte 79 * To create a new key master table in the key_master database given a key, table and db. Used by insertIdsToKeyMaster
637     * Eventually this method on other key_master related should go into their own class, shouldn't they?
638     * @param key
639     * @param table
640     * @param destDb the db
641     * @return the name of the key master table created
642     */
643     public String createNewKeyMasterTbl(String key,String table,String destDb) {
644     String keyMasterTbl=destDb+"__"+table;
645     MySQLConnection conn=this.getConnectionToMasterKeyDb();
646     try {
647     String query="CREATE TABLE IF NOT EXISTS "+keyMasterTbl+" ("+
648     key+" int(11) NOT NULL auto_increment, " +
649     "client_id smallint(6) NOT NULL default '0', " +
650     "PRIMARY KEY (`"+key+"`) " +
651     ") ENGINE=MyISAM DEFAULT CHARSET=ascii COLLATE=ascii_bin;";
652     Statement S=conn.createStatement();
653     S.executeUpdate(query);
654     S.close();
655     } catch (SQLException e) {
656     System.err.println("Couldn't create table "+keyMasterTbl);
657     e.printStackTrace();
658     }
659     try {
660     Statement S=conn.createStatement();
661     String query="INSERT INTO dbs_keys (key_name,db,key_master_table) VALUES (\'"+key+"\',\'"+db+"\',\'"+keyMasterTbl+"\');";
662     S.executeUpdate(query);
663     S.close();
664     } catch (SQLException e) {
665     System.err.println("Didn't insert new record into table dbs_keys of database: "+KEYMASTERDB+". The record for key: "+key+", table: "+table+" existed already. This is usually a harmless error!");
666     System.err.println("SQLException: " + e.getMessage());
667     }
668     conn.close();
669     return keyMasterTbl;
670     }
671    
672     public void removePK (String keyMasterTbl,String key){
673     MySQLConnection conn=this.getConnectionToMasterKeyDb();
674     try {
675     String query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" int(11) NOT NULL default '0';";
676     conn.executeSql(query);
677     query="ALTER TABLE "+keyMasterTbl+" DROP PRIMARY KEY;";
678     conn.executeSql(query);
679     } catch (SQLException e) {
680     e.printStackTrace();
681     }
682     conn.close();
683     }
684    
685     public void addPK (String keyMasterTbl, String key){
686     MySQLConnection conn=this.getConnectionToMasterKeyDb();
687     try {
688     String query="ALTER TABLE "+keyMasterTbl+" ADD PRIMARY KEY("+key+");";
689     conn.executeSql(query);
690     query="ALTER TABLE "+keyMasterTbl+" MODIFY "+key+" int(11) NOT NULL auto_increment;";
691     conn.executeSql(query);
692     } catch (SQLException e) {
693     e.printStackTrace();
694     }
695     conn.close();
696     }
697    
698     public void removeZeros (String keyMasterTbl, String key){
699     MySQLConnection conn=this.getConnectionToMasterKeyDb();
700     try {
701     String query="DELETE FROM "+keyMasterTbl+" WHERE "+key+"=0;";
702     conn.executeSql(query);
703     } catch (SQLException e) {
704     e.printStackTrace();
705     }
706     conn.close();
707     }
708    
709     /**
710 duarte 65 * Executes a query in all nodes in cluster.
711     * TODO Right now it is serial, must parallelize this with threads
712     * @param query
713     */
714     public void clusterExecuteQuery(String query){
715     String[] nodes = getNodes();
716     for (String node: nodes){
717     try {
718     MySQLConnection conn = this.getConnectionToNode(node);
719     conn.executeSql(query);
720     conn.close();
721     }
722     catch(SQLException e){
723     e.printStackTrace();
724     System.err.println("Couldn't execute query="+query+", in node="+node);
725     System.exit(1);
726     }
727     }
728    
729     }
730 duarte 68
731     /**
732     * Executes a query in all nodes in cluster given a HashMap containing a set of queries (one per node)
733     * TODO Right now it is serial, must parallelize this with threads
734     * TODO This can be used in the load/dump methods in this class where queries are different for each node
735 duarte 75 * @param queries a HashMap containing a query per node
736 duarte 68 */
737     public void clusterExecuteQuery(HashMap<String,String> queries){
738     String[] nodes = getNodes();
739     for (String node: nodes){
740     String query="";
741     try {
742     query=queries.get(node);
743     MySQLConnection conn = this.getConnectionToNode(node);
744     conn.executeSql(query);
745     conn.close();
746     }
747     catch(SQLException e){
748     e.printStackTrace();
749     System.err.println("Couldn't execute query="+query+", in node="+node);
750     System.exit(1);
751     }
752     }
753    
754     }
755    
756 duarte 65 }

Properties

Name Value
svn:executable *